You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by be...@apache.org on 2023/01/12 15:04:27 UTC
[nifi] branch main updated: NIFI-10895 Update properties command for MiNiFi C2
This is an automated email from the ASF dual-hosted git repository.
bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8807a9d377 NIFI-10895 Update properties command for MiNiFi C2
8807a9d377 is described below
commit 8807a9d377b648eb30cd5823507ac81e58df4fa4
Author: Ferenc Erdei <er...@gmail.com>
AuthorDate: Wed Nov 30 10:12:35 2022 +0100
NIFI-10895 Update properties command for MiNiFi C2
Signed-off-by: Csaba Bejan <be...@gmail.com>
This closes #6733.
---
.../org/apache/nifi/c2/client/C2ClientConfig.java | 24 ++
.../apache/nifi/c2/client/http/C2HttpClient.java | 2 +
.../nifi/c2/client/http/C2HttpClientTest.java | 7 +-
.../nifi/c2/client/service/C2ClientService.java | 102 ++++++++-
.../service/operation/C2OperationHandler.java | 8 +
...ervice.java => C2OperationHandlerProvider.java} | 24 +-
.../client/service/operation/OperationQueue.java | 80 +++++++
.../service/operation/RequestedOperationDAO.java} | 39 ++--
.../UpdateConfigurationOperationHandler.java | 5 +
.../UpdatePropertiesOperationHandler.java | 92 ++++++++
.../c2/client/service/C2ClientServiceTest.java | 147 +++++++++++-
...st.java => C2OperationHandlerProviderTest.java} | 35 +--
.../UpdatePropertiesOperationHandlerTest.java | 130 +++++++++++
.../apache/nifi/c2/protocol/api/C2Operation.java | 29 +++
.../nifi/c2/protocol/api/C2OperationAck.java | 30 +++
.../nifi/c2/protocol/api/C2OperationState.java | 25 +++
.../apache/nifi/c2/protocol/api/OperandType.java | 1 +
.../apache/nifi/c2/protocol/api/OperationType.java | 3 +-
minifi/minifi-bootstrap/pom.xml | 5 +
.../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 36 ++-
.../bootstrap/command/CommandRunnerFactory.java | 10 +-
.../nifi/minifi/bootstrap/command/StartRunner.java | 145 ++++++++----
.../ConfigurationChangeCoordinator.java | 12 +-
.../differentiators/Differentiator.java | 4 +-
.../differentiators/WholeConfigDifferentiator.java | 10 +-
.../ingestors/FileChangeIngestor.java | 2 +-
.../ingestors/PullHttpChangeIngestor.java | 2 +-
.../ingestors/RestChangeIngestor.java | 2 +-
.../minifi/bootstrap/service/BootstrapCodec.java | 90 +++++---
.../bootstrap/service/BootstrapFileProvider.java | 19 +-
.../service/MiNiFiConfigurationChangeListener.java | 2 +-
.../minifi/bootstrap/service/MiNiFiListener.java | 15 +-
.../service/UpdateConfigurationService.java | 95 ++++++++
.../bootstrap/service/UpdatePropertiesService.java | 107 +++++++++
.../minifi/bootstrap/util/ConfigTransformer.java | 6 +
.../command/CommandRunnerFactoryTest.java | 35 +--
.../WholeConfigDifferentiatorTest.java | 10 +-
.../bootstrap/service/BootstrapCodecTest.java | 87 +++++--
minifi/minifi-commons/minifi-commons-api/pom.xml | 32 +++
.../minifi/commons/api/MiNiFiCommandState.java | 29 +--
.../nifi/minifi/commons/api/MiNiFiConstants.java | 27 +--
minifi/minifi-commons/pom.xml | 1 +
minifi/minifi-docker/pom.xml | 1 +
.../main/markdown/minifi-java-agent-quick-start.md | 11 +-
.../c2/hierarchical/minifi-edge1/expected.json | 2 +-
.../c2/hierarchical/minifi-edge2/expected.json | 2 +-
.../c2/hierarchical/minifi-edge3/expected.json | 2 +-
.../minifi-framework/minifi-framework-core/pom.xml | 5 +
.../org/apache/nifi/minifi/MiNiFiProperties.java | 151 +++++++++++++
.../apache/nifi/minifi/c2/C2NiFiProperties.java | 79 -------
.../apache/nifi/minifi/c2/C2NifiClientService.java | 250 +++++++++++++++++----
.../minifi/c2/FileBasedRequestedOperationDAO.java | 73 ++++++
.../c2/command/AgentPropertyValidationContext.java | 111 +++++++++
.../minifi/c2/command/PropertiesPersister.java | 143 ++++++++++++
.../nifi/minifi/c2/command/UpdatableProperty.java | 75 +++++++
.../command/UpdatePropertiesPropertyProvider.java | 78 +++++++
.../c2/FileBasedRequestedOperationDAOTest.java | 114 ++++++++++
.../minifi/c2/command/PropertiesPersisterTest.java | 152 +++++++++++++
.../UpdatePropertiesPropertyProviderTest.java | 87 +++++++
.../src/main/resources/conf/bootstrap.conf | 4 -
.../apache/nifi/minifi/StandardMiNiFiServer.java | 20 +-
61 files changed, 2501 insertions(+), 425 deletions(-)
diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index 2d823320c3..360e5287a2 100644
--- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -40,6 +40,8 @@ public class C2ClientConfig {
private final long callTimeout;
private final long readTimeout;
private final long connectTimeout;
+ private final int maxIdleConnections;
+ private final long keepAliveDuration;
private final String c2RequestCompression;
private final String c2AssetDirectory;
@@ -63,6 +65,8 @@ public class C2ClientConfig {
this.truststoreType = builder.truststoreType;
this.readTimeout = builder.readTimeout;
this.connectTimeout = builder.connectTimeout;
+ this.maxIdleConnections = builder.maxIdleConnections;
+ this.keepAliveDuration = builder.keepAliveDuration;
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
}
@@ -151,6 +155,14 @@ public class C2ClientConfig {
return c2AssetDirectory;
}
+ public int getMaxIdleConnections() {
+ return maxIdleConnections;
+ }
+
+ public long getKeepAliveDuration() {
+ return keepAliveDuration;
+ }
+
/**
* Builder for client configuration.
*/
@@ -175,6 +187,8 @@ public class C2ClientConfig {
private String truststoreType;
private long readTimeout;
private long connectTimeout;
+ private int maxIdleConnections;
+ private long keepAliveDuration;
private String c2RequestCompression;
private String c2AssetDirectory;
@@ -273,6 +287,16 @@ public class C2ClientConfig {
return this;
}
+ public Builder maxIdleConnections(final int maxIdleConnections) {
+ this.maxIdleConnections = maxIdleConnections;
+ return this;
+ }
+
+ public Builder keepAliveDuration(final long keepAliveDuration) {
+ this.keepAliveDuration = keepAliveDuration;
+ return this;
+ }
+
public Builder c2RequestCompression(final String c2RequestCompression) {
this.c2RequestCompression = c2RequestCompression;
return this;
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index 293851c0f5..d881fa5dff 100644
--- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;
+import okhttp3.ConnectionPool;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
@@ -74,6 +75,7 @@ public class C2HttpClient implements C2Client {
// Set whether to follow redirects
okHttpClientBuilder.followRedirects(true);
+ okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS));
// Timeouts
okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);
diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
index aec67b2f0e..28ceef2d1f 100644
--- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
+++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
@@ -39,7 +39,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -51,6 +50,8 @@ public class C2HttpClientTest {
private static final String ACK_PATH = "c2/acknowledge";
private static final int HTTP_STATUS_OK = 200;
private static final int HTTP_STATUS_BAD_REQUEST = 400;
+ private static final long KEEP_ALIVE_DURATION = 5000l;
+ private static final int MAX_IDLE_CONNECTIONS = 5;
@Mock
private C2ClientConfig c2ClientConfig;
@@ -58,7 +59,6 @@ public class C2HttpClientTest {
@Mock
private C2Serializer serializer;
- @InjectMocks
private C2HttpClient c2HttpClient;
private MockWebServer mockWebServer;
@@ -69,6 +69,9 @@ public class C2HttpClientTest {
public void startServer() {
mockWebServer = new MockWebServer();
baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
+ when(c2ClientConfig.getKeepAliveDuration()).thenReturn(KEEP_ALIVE_DURATION);
+ when(c2ClientConfig.getMaxIdleConnections()).thenReturn(MAX_IDLE_CONNECTIONS);
+ c2HttpClient = new C2HttpClient(c2ClientConfig, serializer);
}
@AfterEach
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
index 67d406086e..e8f0718cbd 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
@@ -16,13 +16,24 @@
*/
package org.apache.nifi.c2.client.service;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+
+import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,23 +43,73 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider c2OperationHandlerProvider;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
- this.operationService = operationService;
+ this.c2OperationHandlerProvider = c2OperationHandlerProvider;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
+ if (heartbeatLocked) {
+ logger.debug("Heartbeats are locked, skipping sending for now");
+ } else {
+ try {
+ C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+ client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ } catch (Exception e) {
+ logger.error("Failed to send/process heartbeat:", e);
+ }
+ }
+ }
+
+ public void sendAcknowledge(C2OperationAck operationAck) {
try {
- C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
- client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ client.acknowledgeOperation(operationAck);
} catch (Exception e) {
- logger.error("Failed to send/process heartbeat:", e);
+ logger.error("Failed to send acknowledge:", e);
}
}
+ public void enableHeartbeat() {
+ heartbeatLocked = false;
+ }
+
+ private void disableHeartbeat() {
+ heartbeatLocked = true;
+ }
+ /**
+ * Processes the given operations one at a time, in order, acknowledging each handled operation.
+ * Processing stops early, without an acknowledgement for the current operation, when a restart is
+ * successfully initiated; in that case heartbeats are not re-enabled and the DAO is not cleaned up here.
+ *
+ * @param requestedOperations the operations to process; copied into a local queue, the argument itself is not modified
+ */
+ public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+ LinkedList<C2Operation> c2Operations = new LinkedList<>(requestedOperations);
+ C2Operation requestedOperation;
+ while ((requestedOperation = c2Operations.poll()) != null) { // after poll() the queue holds only the operations that follow the current one
+ Optional<C2OperationHandler> c2OperationHandler = c2OperationHandlerProvider.getHandlerForOperation(requestedOperation);
+ if (!c2OperationHandler.isPresent()) {
+ continue; // no handler: the operation is skipped and no acknowledgement is sent for it
+ }
+ C2OperationHandler operationHandler = c2OperationHandler.get();
+ C2OperationAck c2OperationAck = operationHandler.handle(requestedOperation);
+ if (requiresRestart(operationHandler, c2OperationAck)) { // true only when the handler requires a restart and the ack state is FULLY_APPLIED
+ if (initiateRestart(c2Operations, requestedOperation)) {
+ return; // current and remaining operations were handed to the DAO; nothing more is done in this call
+ }
+ C2OperationState c2OperationState = new C2OperationState(); // restart could not be initiated: overwrite the ack state before reporting
+ c2OperationState.setState(OperationState.NOT_APPLIED);
+ c2OperationAck.setOperationState(c2OperationState);
+ }
+ sendAcknowledge(c2OperationAck);
+ }
+ enableHeartbeat(); // reached only when the loop drained the queue without a successful restart initiation
+ requestedOperationDAO.cleanup();
+ }
+
private void processResponse(C2HeartbeatResponse response) {
List<C2Operation> requestedOperations = response.getRequestedOperations();
if (requestedOperations != null && !requestedOperations.isEmpty()) {
@@ -59,11 +120,30 @@ public class C2ClientService {
}
}
- private void handleRequestedOperations(List<C2Operation> requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+ }
+
+ private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+ return Optional.ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(FULLY_APPLIED::equals)
+ .isPresent();
+ }
+ /**
+ * Disables heartbeats, saves the current operation together with the remaining ones through the DAO
+ * and passes the current operation to the registered consumer.
+ *
+ * @param requestedOperations the operations still waiting after the current one
+ * @param requestedOperation the operation that requires the restart
+ * @return true when all three steps completed, false when any of them threw an exception
+ */
+ private boolean initiateRestart(LinkedList<C2Operation> requestedOperations, C2Operation requestedOperation) {
+ try {
+ disableHeartbeat(); // done first, so heartbeats stay disabled even if a later step in this try block throws
+ requestedOperationDAO.save(new OperationQueue(requestedOperation, requestedOperations));
+ c2OperationRegister.accept(requestedOperation); // NOTE(review): presumably this is what triggers the restart - confirm against the Consumer supplied by the caller
+ return true;
+ } catch (Exception e) {
+ logger.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
+ requestedOperationDAO.cleanup(); // heartbeats are not re-enabled here; handleRequestedOperations does that once its loop finishes
}
+ return false;
}
+
}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
index 586f0e624a..d8668acaa0 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
*/
Map<String, Object> getProperties();
+ /**
+ * Determines whether the given operation requires a restart of the MiNiFi process
+ * @return true if it requires restart, false otherwise
+ */
+ default boolean requiresRestart() {
+ return false;
+ }
+
/**
* Handler logic for the specific C2Operation
*
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
similarity index 72%
rename from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
rename to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
index 31e483929d..e56d55d865 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
@@ -22,32 +22,23 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class C2OperationService {
+public class C2OperationHandlerProvider {
- private static final Logger logger = LoggerFactory.getLogger(C2OperationService.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationHandlerProvider.class);
private final Map<OperationType, Map<OperandType, C2OperationHandler>> handlerMap = new HashMap<>();
- public C2OperationService(List<C2OperationHandler> handlers) {
+ public C2OperationHandlerProvider(List<C2OperationHandler> handlers) {
for (C2OperationHandler handler : handlers) {
handlerMap.computeIfAbsent(handler.getOperationType(), x -> new HashMap<>()).put(handler.getOperandType(), handler);
}
}
- public Optional<C2OperationAck> handleOperation(C2Operation operation) {
- return getHandlerForOperation(operation)
- .map(handler -> {
- logger.info("Handling {} {} operation", operation.getOperation(), operation.getOperand());
- return handler.handle(operation);
- });
- }
-
public Map<OperationType, Map<OperandType, C2OperationHandler>> getHandlers() {
Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = new HashMap<>();
handlerMap.entrySet()
@@ -61,8 +52,11 @@ public class C2OperationService {
return Collections.unmodifiableMap(handlers);
}
- private Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) {
- return Optional.ofNullable(handlerMap.get(operation.getOperation()))
- .map(operandMap -> operandMap.get(operation.getOperand()));
+ public Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) {
+ Optional<C2OperationHandler> handler = Optional.ofNullable(handlerMap.get(operation.getOperation())).map(operandMap -> operandMap.get(operation.getOperand()));
+ if (!handler.isPresent()) {
+ LOGGER.warn("No handler found for {} {} operation", operation.getOperation(), operation.getOperand());
+ }
+ return handler;
}
}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
new file mode 100644
index 0000000000..ae0c56c1a8
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+/**
+ * Serializable holder for one C2 operation (the "current" one) and the list of operations that remain
+ * after it. Both values are plain mutable fields exposed through getters and setters.
+ */
+public class OperationQueue implements Serializable {
+ private static final long serialVersionUID = 1L;
+ /**
+ * The operation this queue was captured for; stays null after the no-arg constructor until it is set.
+ */
+ private C2Operation currentOperation;
+ private List<C2Operation> remainingOperations; // stored by reference, never copied; may be null when assigned through the setter
+ /**
+ * Creates an instance with both fields left null.
+ * NOTE(review): presumably kept for a serialization framework that populates the setters - confirm.
+ */
+ public OperationQueue() {
+ }
+ /**
+ * Creates an instance holding the given values.
+ *
+ * @param currentOperation the current operation, stored as given (null is accepted)
+ * @param remainingOperations the operations that follow; null is replaced with an empty list, any other list is stored without copying
+ */
+ public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) {
+ this.currentOperation = currentOperation;
+ this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations;
+ }
+ /**
+ * @return the current operation, possibly null
+ */
+ public C2Operation getCurrentOperation() {
+ return currentOperation;
+ }
+ /**
+ * @return the stored list itself (not a copy), possibly null if it was never set or was set to null
+ */
+ public List<C2Operation> getRemainingOperations() {
+ return remainingOperations;
+ }
+ /**
+ * @param currentOperation the new current operation, stored as given
+ */
+ public void setCurrentOperation(C2Operation currentOperation) {
+ this.currentOperation = currentOperation;
+ }
+ /**
+ * @param remainingOperations the new list, stored as given; unlike the two-argument constructor, null is not replaced with an empty list
+ */
+ public void setRemainingOperations(List<C2Operation> remainingOperations) {
+ this.remainingOperations = remainingOperations;
+ }
+ /**
+ * Two instances are equal when they are of exactly the same class and both fields are equal
+ * according to Objects.equals.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OperationQueue that = (OperationQueue) o;
+ return Objects.equals(currentOperation, that.currentOperation) && Objects.equals(remainingOperations, that.remainingOperations);
+ }
+ /**
+ * Hash over the same two fields that equals compares.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(currentOperation, remainingOperations);
+ }
+ /**
+ * @return a string listing both fields, built by plain concatenation of their string forms
+ */
+ @Override
+ public String toString() {
+ return "OperationQueue{" +
+ "currentOperation=" + currentOperation +
+ ", remainingOperations=" + remainingOperations +
+ '}';
+ }
+}
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
similarity index 57%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
index 1f1cb19035..1216aa812d 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
@@ -15,28 +15,31 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
-import java.util.Arrays;
import java.util.Optional;
-public enum OperandType {
+/**
+ * The purpose of this interface is to be able to persist operations between restarts.
+ */
+public interface RequestedOperationDAO {
+
+ /**
+ * Persists the given operation queue
+ * @param operationQueue the queue containing the current and remaining operations
+ */
+ void save(OperationQueue operationQueue);
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- ASSET;
+ /**
+ * Returns the saved Operations
+ *
+ * @return the saved operation queue, holding the current operation and the remaining operations
+ */
+ Optional<OperationQueue> load();
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
+ /**
+ * Resets the saved operations
+ */
+ void cleanup();
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
index f04af88714..db3ef8a86d 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
@@ -136,4 +136,9 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
public Map<String, Object> getProperties() {
return operandPropertiesProvider.getProperties();
}
+
+ @Override
+ public boolean requiresRestart() {
+ return true;
+ }
}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java
new file mode 100644
index 0000000000..2a9ac64cc2
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * C2 operation handler registered for the UPDATE operation type and the PROPERTIES operand type.
+ * The actual persisting is delegated to a function supplied at construction time; this class only
+ * translates that function's outcome into an acknowledgement state.
+ */
+public class UpdatePropertiesOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class);
+
+ private final OperandPropertiesProvider operandPropertiesProvider;
+ private final Function<Map<String, String>, Boolean> persistProperties;
+ /**
+ * @param operandPropertiesProvider source of the map returned by getProperties()
+ * @param persistProperties function invoked by handle() with the operation's arguments; a true result is
+ * reported as FULLY_APPLIED, a false result as NO_OPERATION, and a thrown exception as NOT_APPLIED
+ */
+ public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function<Map<String, String>, Boolean> persistProperties) {
+ this.operandPropertiesProvider = operandPropertiesProvider;
+ this.persistProperties = persistProperties;
+ }
+ /**
+ * @return always UPDATE
+ */
+ @Override
+ public OperationType getOperationType() {
+ return UPDATE;
+ }
+ /**
+ * @return always PROPERTIES
+ */
+ @Override
+ public OperandType getOperandType() {
+ return PROPERTIES;
+ }
+ /**
+ * @return whatever the injected OperandPropertiesProvider returns; no processing is done here
+ */
+ @Override
+ public Map<String, Object> getProperties() {
+ return operandPropertiesProvider.getProperties();
+ }
+ /**
+ * Passes the operation's arguments to the persistProperties function and reports the outcome.
+ * Exceptions thrown by that function are caught and turned into a NOT_APPLIED state rather than propagated.
+ *
+ * @param operation the requested operation; its identifier is copied into the acknowledgement
+ * @return an acknowledgement whose state is FULLY_APPLIED, NO_OPERATION or NOT_APPLIED as described above
+ */
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(operation.getIdentifier());
+ C2OperationState operationState = new C2OperationState();
+ c2OperationAck.setOperationState(operationState); // attached up front; the branches below only fill in state and details
+ try {
+ if (persistProperties.apply(operation.getArgs())) { // the Boolean result is unboxed here; a null result would throw and land in the generic catch below
+ operationState.setState(FULLY_APPLIED);
+ } else {
+ LOGGER.info("Properties are already in desired state");
+ operationState.setState(NO_OPERATION);
+ }
+ } catch (IllegalArgumentException e) { // the message is logged without a stack trace and echoed back as the ack details
+ LOGGER.error(e.getMessage());
+ operationState.setState(NOT_APPLIED);
+ operationState.setDetails(e.getMessage());
+ } catch (Exception e) { // any other exception: full stack trace is logged, but only a fixed text goes into the ack details
+ LOGGER.error("Exception happened during persisting properties", e);
+ operationState.setState(NOT_APPLIED);
+ operationState.setDetails("Failed to persist properties");
+ }
+ return c2OperationAck;
+ }
+ /**
+ * @return always true, overriding the interface default of false
+ */
+ @Override
+ public boolean requiresRestart() {
+ return true;
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
index 37bd572dd1..6de1f86015 100644
--- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
@@ -17,22 +17,33 @@
package org.apache.nifi.c2.client.service;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@@ -49,11 +60,17 @@ public class C2ClientServiceTest {
private C2HeartbeatFactory c2HeartbeatFactory;
@Mock
- private C2OperationService operationService;
+ private C2OperationHandlerProvider operationService;
@Mock
private RuntimeInfoWrapper runtimeInfoWrapper;
+ @Mock
+ private RequestedOperationDAO requestedOperationDAO;
+
+ @Mock
+ private Consumer<C2Operation> c2OperationRegister;
+
@InjectMocks
private C2ClientService c2ClientService;
@@ -62,15 +79,18 @@ public class C2ClientServiceTest {
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
- hbResponse.setRequestedOperations(generateOperation(1));
+ final List<C2Operation> c2Operations = generateOperation(1);
+ hbResponse.setRequestedOperations(c2Operations);
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
- when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+ when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
+ when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new C2OperationAck());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService).handleOperation(any());
+ verify(c2OperationHandler).handle(any());
verify(client).acknowledgeOperation(any());
}
@@ -81,14 +101,16 @@ public class C2ClientServiceTest {
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
hbResponse.setRequestedOperations(generateOperation(operationNum));
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
- when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+ when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
+ when(c2OperationHandler.handle(any())).thenReturn(new C2OperationAck());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService, times(operationNum)).handleOperation(any());
+ verify(c2OperationHandler, times(operationNum)).handle(any());
verify(client, times(operationNum)).acknowledgeOperation(any());
}
@@ -102,7 +124,6 @@ public class C2ClientServiceTest {
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService, times(0)).handleOperation(any());
verify(client, times(0)).acknowledgeOperation(any());
}
@@ -117,7 +138,6 @@ public class C2ClientServiceTest {
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService, times(0)).handleOperation(any());
verify(client, times(0)).acknowledgeOperation(any());
}
@@ -128,16 +148,121 @@ public class C2ClientServiceTest {
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
hbResponse.setRequestedOperations(generateOperation(1));
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
- when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+ when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService).handleOperation(any());
verify(client, times(0)).acknowledgeOperation(any());
}
+ @Test
+ void shouldHeartbeatSendingNotPropagateExceptions() { // passes only if sendHeartbeat contains the failure raised by the heartbeat factory
+ when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new RuntimeException());
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper); // no explicit assertion: an escaping exception would fail the test
+ }
+
+ @Test
+ void shouldAckSendingNotPropagateExceptions() { // passes only if sendAcknowledge contains the failure raised by the C2 client
+ C2OperationAck operationAck = mock(C2OperationAck.class);
+ doThrow(new RuntimeException()).when(client).acknowledgeOperation(operationAck);
+
+ c2ClientService.sendAcknowledge(operationAck); // no explicit assertion: an escaping exception would fail the test
+ }
+
+ @Test
+ void shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState() { // restart-requiring handler whose ack state is NOT_APPLIED
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+ C2OperationAck operationAck = new C2OperationAck();
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(OperationState.NOT_APPLIED);
+ operationAck.setOperationState(c2OperationState);
+ when(c2OperationHandler.requiresRestart()).thenReturn(true);
+ when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+ when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck);
+
+ c2ClientService.handleRequestedOperations(generateOperation(1));
+
+ verify(operationService).getHandlerForOperation(any(C2Operation.class));
+ verify(c2OperationHandler).handle(any(C2Operation.class));
+ verify(requestedOperationDAO).cleanup(); // the only DAO call permitted; a save(...) would trip verifyNoMoreInteractions below
+ verify(client).acknowledgeOperation(operationAck); // the handler's ack is sent to the client as-is
+ verifyNoMoreInteractions(operationService, client, requestedOperationDAO);
+ verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister); // the operation is never handed to the register consumer
+ }
+
+ @Test
+ void shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations() { // two operations, one restart-requiring handler reporting FULLY_APPLIED
+ C2Operation c2Operation1 = new C2Operation();
+ c2Operation1.setIdentifier("1");
+ C2Operation c2Operation2 = new C2Operation();
+ c2Operation2.setIdentifier("2");
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+ when(c2OperationHandler.requiresRestart()).thenReturn(true);
+ when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(OperationState.FULLY_APPLIED);
+ c2OperationAck.setOperationState(c2OperationState);
+ when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
+
+ c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1, c2Operation2));
+
+ verify(requestedOperationDAO).save(new OperationQueue(c2Operation1, Collections.singletonList(c2Operation2))); // presumably (current operation, remaining operations) - confirm against OperationQueue
+ verify(c2OperationRegister).accept(c2Operation1); // only the first operation is handed to the register consumer
+ verifyNoInteractions(client, c2HeartbeatFactory); // no acknowledge is sent to the client on this path
+ verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister, operationService); // also implies no handler lookup was verified or expected for the second operation
+ }
+
+ @Test
+ void shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue() { // single restart-requiring operation whose registration fails
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+ C2Operation operation = new C2Operation();
+ when(c2OperationHandler.requiresRestart()).thenReturn(true);
+ when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(OperationState.FULLY_APPLIED);
+ c2OperationAck.setOperationState(c2OperationState);
+ when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
+ doThrow(new RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class)); // the register consumer fails for every operation
+ c2ClientService.handleRequestedOperations(Collections.singletonList(operation)); // part of the arrangement: runs before the heartbeat under test
+ when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new C2Heartbeat());
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(runtimeInfoWrapper); // a heartbeat is still built after the failed registration
+ verify(client).publishHeartbeat(any(C2Heartbeat.class)); // and it is still published
+ }
+
+ @Test
+ void shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue() { // operation1 needs a restart and fails to register; operation2 does not need one
+ C2OperationHandler c2OperationHandlerForRestart = mock(C2OperationHandler.class);
+ C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+ C2Operation operation1 = new C2Operation();
+ operation1.setIdentifier("1");
+ C2Operation operation2 = new C2Operation();
+ operation2.setIdentifier("2");
+ C2OperationAck c2OperationAck = new C2OperationAck(); // the same ack instance is returned by both handlers
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(OperationState.FULLY_APPLIED);
+ c2OperationAck.setOperationState(c2OperationState);
+ when(c2OperationHandler.requiresRestart()).thenReturn(false);
+ when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true);
+ when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart));
+ when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler));
+ when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck);
+ when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck);
+
+ doThrow(new RuntimeException()).when(c2OperationRegister).accept(operation1); // registration fails only for the restart-requiring operation
+
+ c2ClientService.handleRequestedOperations(Arrays.asList(operation1, operation2));
+
+ verify(client, times(2)).acknowledgeOperation(c2OperationAck); // two acknowledges in total, i.e. processing did not stop at operation1
+ }
+
private List<C2Operation> generateOperation(int num) {
return IntStream.range(0, num)
.mapToObj(x -> new C2Operation())
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
similarity index 73%
rename from c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
rename to c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
index d821dd0738..bc6649ec25 100644
--- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
@@ -34,7 +34,7 @@ import org.apache.nifi.c2.protocol.api.OperationType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-public class C2OperationServiceTest {
+public class C2OperationHandlerProviderTest {
private static C2OperationAck operationAck;
@@ -45,45 +45,46 @@ public class C2OperationServiceTest {
}
@Test
- void testHandleOperationReturnsEmptyForUnrecognisedOperationType() {
- C2OperationService service = new C2OperationService(Collections.emptyList());
+ void testGetHandlerForReturnsEmptyForUnrecognisedOperationType() {
+ C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.emptyList());
C2Operation operation = new C2Operation();
- operation.setOperation(OperationType.UPDATE);
- operation.setOperand(CONFIGURATION);
- Optional<C2OperationAck> ack = service.handleOperation(operation);
+ operation.setOperation(DESCRIBE);
+ operation.setOperand(MANIFEST);
+ Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
- assertFalse(ack.isPresent());
+ assertFalse(handler.isPresent());
}
@Test
- void testHandleOperation() {
- C2OperationService service = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
+ void testGetHandlerForOperationReturnsEmptyForOperandMismatch() {
+ C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.singletonList(new TestInvalidOperationHandler()));
C2Operation operation = new C2Operation();
operation.setOperation(DESCRIBE);
operation.setOperand(MANIFEST);
- Optional<C2OperationAck> ack = service.handleOperation(operation);
+ Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
- assertTrue(ack.isPresent());
- assertEquals(operationAck, ack.get());
+ assertFalse(handler.isPresent());
}
@Test
- void testHandleOperationReturnsEmptyForOperandMismatch() {
- C2OperationService service = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
+ void testHandleOperation() {
+ TestDescribeOperationHandler describeOperationHandler = new TestDescribeOperationHandler();
+ C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.singletonList(describeOperationHandler));
C2Operation operation = new C2Operation();
operation.setOperation(DESCRIBE);
operation.setOperand(MANIFEST);
- Optional<C2OperationAck> ack = service.handleOperation(operation);
+ Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
- assertFalse(ack.isPresent());
+ assertTrue(handler.isPresent());
+ assertEquals(describeOperationHandler, handler.get());
}
@Test
void testHandlersAreReturned() {
- C2OperationService service = new C2OperationService(Arrays.asList(new TestDescribeOperationHandler(), new TestInvalidOperationHandler()));
+ C2OperationHandlerProvider service = new C2OperationHandlerProvider(Arrays.asList(new TestDescribeOperationHandler(), new TestInvalidOperationHandler()));
Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = service.getHandlers();
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java
new file mode 100644
index 0000000000..b060143dc4
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest { // unit tests for UpdatePropertiesOperationHandler with both collaborators mocked
+
+ private static final String ID = "id"; // identifier shared by the operation under test and the expected ack
+ private static final Map<String, String> ARGS = Collections.singletonMap("key", "value"); // operation args that the tests stub persistProperties with
+
+ @Mock
+ private OperandPropertiesProvider operandPropertiesProvider;
+
+ @Mock
+ private Function<Map<String, String>, Boolean> persistProperties; // stubbed per test to return true/false or to throw
+
+ @InjectMocks
+ private UpdatePropertiesOperationHandler updatePropertiesOperationHandler;
+
+ @Test
+ void shouldReturnStaticSettings() { // the handler identifies itself as UPDATE/PROPERTIES and requires a restart
+ assertEquals(UPDATE, updatePropertiesOperationHandler.getOperationType());
+ assertEquals(PROPERTIES, updatePropertiesOperationHandler.getOperandType());
+ assertTrue(updatePropertiesOperationHandler.requiresRestart());
+ }
+
+ @Test
+ void shouldReturnProperties() { // getProperties() returns a map equal to what the provider supplies
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("test", new Object());
+ when(operandPropertiesProvider.getProperties()).thenReturn(properties);
+
+ Map<String, Object> result = updatePropertiesOperationHandler.getProperties();
+
+ assertEquals(properties, result);
+ }
+
+ @Test
+ void shouldReturnAckWithFullyAppliedWhenPersistIsSuccessful() { // persist returns true -> FULLY_APPLIED
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenReturn(true);
+
+ C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.FULLY_APPLIED), result);
+ }
+
+ @Test
+ void shouldReturnAckWithNoOperationWhenPersistReturnFalse() { // persist returns false -> NO_OPERATION
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenReturn(false);
+
+ C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.NO_OPERATION), result);
+ }
+
+ @Test
+ void shouldReturnNotAppliedInCaseOfIllegalArgumentException() { // persist throws IllegalArgumentException -> NOT_APPLIED
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenThrow(new IllegalArgumentException()); // exception is created without a message
+
+ C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.NOT_APPLIED), result); // expected ack has null details here, unlike the generic-exception test below
+ }
+
+ @Test
+ void shouldReturnNotAppliedInCaseOfException() { // persist throws a generic RuntimeException -> NOT_APPLIED with a fixed details text
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenThrow(new RuntimeException());
+
+ C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+ C2OperationAck expected = getExpected(OperationState.NOT_APPLIED);
+ expected.getOperationState().setDetails("Failed to persist properties");
+ assertEquals(expected, result);
+ }
+
+ private C2OperationAck getExpected(OperationState operationState) { // builds the ack the tests compare against: operation id ID, the given state, details left unset
+ final C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(ID);
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(operationState);
+ c2OperationAck.setOperationState(c2OperationState);
+ return c2OperationAck;
+ }
+
+ private C2Operation getC2Operation() { // builds the input operation: identifier ID and args ARGS, operation/operand left unset
+ C2Operation c2Operation = new C2Operation();
+ c2Operation.setArgs(ARGS);
+ c2Operation.setIdentifier(ID);
+ return c2Operation;
+ }
+}
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
index 8828d77fbc..5bd7de1d47 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
@ApiModel
@@ -104,4 +105,32 @@ public class C2Operation implements Serializable {
this.dependencies = dependencies;
}
+ @Override
+ public boolean equals(Object o) { // value equality over identifier, operation, operand, args and dependencies
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // null is never equal; the runtime classes must match exactly
+ return false;
+ }
+ C2Operation operation1 = (C2Operation) o;
+ return Objects.equals(identifier, operation1.identifier) && operation == operation1.operation && operand == operation1.operand && Objects.equals(args, // operation and operand are compared by identity
+ operation1.args) && Objects.equals(dependencies, operation1.dependencies);
+ }
+
+ @Override
+ public int hashCode() { // hashes exactly the fields that equals compares
+ return Objects.hash(identifier, operation, operand, args, dependencies);
+ }
+
+ @Override
+ public String toString() { // renders the same five fields used by equals/hashCode
+ return "C2Operation{" +
+ "identifier='" + identifier + '\'' +
+ ", operation=" + operation +
+ ", operand=" + operand +
+ ", args=" + args + // args values are printed verbatim
+ ", dependencies=" + dependencies +
+ '}';
+ }
}
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
index a12d62e5b7..d74967e34f 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
+import java.util.Objects;
@ApiModel
public class C2OperationAck implements Serializable {
@@ -81,4 +82,33 @@ public class C2OperationAck implements Serializable {
public void setFlowInfo(final FlowInfo flowInfo) {
this.flowInfo = flowInfo;
}
+
+ @Override
+ public boolean equals(Object o) { // value equality over operationId, operationState, deviceInfo, agentInfo and flowInfo
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // null is never equal; the runtime classes must match exactly
+ return false;
+ }
+ C2OperationAck that = (C2OperationAck) o;
+ return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState) && Objects.equals(deviceInfo, // each field is compared null-safely via its own equals
+ that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo);
+ }
+
+ @Override
+ public int hashCode() { // hashes exactly the fields that equals compares
+ return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo);
+ }
+
+ @Override
+ public String toString() { // renders the same five fields used by equals/hashCode
+ return "C2OperationAck{" +
+ "operationId='" + operationId + '\'' +
+ ", operationState=" + operationState +
+ ", deviceInfo=" + deviceInfo +
+ ", agentInfo=" + agentInfo +
+ ", flowInfo=" + flowInfo +
+ '}';
+ }
}
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
index 21a44b582c..dc4e303e43 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
+import java.util.Objects;
/**
* Simple model of operations. The current approach is to capture a shared state ( via agent(s)
@@ -69,6 +70,30 @@ public class C2OperationState implements Serializable {
this.state = OperationState.fromOrdinal(state);
}
+ @Override
+ public boolean equals(Object o) { // value equality over state and details
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // null is never equal; the runtime classes must match exactly
+ return false;
+ }
+ C2OperationState that = (C2OperationState) o;
+ return state == that.state && Objects.equals(details, that.details); // state is compared by identity, details null-safely
+ }
+
+ @Override
+ public int hashCode() { // hashes exactly the fields that equals compares
+ return Objects.hash(state, details);
+ }
+
+ @Override
+ public String toString() { // renders state and details, the two fields used by equals/hashCode
+ return "C2OperationState{" +
+ "state=" + state +
+ ", details='" + details + '\'' +
+ '}';
+ }
public enum OperationState {
FULLY_APPLIED,
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index 1f1cb19035..bc9538d9c3 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -27,6 +27,7 @@ public enum OperandType {
DEBUG,
MANIFEST,
REPOSITORY,
+ PROPERTIES,
ASSET;
public static Optional<OperandType> fromString(String value) {
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index 9ee3630aef..766bc6d303 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -22,6 +22,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
import java.util.Arrays;
import java.util.Set;
@@ -36,7 +37,7 @@ public enum OperationType {
// C2 Server -> C2 Client Commands
CLEAR(CONNECTION),
DESCRIBE(MANIFEST),
- UPDATE(CONFIGURATION, ASSET),
+ UPDATE(CONFIGURATION, ASSET, PROPERTIES),
RESTART,
START,
STOP,
diff --git a/minifi/minifi-bootstrap/pom.xml b/minifi/minifi-bootstrap/pom.xml
index f77d58d9cc..5ad82482df 100644
--- a/minifi/minifi-bootstrap/pom.xml
+++ b/minifi/minifi-bootstrap/pom.xml
@@ -87,6 +87,11 @@ limitations under the License.
<artifactId>minifi-commons-schema</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-commons-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 6958bb2498..1c141547a0 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -44,6 +44,7 @@ import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.bootstrap.service.ReloadService;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,7 @@ public class RunMiNiFi implements ConfigurationFileHolder {
public static final int UNINITIALIZED = -1;
private static final String STATUS_FILE_PORT_KEY = "port";
private static final String STATUS_FILE_SECRET_KEY = "secret.key";
+ private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
private final BootstrapFileProvider bootstrapFileProvider;
private final ConfigurationChangeCoordinator configurationChangeCoordinator;
@@ -87,10 +89,14 @@ public class RunMiNiFi implements ConfigurationFileHolder {
private final Lock startedLock = new ReentrantLock();
// Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
private final AtomicBoolean reloading = new AtomicBoolean(false);
+ private final AtomicBoolean commandInProgress = new AtomicBoolean(false);
+ private final MiNiFiCommandSender miNiFiCommandSender;
+ private final CurrentPortProvider currentPortProvider;
+ private final ObjectMapper objectMapper;
public RunMiNiFi(File bootstrapConfigFile) throws IOException {
bootstrapFileProvider = new BootstrapFileProvider(bootstrapConfigFile);
-
+ objectMapper = getObjectMapper();
Properties properties = bootstrapFileProvider.getStatusProperties();
miNiFiParameters = new MiNiFiParameters(
@@ -100,20 +106,19 @@ public class RunMiNiFi implements ConfigurationFileHolder {
);
ProcessUtils processUtils = new UnixProcessUtils();
- MiNiFiCommandSender miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, getObjectMapper());
+ miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, objectMapper);
MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender, processUtils);
periodicStatusReporterManager =
new PeriodicStatusReporterManager(bootstrapFileProvider.getBootstrapProperties(), miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
- configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider.getBootstrapProperties(), this,
- singleton(new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider)));
-
+ MiNiFiConfigurationChangeListener configurationChangeListener = new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider);
+ configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider, this, singleton(configurationChangeListener));
- CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
+ currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
GracefulShutdownParameterProvider gracefulShutdownParameterProvider = new GracefulShutdownParameterProvider(bootstrapFileProvider);
reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this, processUtils);
commandRunnerFactory = new CommandRunnerFactory(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager,
bootstrapFileProvider, new MiNiFiStdLogHandler(), bootstrapConfigFile, this, gracefulShutdownParameterProvider,
- new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils);
+ new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils, configurationChangeListener);
}
public int run(BootstrapCommand command, String... args) {
@@ -212,6 +217,18 @@ public class RunMiNiFi implements ConfigurationFileHolder {
configurationChangeCoordinator.close();
}
+ public void sendAcknowledgeToMiNiFi(MiNiFiCommandState commandState) { // sends ACKNOWLEDGE_OPERATION with the given state, but only if a command was flagged as in progress
+ try {
+ if (commandInProgress.getAndSet(false)) { // atomically reads and clears the flag, so only one caller sends per setCommandInProgress(true)
+ Integer currentPort = currentPortProvider.getCurrentPort();
+ CMD_LOGGER.info("Sending acknowledge with state {} to MiNiFi on port {}", commandState, currentPort);
+ miNiFiCommandSender.sendCommand(ACKNOWLEDGE_OPERATION, currentPort, commandState.name()); // the state travels as the result of commandState.name()
+ }
+ } catch (Exception e) { // best effort: the failure is logged and not rethrown; the flag was already cleared above, so this method does not retry
+ CMD_LOGGER.error("Failed to send Acknowledge to MiNiFi", e);
+ }
+ }
+
public PeriodicStatusReporterManager getPeriodicStatusReporterManager() {
return periodicStatusReporterManager;
}
@@ -245,7 +262,10 @@ public class RunMiNiFi implements ConfigurationFileHolder {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- objectMapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
return objectMapper;
}
+
+ public void setCommandInProgress(boolean value) { // sets the flag that sendAcknowledgeToMiNiFi checks (and clears) before sending
+ commandInProgress.set(value);
+ }
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
index 33902e36f8..730995f76f 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.nifi.minifi.bootstrap.BootstrapCommand;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
@@ -47,11 +48,13 @@ public class CommandRunnerFactory {
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
private final ProcessUtils processUtils;
+ private final ConfigurationChangeListener configurationChangeListener;
public CommandRunnerFactory(MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider, MiNiFiParameters miNiFiParameters,
MiNiFiStatusProvider miNiFiStatusProvider, PeriodicStatusReporterManager periodicStatusReporterManager,
BootstrapFileProvider bootstrapFileProvider, MiNiFiStdLogHandler miNiFiStdLogHandler, File bootstrapConfigFile, RunMiNiFi runMiNiFi,
- GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
+ GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils,
+ ConfigurationChangeListener configurationChangeListener) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.miNiFiParameters = miNiFiParameters;
@@ -64,6 +67,7 @@ public class CommandRunnerFactory {
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
this.processUtils = processUtils;
+ this.configurationChangeListener = configurationChangeListener;
}
/**
@@ -77,7 +81,7 @@ public class CommandRunnerFactory {
case START:
case RUN:
commandRunner = new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
- bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils);
+ bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, configurationChangeListener);
break;
case STOP:
commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils);
@@ -107,7 +111,7 @@ public class CommandRunnerFactory {
List<CommandRunner> compositeList = new LinkedList<>();
compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils));
compositeList.add(new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
- bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils));
+ bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, configurationChangeListener));
return compositeList;
}
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
index 9b29360d54..a150fa0c48 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
@@ -18,13 +18,17 @@
package org.apache.nifi.minifi.bootstrap.command;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.FULLY_APPLIED;
+import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.NOT_APPLIED_WITH_RESTART;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CMD_LOGGER;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.STATUS_FILE_PID_KEY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
import java.io.File;
@@ -50,6 +54,7 @@ import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.ShutdownHook;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.exception.StartupFailureException;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
@@ -57,8 +62,6 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiListener;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
-import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
-import org.apache.nifi.util.Tuple;
public class StartRunner implements CommandRunner {
private static final int STARTUP_WAIT_SECONDS = 60;
@@ -76,11 +79,13 @@ public class StartRunner implements CommandRunner {
private final RunMiNiFi runMiNiFi;
private volatile ShutdownHook shutdownHook;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
- private final ProcessUtils processUtils;
+ private final ConfigurationChangeListener configurationChangeListener;
+
+ private int listenPort;
public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
- RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
+ RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ConfigurationChangeListener configurationChangeListener) {
this.currentPortProvider = currentPortProvider;
this.bootstrapFileProvider = bootstrapFileProvider;
this.periodicStatusReporterManager = periodicStatusReporterManager;
@@ -89,7 +94,7 @@ public class StartRunner implements CommandRunner {
this.bootstrapConfigFile = bootstrapConfigFile;
this.runMiNiFi = runMiNiFi;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
- this.processUtils = processUtils;
+ this.configurationChangeListener = configurationChangeListener;
}
/**
@@ -129,9 +134,7 @@ public class StartRunner implements CommandRunner {
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
initConfigFiles(bootstrapProperties, confDir);
- Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
- ProcessBuilder builder = tuple.getKey();
- Process process = tuple.getValue();
+ Process process = startMiNiFi();
try {
while (true) {
@@ -152,8 +155,7 @@ public class StartRunner implements CommandRunner {
Thread.sleep(5000L);
continue;
}
-
- process = restartNifi(bootstrapProperties, confDir, builder);
+ process = restartMiNifi(bootstrapProperties, confDir);
// failed to start process
if (process == null) {
return;
@@ -170,44 +172,39 @@ public class StartRunner implements CommandRunner {
}
}
- private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder) throws IOException {
+ private Process restartMiNifi(Properties bootstrapProperties, String confDir) throws IOException {
Process process;
boolean previouslyStarted = runMiNiFi.isNiFiStarted();
+ boolean configChangeSuccessful = true;
if (!previouslyStarted) {
- File swapConfigFile = bootstrapFileProvider.getSwapFile();
+ File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
+ File bootstrapSwapConfigFile = bootstrapFileProvider.getBootstrapConfSwapFile();
if (swapConfigFile.exists()) {
- DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
-
- try {
- ByteBuffer tempConfigFile = generateConfigFiles(new FileInputStream(swapConfigFile), confDir, bootstrapProperties);
- runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
- } catch (ConfigurationChangeException e) {
- DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+ if (!revertFlowConfig(bootstrapProperties, confDir, swapConfigFile)) {
return null;
}
-
- Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
-
- DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file");
- if (!swapConfigFile.delete()) {
- DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+ } else if(bootstrapSwapConfigFile.exists()) {
+ if (!revertBootstrapConfig(confDir, bootstrapSwapConfigFile)) {
+ return null;
}
- runMiNiFi.setReloading(false);
} else {
DEFAULT_LOGGER.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi");
return null;
}
+ configChangeSuccessful = false;
} else {
runMiNiFi.setNiFiStarted(false);
}
miNiFiParameters.setSecretKey(null);
- process = startMiNiFiProcess(builder);
+ CMD_LOGGER.info("Restarting Apache MiNiFi...");
+ process = startMiNiFiProcess(getProcessBuilder());
boolean started = waitForStart();
if (started) {
+ runMiNiFi.sendAcknowledgeToMiNiFi(configChangeSuccessful ? FULLY_APPLIED : NOT_APPLIED_WITH_RESTART);
Long pid = OSUtils.getProcessId(process, DEFAULT_LOGGER);
DEFAULT_LOGGER.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
} else {
@@ -216,6 +213,46 @@ public class StartRunner implements CommandRunner {
return process;
}
+ /**
+  * Restores the flow configuration from the flow swap file after a failed configuration change.
+  * <p>
+  * The swap file content is passed through {@code generateConfigFiles} and the result is stored in the
+  * config file reference of {@code runMiNiFi}. The swap file is then copied over the file named by
+  * {@code MINIFI_CONFIG_FILE_KEY}, deleted, and the reloading flag is cleared.
+  *
+  * @param bootstrapProperties bootstrap properties passed to {@code generateConfigFiles} and used to locate the config file
+  * @param confDir configuration directory passed to {@code generateConfigFiles}
+  * @param swapConfigFile swap file holding the configuration to revert to
+  * @return {@code true} when the revert completed, {@code false} when the swap file was rejected with a
+  *         {@code ConfigurationChangeException}
+  * @throws IOException if reading, closing or copying the swap file fails
+  */
+ private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+     DEFAULT_LOGGER.info("Flow Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+     // The stream is closed here so that no handle to the swap file is left open when it is copied and deleted below.
+     try (InputStream swapConfigStream = Files.newInputStream(swapConfigFile.toPath())) {
+         ByteBuffer tempConfigFile = generateConfigFiles(swapConfigStream, confDir, bootstrapProperties);
+         runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+     } catch (ConfigurationChangeException e) {
+         // Keep the cause in the log so the reason the swap file was rejected is not lost.
+         DEFAULT_LOGGER.error("The flow swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.", e);
+         return false;
+     }
+
+     Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+     DEFAULT_LOGGER.info("Replacing flow config file with swap file and deleting swap file");
+     if (!swapConfigFile.delete()) {
+         // A leftover swap file is only reported; the revert itself is still treated as successful.
+         DEFAULT_LOGGER.warn("The flow swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+     }
+     runMiNiFi.setReloading(false);
+     return true;
+ }
+
+ /**
+  * Restores the bootstrap configuration from the bootstrap swap file after a failed configuration change.
+  * <p>
+  * The swap file is first copied over {@code bootstrapConfigFile}. The config files are then regenerated
+  * from the flow configuration currently held by {@code runMiNiFi}, using the bootstrap properties read
+  * back through {@code bootstrapFileProvider}. On success the swap file is deleted and the reloading
+  * flag is cleared.
+  *
+  * @param confDir configuration directory passed to {@code generateConfigFiles}
+  * @param bootstrapSwapConfigFile swap file holding the bootstrap configuration to revert to
+  * @return {@code true} when the revert completed, {@code false} when regenerating the config files
+  *         raised a {@code ConfigurationChangeException}
+  * @throws IOException if copying the swap file or reading the bootstrap properties fails
+  */
+ private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException {
+     DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+     // Put the old bootstrap file back before anything reads the bootstrap properties again.
+     Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+
+     try {
+         ByteBuffer currentFlow = runMiNiFi.getConfigFileReference().get().duplicate();
+         ByteBuffer regenerated = generateConfigFiles(asByteArrayInputStream(currentFlow), confDir, bootstrapFileProvider.getBootstrapProperties());
+         runMiNiFi.getConfigFileReference().set(regenerated.asReadOnlyBuffer());
+     } catch (ConfigurationChangeException e) {
+         DEFAULT_LOGGER.error("The bootstrap swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+         return false;
+     }
+
+     boolean swapRemoved = bootstrapSwapConfigFile.delete();
+     if (!swapRemoved) {
+         DEFAULT_LOGGER.warn("The bootstrap swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+     }
+
+     runMiNiFi.setReloading(false);
+     return true;
+ }
+
private boolean needRestart() throws IOException {
boolean needRestart = true;
File statusFile = bootstrapFileProvider.getStatusFile();
@@ -236,16 +273,8 @@ public class StartRunner implements CommandRunner {
try {
Thread.sleep(1000L);
if (runMiNiFi.getReloading() && runMiNiFi.isNiFiStarted()) {
- File swapConfigFile = bootstrapFileProvider.getSwapFile();
- if (swapConfigFile.exists()) {
- DEFAULT_LOGGER.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration.");
-
- if (swapConfigFile.delete()) {
- DEFAULT_LOGGER.info("Swap file was successfully deleted.");
- } else {
- DEFAULT_LOGGER.error("Swap file was not deleted. It should be deleted manually.");
- }
- }
+ deleteSwapFile(bootstrapFileProvider.getConfigYmlSwapFile());
+ deleteSwapFile(bootstrapFileProvider.getBootstrapConfSwapFile());
runMiNiFi.setReloading(false);
}
} catch (InterruptedException ie) {
@@ -253,6 +282,18 @@ public class StartRunner implements CommandRunner {
}
}
+ /**
+  * Deletes the given swap file when it exists and logs whether the deletion worked.
+  * Nothing is done, and nothing is logged, when the file is absent.
+  *
+  * @param file swap file to remove
+  */
+ private void deleteSwapFile(File file) {
+     if (!file.exists()) {
+         return;
+     }
+
+     String swapFileName = file.getName();
+     DEFAULT_LOGGER.info("MiNiFi has finished reloading successfully and {} file exists. Deleting old configuration.", swapFileName);
+
+     boolean deleted = file.delete();
+     if (!deleted) {
+         DEFAULT_LOGGER.error("Swap file ({}) was not deleted. It should be deleted manually.", file.getAbsoluteFile());
+         return;
+     }
+     DEFAULT_LOGGER.info("Swap file ({}) was successfully deleted.", swapFileName);
+ }
+
private void initConfigFiles(Properties bootstrapProperties, String confDir) throws IOException {
File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
try (InputStream inputStream = new FileInputStream(configFile)) {
@@ -269,27 +310,34 @@ public class StartRunner implements CommandRunner {
}
}
- private Tuple<ProcessBuilder, Process> startMiNiFi() throws IOException {
- ProcessBuilder builder = new ProcessBuilder();
+ private Process startMiNiFi() throws IOException {
- File workingDir = getWorkingDir();
MiNiFiListener listener = new MiNiFiListener();
- int listenPort = listener.start(runMiNiFi);
+ listenPort = listener.start(runMiNiFi, bootstrapFileProvider, configurationChangeListener);
+
+ CMD_LOGGER.info("Starting Apache MiNiFi...");
+
+ return startMiNiFiProcess(getProcessBuilder());
+ }
+
+ private ProcessBuilder getProcessBuilder() throws IOException{
+ ProcessBuilder builder = new ProcessBuilder();
+ File workingDir = getWorkingDir();
+
List<String> cmd = miNiFiExecCommandProvider.getMiNiFiExecCommand(listenPort, workingDir);
builder.command(cmd);
builder.directory(workingDir);
-
- CMD_LOGGER.info("Starting Apache MiNiFi...");
- CMD_LOGGER.info("Working Directory: {}", workingDir.getAbsolutePath());
+ CMD_LOGGER.debug("Working Directory: {}", workingDir.getAbsolutePath());
CMD_LOGGER.info("Command: {}", String.join(" ", cmd));
-
- return new Tuple<>(builder, startMiNiFiProcess(builder));
+ return builder;
}
private Process startMiNiFiProcess(ProcessBuilder builder) throws IOException {
Process process = builder.start();
miNiFiStdLogHandler.initLogging(process);
+ miNiFiParameters.setMiNiFiPort(UNINITIALIZED);
+ miNiFiParameters.setMinifiPid(UNINITIALIZED);
Long pid = OSUtils.getProcessId(process, CMD_LOGGER);
if (pid != null) {
miNiFiParameters.setMinifiPid(pid);
@@ -317,8 +365,9 @@ public class StartRunner implements CommandRunner {
lock.lock();
try {
long startTime = System.nanoTime();
-
- while (miNiFiParameters.getMinifiPid() < 1 && miNiFiParameters.getMiNiFiPort() < 1) {
+ while (miNiFiParameters.getMinifiPid() < 1 && miNiFiParameters.getMiNiFiPort() < 1 || !runMiNiFi.isNiFiStarted()) {
+ DEFAULT_LOGGER.debug("Waiting MiNiFi to start Pid={}, port={}, isNifiStarted={}",
+ miNiFiParameters.getMinifiPid(), miNiFiParameters.getMiNiFiPort(), runMiNiFi.isNiFiStarted());
try {
startupCondition.await(1, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
index 8bf255f07c..7f3a0a8dce 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
@@ -28,6 +28,7 @@ import java.util.Properties;
import java.util.Set;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +42,12 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
private final Set<ConfigurationChangeListener> configurationChangeListeners;
private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
- private final Properties bootstrapProperties;
+ private final BootstrapFileProvider bootstrapFileProvider;
private final RunMiNiFi runMiNiFi;
- public ConfigurationChangeCoordinator(Properties bootstrapProperties, RunMiNiFi runMiNiFi,
+ public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
- this.bootstrapProperties = bootstrapProperties;
+ this.bootstrapFileProvider = bootstrapFileProvider;
this.runMiNiFi = runMiNiFi;
this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
}
@@ -54,7 +55,7 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
/**
* Begins the associated notification service provided by the given implementation. In most implementations, no action will occur until this method is invoked.
*/
- public void start() {
+ public void start() throws IOException{
initialize();
changeIngestors.forEach(ChangeIngestor::start);
}
@@ -102,8 +103,9 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
}
}
- private void initialize() {
+ private void initialize() throws IOException {
close();
+ Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
// cleanup previously initialized ingestors
String ingestorsCsv = bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY);
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
index fa65aa0378..b2cd1bd77a 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
@@ -18,7 +18,6 @@
package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
import java.io.IOException;
-import java.util.Properties;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
/**
@@ -31,10 +30,9 @@ public interface Differentiator <T> {
/**
* Initialise the differentiator with the initial configuration
*
- * @param properties the properties to be used
* @param configurationFileHolder holder for the config file
*/
- void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+ void initialize(ConfigurationFileHolder configurationFileHolder);
/**
* Determine whether the config file changed
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
index e86fe632ef..5a081d33b6 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
@@ -17,17 +17,15 @@
package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class WholeConfigDifferentiator {
@@ -59,7 +57,7 @@ public abstract class WholeConfigDifferentiator {
}
}
- public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) {
+ public void initialize(ConfigurationFileHolder configurationFileHolder) {
this.configurationFileHolder = configurationFileHolder;
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
index d7ae75a877..97260aa026 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -173,7 +173,7 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
} else {
differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
}
protected void setConfigFilePath(Path configFilePath) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
index 14000d9e6b..f321f19b1f 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -196,7 +196,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
} else {
differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
index cb9ded9762..fe0ccbf548 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
@@ -111,7 +111,7 @@ public class RestChangeIngestor implements ChangeIngestor {
} else {
differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
// create the secure connector if keystore location is specified
if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
index c64bc547cf..9d95f941f0 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.exception.InvalidCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,24 +38,27 @@ public class BootstrapCodec {
private static final String FALSE = Boolean.FALSE.toString();
private final RunMiNiFi runner;
- private final BufferedReader reader;
- private final BufferedWriter writer;
private final Logger logger = LoggerFactory.getLogger(BootstrapCodec.class);
+ private final UpdatePropertiesService updatePropertiesService;
+ private final UpdateConfigurationService updateConfigurationService;
- public BootstrapCodec(RunMiNiFi runner, InputStream in, OutputStream out) {
+ public BootstrapCodec(RunMiNiFi runner, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) {
this.runner = runner;
- this.reader = new BufferedReader(new InputStreamReader(in));
- this.writer = new BufferedWriter(new OutputStreamWriter(out));
+ this.updatePropertiesService = new UpdatePropertiesService(runner, logger, bootstrapFileProvider);
+ this.updateConfigurationService = new UpdateConfigurationService(runner, configurationChangeListener, bootstrapFileProvider);
}
- public void communicate() throws IOException {
+ public void communicate(InputStream in, OutputStream out) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+
String line = reader.readLine();
String[] splits = Optional.ofNullable(line).map(l -> l.split(" ")).orElse(new String[0]);
if (splits.length == 0) {
throw new IOException("Received invalid command from MiNiFi: " + line);
}
- String cmd = splits[0];
+ BootstrapCommand cmd = BootstrapCommand.safeValueOf(splits[0]);
String[] args;
if (splits.length == 1) {
args = new String[0];
@@ -63,45 +67,65 @@ public class BootstrapCodec {
}
try {
- processRequest(cmd, args);
+ processRequest(cmd, args, writer);
} catch (InvalidCommandException exception) {
throw new IOException("Received invalid command from MiNiFi: " + line, exception);
}
}
- private void processRequest(String cmd, String[] args) throws InvalidCommandException, IOException {
+ private void processRequest(BootstrapCommand cmd, String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
switch (cmd) {
- case "PORT":
- handlePortCommand(args);
+ case PORT:
+ handlePortCommand(args, writer);
+ break;
+ case STARTED:
+ handleStartedCommand(args, writer);
+ break;
+ case SHUTDOWN:
+ handleShutDownCommand(writer);
break;
- case "STARTED":
- handleStartedCommand(args);
+ case RELOAD:
+ handleReloadCommand(writer);
break;
- case "SHUTDOWN":
- handleShutDownCommand();
+ case UPDATE_PROPERTIES:
+ handlePropertiesUpdateCommand(writer);
break;
- case "RELOAD":
- handleReloadCommand();
+ case UPDATE_CONFIGURATION:
+ handleUpdateConfigurationCommand(writer);
break;
default:
throw new InvalidCommandException("Unknown command: " + cmd);
}
}
- private void handleReloadCommand() throws IOException {
+ /**
+ * Handles the {@code UPDATE_CONFIGURATION} command.
+ * <p>
+ * The {@code OK} reply is written before the update is processed. The runner is then marked as having a
+ * command in progress, and any value returned by {@code updateConfigurationService.handleUpdate()} is
+ * forwarded to {@code runner.sendAcknowledgeToMiNiFi}.
+ *
+ * @param writer writer the {@code OK} reply is sent on
+ * @throws IOException if writing the reply fails
+ */
+ private void handleUpdateConfigurationCommand(BufferedWriter writer) throws IOException {
+ logger.debug("Received 'UPDATE_CONFIGURATION' command from MINIFI");
+ writeOk(writer);
+ runner.setCommandInProgress(true);
+ updateConfigurationService.handleUpdate().ifPresent(runner::sendAcknowledgeToMiNiFi);
+ }
+
+ /**
+ * Handles the {@code UPDATE_PROPERTIES} command.
+ * <p>
+ * The {@code OK} reply is written before the update is processed. The runner is then marked as having a
+ * command in progress, and any value returned by {@code updatePropertiesService.handleUpdate()} is
+ * forwarded to {@code runner.sendAcknowledgeToMiNiFi}.
+ *
+ * @param writer writer the {@code OK} reply is sent on
+ * @throws IOException if writing the reply fails
+ */
+ private void handlePropertiesUpdateCommand(BufferedWriter writer) throws IOException {
+ logger.debug("Received 'UPDATE_PROPERTIES' command from MINIFI");
+ writeOk(writer);
+ runner.setCommandInProgress(true);
+ updatePropertiesService.handleUpdate().ifPresent(runner::sendAcknowledgeToMiNiFi);
+ }
+
+ private void handleReloadCommand(BufferedWriter writer) throws IOException {
logger.debug("Received 'RELOAD' command from MINIFI");
- writeOk();
+ writeOk(writer);
}
- private void handleShutDownCommand() throws IOException {
+ private void handleShutDownCommand(BufferedWriter writer) throws IOException {
logger.debug("Received 'SHUTDOWN' command from MINIFI");
+ writeOk(writer);
runner.shutdownChangeNotifier();
runner.getPeriodicStatusReporterManager().shutdownPeriodicStatusReporters();
- writeOk();
}
- private void handleStartedCommand(String[] args) throws InvalidCommandException, IOException {
- logger.debug("Received 'STARTED' command from MINIFI");
+ private void handleStartedCommand(String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
+ logger.info("Received 'STARTED' command from MINIFI");
if (args.length != 1) {
throw new InvalidCommandException("STARTED command must contain a status argument");
}
@@ -110,15 +134,15 @@ public class BootstrapCodec {
throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'");
}
+ writeOk(writer);
runner.getPeriodicStatusReporterManager().shutdownPeriodicStatusReporters();
runner.getPeriodicStatusReporterManager().startPeriodicNotifiers();
runner.getConfigurationChangeCoordinator().start();
runner.setNiFiStarted(Boolean.parseBoolean(args[0]));
- writeOk();
}
- private void handlePortCommand(String[] args) throws InvalidCommandException, IOException {
+ private void handlePortCommand(String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
logger.debug("Received 'PORT' command from MINIFI");
if (args.length != 2) {
throw new InvalidCommandException("PORT command must contain the port and secretKey arguments");
@@ -135,13 +159,25 @@ public class BootstrapCodec {
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
}
+ writeOk(writer);
runner.setMiNiFiParameters(port, args[1]);
- writeOk();
}
- private void writeOk() throws IOException {
+ private void writeOk(BufferedWriter writer) throws IOException {
writer.write("OK");
writer.newLine();
writer.flush();
}
+
+ /**
+  * Command names this codec dispatches on. {@code UNKNOWN} stands in for any name that is not one of
+  * the listed constants.
+  */
+ private enum BootstrapCommand {
+     PORT, STARTED, SHUTDOWN, RELOAD, UPDATE_PROPERTIES, UPDATE_CONFIGURATION, UNKNOWN;
+
+     /**
+      * Resolves a command name to its constant without throwing.
+      *
+      * @param value command name to look up; may be {@code null}
+      * @return the constant whose name equals {@code value} exactly, or {@code UNKNOWN} when
+      *         {@code value} is {@code null} or matches no constant
+      */
+     public static BootstrapCommand safeValueOf(String value) {
+         if (value == null) {
+             // valueOf(null) throws NullPointerException, which the catch below does not cover.
+             return UNKNOWN;
+         }
+         try {
+             return BootstrapCommand.valueOf(value);
+         } catch (IllegalArgumentException e) {
+             return UNKNOWN;
+         }
+     }
+ }
}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
index 67f57b4948..ee94602aea 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.minifi.bootstrap.service;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.STATUS_FILE_PID_KEY;
import static org.apache.nifi.minifi.bootstrap.SensitiveProperty.SENSITIVE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
import java.io.File;
import java.io.FileInputStream;
@@ -93,7 +94,7 @@ public class BootstrapFileProvider {
return reloadFile;
}
- public File getSwapFile() {
+ public File getConfigYmlSwapFile() {
File confDir = bootstrapConfigFile.getParentFile();
File swapFile = new File(confDir, "swap.yml");
@@ -101,6 +102,22 @@ public class BootstrapFileProvider {
return swapFile;
}
+ /**
+  * Locates the swap copy of the bootstrap configuration, which sits next to the
+  * bootstrap configuration file under the name {@code bootstrap-swap.conf}.
+  *
+  * @return the swap file handle; this method does not check whether the file exists
+  */
+ public File getBootstrapConfSwapFile() {
+     File bootstrapSwapFile = new File(bootstrapConfigFile.getParentFile(), "bootstrap-swap.conf");
+     LOGGER.debug("Bootstrap Swap File: {}", bootstrapSwapFile);
+     return bootstrapSwapFile;
+ }
+ }
+
+ /**
+  * Locates the updated bootstrap configuration, which sits next to the bootstrap
+  * configuration file under the name given by {@code BOOTSTRAP_UPDATED_FILE_NAME}.
+  *
+  * @return the updated-file handle; this method does not check whether the file exists
+  */
+ public File getBootstrapConfNewFile() {
+     File updatedBootstrapFile = new File(bootstrapConfigFile.getParentFile(), BOOTSTRAP_UPDATED_FILE_NAME);
+     LOGGER.debug("Bootstrap new File: {}", updatedBootstrapFile);
+     return updatedBootstrapFile;
+ }
+
public Properties getBootstrapProperties() throws IOException {
if (!bootstrapConfigFile.exists()) {
throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath());
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
index c0343c9f5e..7800d4b28a 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
@@ -61,7 +61,7 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
- File swapConfigFile = bootstrapFileProvider.getSwapFile();
+ File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
index e9a8eaff64..90f90201cd 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.util.LimitingInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,11 @@ public class MiNiFiListener {
private Listener listener;
private ServerSocket serverSocket;
- public int start(RunMiNiFi runner) throws IOException {
+ public int start(RunMiNiFi runner, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
- listener = new Listener(serverSocket, runner);
+ listener = new Listener(serverSocket, new BootstrapCodec(runner, bootstrapFileProvider, configurationChangeListener));
Thread listenThread = new Thread(listener);
listenThread.setName("MiNiFi listener");
listenThread.setDaemon(true);
@@ -64,19 +65,18 @@ public class MiNiFiListener {
private final ServerSocket serverSocket;
private final ExecutorService executor;
- private final RunMiNiFi runner;
private volatile boolean stopped = false;
+ private final BootstrapCodec codec;
- public Listener(ServerSocket serverSocket, RunMiNiFi runner) {
+ public Listener(ServerSocket serverSocket, BootstrapCodec bootstrapCodec) {
this.serverSocket = serverSocket;
+ this.codec = bootstrapCodec;
this.executor = Executors.newFixedThreadPool(2, runnable -> {
Thread t = Executors.defaultThreadFactory().newThread(runnable);
t.setDaemon(true);
t.setName("MiNiFi Bootstrap Command Listener");
return t;
});
-
- this.runner = runner;
}
public void stop() {
@@ -126,8 +126,7 @@ public class MiNiFiListener {
// which in turn may cause the Shutdown Hook to shutdown MiNiFi.
// So we will limit the amount of data to read to 4 KB
try (InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096)) {
- BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream());
- codec.communicate();
+ codec.communicate(limitingIn, socket.getOutputStream());
} catch (Exception t) {
LOGGER.error("Failed to communicate with MiNiFi due to exception: ", t);
} finally {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java
new file mode 100644
index 0000000000..a2fc043a8f
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.CONFIG_UPDATED_FILE_NAME;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles a configuration update request: reads the updated configuration file, merges
+ * it with the currently held configuration and, when the result differs from the current
+ * one, passes it to the configuration change listener.
+ */
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String FALLBACK_CONFIG_FILE_DIR = "./conf/";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    /**
+     * @param runMiNiFi source of the currently held configuration; also used to initialize the differentiator
+     * @param miNiFiConfigurationChangeListener listener that receives a configuration recognised as new
+     * @param bootstrapFileProvider provider of the bootstrap properties
+     */
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    /**
+     * Processes the updated configuration file.
+     *
+     * @return an empty {@code Optional} when the new configuration was handed to the change listener
+     *         without an exception, {@link MiNiFiCommandState#NO_OPERATION} when the differentiator
+     *         reports no change, or {@link MiNiFiCommandState#NOT_APPLIED_WITHOUT_RESTART} when any
+     *         exception occurred while handling the update
+     */
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.info("Handling configuration update");
+        MiNiFiCommandState commandState = null;
+        try (FileInputStream configFile = new FileInputStream(getConfigFilePath().toFile())) {
+            ByteBuffer readOnlyNewConfig = ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+                IOUtils.toByteArray(configFile), runMiNiFi.getConfigFileReference().get().duplicate(), bootstrapFileProvider.getBootstrapProperties());
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                miNiFiConfigurationChangeListener.handleChange(new ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+            } else {
+                logger.info("The given configuration does not contain any update. No operation required");
+                commandState = MiNiFiCommandState.NO_OPERATION;
+            }
+        } catch (Exception e) {
+            commandState = MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART;
+            logger.error("Could not handle configuration update", e);
+        }
+        return Optional.ofNullable(commandState);
+    }
+
+    /**
+     * Resolves the updated configuration file: a file named {@code CONFIG_UPDATED_FILE_NAME} in the
+     * directory of the configured properties file, or in {@code ./conf/} when that directory cannot
+     * be determined.
+     */
+    private Path getConfigFilePath() {
+        // File.getParent() carries no trailing separator, so the file name must be resolved as a
+        // child of the directory rather than appended to the directory string.
+        // NOTE(review): the component that writes this file must resolve the same location - confirm.
+        return ofNullable(safeGetPropertiesFilePath())
+            .map(File::new)
+            .map(File::getParent)
+            .map(parentDir -> new File(parentDir, CONFIG_UPDATED_FILE_NAME))
+            .orElseGet(() -> new File(FALLBACK_CONFIG_FILE_DIR + CONFIG_UPDATED_FILE_NAME))
+            .toPath();
+    }
+
+    /**
+     * Reads the properties file path from the bootstrap properties.
+     *
+     * @return the configured path, or {@code null} when it is not set or the bootstrap properties cannot be read
+     */
+    private String safeGetPropertiesFilePath() {
+        String propertyFile = null;
+        try {
+            propertyFile = bootstrapFileProvider.getBootstrapProperties().getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
+        } catch (IOException e) {
+            // Keep the cause in the log; the caller falls back to the default directory.
+            logger.error("Failed to get properties file path", e);
+        }
+        return propertyFile;
+    }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java
new file mode 100644
index 0000000000..4b3de7e89a
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.slf4j.Logger;
+
+/**
+ * Applies an updated bootstrap configuration: backs up the current bootstrap file to a swap
+ * file, replaces it with the updated file, regenerates the configuration files from the new
+ * properties and reloads the instance. When regeneration or reload fails, the previous
+ * bootstrap file is restored from the swap file.
+ */
+public class UpdatePropertiesService {
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    /**
+     * @param runner holder of the current configuration reference; also used to reload the instance
+     * @param logger logger used for all messages of this service
+     * @param bootstrapFileProvider provider of the bootstrap, swap and updated bootstrap files
+     */
+    public UpdatePropertiesService(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    /**
+     * Swaps in the updated bootstrap file and applies it.
+     *
+     * @return an empty {@code Optional} when the new properties were applied and the reload call
+     *         returned without an exception, otherwise {@link MiNiFiCommandState#NOT_APPLIED_WITHOUT_RESTART}
+     */
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        Optional<MiNiFiCommandState> commandState;
+        try {
+            File bootstrapConfigFile = BootstrapFileProvider.getBootstrapConfFile();
+
+            File bootstrapSwapConfigFile = bootstrapFileProvider.getBootstrapConfSwapFile();
+            logger.info("Persisting old bootstrap configuration to {}", bootstrapSwapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new FileInputStream(bootstrapConfigFile)) {
+                Files.copy(configFileInputStream, bootstrapSwapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+
+            Files.copy(bootstrapFileProvider.getBootstrapConfNewFile().toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+
+            // The bootstrap file was just overwritten with the updated content, so the properties
+            // loaded here are presumably the new ones (assumes the provider reads this same file).
+            commandState = generateConfigfilesBasedOnNewProperties(bootstrapConfigFile, bootstrapSwapConfigFile, bootstrapFileProvider.getBootstrapProperties());
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART);
+            logger.error("Failed to load new bootstrap properties", e);
+        }
+        return commandState;
+    }
+
+    /**
+     * Regenerates the configuration files from the given properties and reloads the instance.
+     * On failure the bootstrap file is restored from the swap file, the configuration files are
+     * regenerated from the restored properties and the swap file is deleted.
+     *
+     * @return an empty {@code Optional} on success, {@link MiNiFiCommandState#NOT_APPLIED_WITHOUT_RESTART}
+     *         when the new properties could not be applied and the previous state was restored
+     * @throws IOException if restoring the previous bootstrap file or rereading its properties fails
+     * @throws ConfigurationChangeException declared for failures while regenerating from the restored properties
+     */
+    private Optional<MiNiFiCommandState> generateConfigfilesBasedOnNewProperties(File bootstrapConfigFile, File bootstrapSwapConfigFile, Properties bootstrapProperties)
+        throws IOException, ConfigurationChangeException {
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try {
+            ByteBuffer byteBuffer = generateConfigFiles(asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()),
+                bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+            restartInstance();
+        } catch (Exception e) {
+            // Record why the update is rejected before reverting; otherwise the cause is lost.
+            logger.error("Failed to apply new bootstrap properties, reverting to the previous bootstrap configuration", e);
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART);
+            // reverting config file
+            try (FileInputStream swapConfigFileStream = new FileInputStream(bootstrapSwapConfigFile)) {
+                Files.copy(swapConfigFileStream, bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+            // read reverted properties
+            bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+            ByteBuffer byteBuffer = generateConfigFiles(
+                asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()), bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+
+            logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+            if (!bootstrapSwapConfigFile.delete()) {
+                logger.warn("The swap file ({}) failed to delete after a failed handling of a change. It should be cleaned up manually.", bootstrapSwapConfigFile);
+            }
+        }
+        return commandState;
+    }
+
+    /**
+     * Reloads the instance through the runner.
+     *
+     * @throws IOException wrapping the original failure with a message that names the restart step
+     */
+    private void restartInstance() throws IOException {
+        try {
+            runner.reload();
+        } catch (IOException e) {
+            throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+        }
+    }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 2251e6a84a..5b9d0f2a88 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -194,6 +194,12 @@ public final class ConfigTransformer {
return schema;
}
+ /**
+  * Copies the remaining bytes of the buffer into a new in-memory stream.
+  * Reading advances the position of the given buffer to its limit.
+  *
+  * @param byteBuffer the buffer to drain
+  * @return a stream over a copy of the bytes between the buffer's position and limit
+  */
+ public static ByteArrayInputStream asByteArrayInputStream(ByteBuffer byteBuffer) {
+     final int remainingCount = byteBuffer.remaining();
+     final byte[] remainingBytes = new byte[remainingCount];
+     byteBuffer.get(remainingBytes, 0, remainingCount);
+     return new ByteArrayInputStream(remainingBytes);
+ }
+
protected static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
try (FileOutputStream nifiProperties = new FileOutputStream(nifiPropertiesPath.toString())) {
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
index 366d70b9b7..81315879b8 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
@@ -30,9 +30,11 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verifyNoInteractions;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
@@ -72,6 +74,10 @@ class CommandRunnerFactoryTest {
private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
@Mock
private MiNiFiExecCommandProvider miNiFiExecCommandProvider;
+ @Mock
+ private ObjectMapper objectMapper;
+ @Mock
+ private ConfigurationChangeListener configurationChangeListener;
@InjectMocks
private CommandRunnerFactory commandRunnerFactory;
@@ -81,8 +87,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(START);
assertInstanceOf(StartRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -90,8 +95,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(RUN);
assertInstanceOf(StartRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -99,8 +103,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(STOP);
assertInstanceOf(StopRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -108,8 +111,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(ENV);
assertInstanceOf(EnvRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -117,8 +119,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(DUMP);
assertInstanceOf(DumpRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -126,8 +127,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(FLOWSTATUS);
assertInstanceOf(FlowStatusRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -135,8 +135,7 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(STATUS);
assertInstanceOf(StatusRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
@@ -144,12 +143,16 @@ class CommandRunnerFactoryTest {
CommandRunner runner = commandRunnerFactory.getRunner(RESTART);
assertInstanceOf(CompositeCommandRunner.class, runner);
- verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
- miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+ verify();
}
@Test
void testRunCommandShouldThrowIllegalArgumentExceptionInCaseOfUnknownCommand() {
assertThrows(IllegalArgumentException.class, () -> commandRunnerFactory.getRunner(UNKNOWN));
}
+
+ /**
+  * Asserts that none of the mocked collaborators was touched.
+  */
+ private void verify() {
+     verifyNoInteractions(
+         miNiFiCommandSender,
+         currentPortProvider,
+         miNiFiParameters,
+         miNiFiStatusProvider,
+         periodicStatusReporterManager,
+         bootstrapFileProvider,
+         miNiFiStdLogHandler,
+         bootstrapConfigFile,
+         runMiNiFi,
+         gracefulShutdownParameterProvider,
+         miNiFiExecCommandProvider,
+         objectMapper,
+         configurationChangeListener);
+ }
}
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
index a1f9401c09..c12869d7e9 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.Request;
import org.apache.commons.io.FileUtils;
@@ -43,7 +42,6 @@ public class WholeConfigDifferentiatorTest {
public static ByteBuffer defaultConfigBuffer;
public static ByteBuffer newConfigBuffer;
- public static Properties properties = new Properties();
public static ConfigurationFileHolder configurationFileHolder;
public static Request dummyRequest;
@@ -68,7 +66,7 @@ public class WholeConfigDifferentiatorTest {
@Test
public void TestSameInputStream() throws IOException {
Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
FileInputStream fileInputStream = new FileInputStream(defaultConfigPath.toFile());
assertFalse(differentiator.isNew(fileInputStream));
@@ -77,7 +75,7 @@ public class WholeConfigDifferentiatorTest {
@Test
public void TestNewInputStream() throws IOException {
Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
FileInputStream fileInputStream = new FileInputStream(newConfigPath.toFile());
assertTrue(differentiator.isNew(fileInputStream));
@@ -88,7 +86,7 @@ public class WholeConfigDifferentiatorTest {
@Test
public void TestSameByteBuffer() throws IOException {
Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
assertFalse(differentiator.isNew(defaultConfigBuffer));
}
@@ -96,7 +94,7 @@ public class WholeConfigDifferentiatorTest {
@Test
public void TestNewByteBuffer() throws IOException {
Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
+ differentiator.initialize(configurationFileHolder);
assertTrue(differentiator.isNew(newConfigBuffer));
}
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
index ccbe6b40a7..0ebb6c68f9 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
@@ -29,36 +29,57 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+@ExtendWith(MockitoExtension.class)
class BootstrapCodecTest {
private static final int VALID_PORT = 1;
private static final String SECRET = "secret";
private static final String OK = "OK";
private static final String EMPTY_STRING = "";
+
+ @Mock
private RunMiNiFi runner;
+ @Mock
+ private BootstrapFileProvider bootstrapFileProvider;
+ @Mock
+ private ConfigurationChangeListener configurationChangeListener;
+ @Mock
+ private UpdateConfigurationService updateConfigurationService;
+ @Mock
+ private UpdatePropertiesService updatePropertiesService;
+
+ @InjectMocks
+ private BootstrapCodec bootstrapCodec;
@BeforeEach
- void setup() {
- runner = mock(RunMiNiFi.class);
+ void setup() throws IllegalAccessException, NoSuchFieldException {
+ mockFinal("updateConfigurationService", updateConfigurationService);
+ mockFinal("updatePropertiesService", updatePropertiesService);
}
@Test
void testCommunicateShouldThrowIOExceptionIfThereIsNoCommand() {
InputStream inputStream = new ByteArrayInputStream(new byte[0]);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- assertThrows(IOException.class, bootstrapCodec::communicate);
+ assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@@ -68,9 +89,8 @@ class BootstrapCodecTest {
String unknown = "unknown";
InputStream inputStream = new ByteArrayInputStream(unknown.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- assertThrows(IOException.class, bootstrapCodec::communicate);
+ assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@@ -80,9 +100,8 @@ class BootstrapCodecTest {
String command = "PORT " + VALID_PORT + " " + SECRET;
InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- bootstrapCodec.communicate();
+ bootstrapCodec.communicate(inputStream, outputStream);
verify(runner).setMiNiFiParameters(VALID_PORT, SECRET);
assertEquals(OK, outputStream.toString().trim());
@@ -93,9 +112,8 @@ class BootstrapCodecTest {
void testCommunicateShouldFailWhenReceivesPortCommand(String command) {
InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- assertThrows(IOException.class, bootstrapCodec::communicate);
+ assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@@ -112,9 +130,8 @@ class BootstrapCodecTest {
void testCommunicateShouldFailIfStartedCommandHasOtherThanOneArg() {
InputStream inputStream = new ByteArrayInputStream("STARTED".getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- assertThrows(IOException.class, bootstrapCodec::communicate);
+ assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@@ -123,9 +140,8 @@ class BootstrapCodecTest {
void testCommunicateShouldFailIfStartedCommandFirstArgIsNotBoolean() {
InputStream inputStream = new ByteArrayInputStream("STARTED yes".getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- assertThrows(IOException.class, bootstrapCodec::communicate);
+ assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@@ -136,11 +152,10 @@ class BootstrapCodecTest {
PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ConfigurationChangeCoordinator configurationChangeCoordinator = mock(ConfigurationChangeCoordinator.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
when(runner.getConfigurationChangeCoordinator()).thenReturn(configurationChangeCoordinator);
- bootstrapCodec.communicate();
+ bootstrapCodec.communicate(inputStream, outputStream);
assertEquals(OK, outputStream.toString().trim());
verify(runner, times(2)).getPeriodicStatusReporterManager();
@@ -157,10 +172,9 @@ class BootstrapCodecTest {
PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
- bootstrapCodec.communicate();
+ bootstrapCodec.communicate(inputStream, outputStream);
assertEquals(OK, outputStream.toString().trim());
verify(runner).getPeriodicStatusReporterManager();
@@ -172,13 +186,42 @@ class BootstrapCodecTest {
void testCommunicateShouldHandleReloadCommand() throws IOException {
InputStream inputStream = new ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
- PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
- when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
- bootstrapCodec.communicate();
+ bootstrapCodec.communicate(inputStream, outputStream);
+
+ assertEquals(OK, outputStream.toString().trim());
+ }
+
+ @Test // Feeds the UPDATE_CONFIGURATION command to the codec; expects an OK reply and a handleUpdate() call on updateConfigurationService.
+ void testUpdateConfigurationCommandShouldHandleUpdateConfiguration() throws IOException {
+ InputStream inputStream = new ByteArrayInputStream("UPDATE_CONFIGURATION".getBytes(StandardCharsets.UTF_8)); // the bare command is the whole input
+ C2Operation c2Operation = new C2Operation(); // NOTE(review): c2Operation is never read after setIdentifier below - looks like leftover setup; confirm before removing
+ c2Operation.setIdentifier("id");
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ bootstrapCodec.communicate(inputStream, outputStream);
assertEquals(OK, outputStream.toString().trim()); // compares the trimmed text written to outputStream
+ verify(updateConfigurationService).handleUpdate();
+ }
+
+ @Test // Feeds the UPDATE_PROPERTIES command to the codec; expects an OK reply and a handleUpdate() call on updatePropertiesService.
+ void testUpdatePropertiesCommandShouldHandleUpdateProperties() throws IOException {
+ InputStream inputStream = new ByteArrayInputStream("UPDATE_PROPERTIES".getBytes(StandardCharsets.UTF_8)); // the bare command is the whole input
+ C2Operation c2Operation = new C2Operation(); // NOTE(review): c2Operation is never read after setIdentifier below - looks like leftover setup; confirm before removing
+ c2Operation.setIdentifier("id");
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ bootstrapCodec.communicate(inputStream, outputStream);
+
+ assertEquals(OK, outputStream.toString().trim()); // compares the trimmed text written to outputStream
+ verify(updatePropertiesService).handleUpdate();
+ }
+
+ private void mockFinal(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException { // Reflectively assigns value to the field named fieldName (declared on BootstrapCodec) of the bootstrapCodec instance.
+ Field updateConfigurationServiceField = BootstrapCodec.class.getDeclaredField(fieldName); // despite the local's name, the target is whichever field fieldName selects
+ updateConfigurationServiceField.setAccessible(true); // lifts Java access checks so a non-public field can be written
+ updateConfigurationServiceField.set(bootstrapCodec, value); // NOTE(review): the method name suggests the targets are final fields - confirm against BootstrapCodec
}
}
\ No newline at end of file
diff --git a/minifi/minifi-commons/minifi-commons-api/pom.xml b/minifi/minifi-commons/minifi-commons-api/pom.xml
new file mode 100644
index 0000000000..aadd4dc251
--- /dev/null
+++ b/minifi/minifi-commons/minifi-commons-api/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-commons</artifactId>
+ <version>1.20.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>minifi-commons-api</artifactId>
+ <packaging>jar</packaging>
+
+</project>
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
similarity index 60%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
index 1f1cb19035..451060bb82 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
@@ -15,28 +15,11 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.minifi.commons.api;
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- ASSET;
-
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
+/**
+ * Response State for Commands coming from MiNiFi.
+ * <p>
+ * NOTE(review): the meaning of each constant is not defined in this file. Judging by the names,
+ * they presumably distinguish whether a command took effect and whether a restart is involved;
+ * confirm against the code that produces and consumes these values.
+ */
+public enum MiNiFiCommandState {
+ FULLY_APPLIED, NO_OPERATION, NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED_WITH_RESTART;
}
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
similarity index 60%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
index 1f1cb19035..1bbac63869 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
@@ -15,28 +15,9 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.minifi.commons.api;
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- ASSET;
-
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
+public interface MiNiFiConstants { // Shared file-name constants; fields declared in an interface are implicitly public static final.
+ String BOOTSTRAP_UPDATED_FILE_NAME = "bootstrap-updated.conf"; // NOTE(review): presumably the file holding a pending bootstrap configuration update - confirm against its writer
+ String CONFIG_UPDATED_FILE_NAME = "config-updated.yml"; // NOTE(review): presumably the file holding a pending config.yml update - confirm against its writer
}
diff --git a/minifi/minifi-commons/pom.xml b/minifi/minifi-commons/pom.xml
index be41c74424..b6ef4b6c27 100644
--- a/minifi/minifi-commons/pom.xml
+++ b/minifi/minifi-commons/pom.xml
@@ -28,5 +28,6 @@ limitations under the License.
<modules>
<module>minifi-commons-schema</module>
<module>minifi-utils</module>
+ <module>minifi-commons-api</module>
</modules>
</project>
diff --git a/minifi/minifi-docker/pom.xml b/minifi/minifi-docker/pom.xml
index e8a8412302..10344dcbcb 100644
--- a/minifi/minifi-docker/pom.xml
+++ b/minifi/minifi-docker/pom.xml
@@ -97,6 +97,7 @@ limitations under the License.
</buildArgs>
<repository>apacheminifi</repository>
<tag>${minifi.version}</tag>
+ <tag>latest</tag>
</configuration>
</execution>
</executions>
diff --git a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
index 126a391634..2f43cfa1b7 100644
--- a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
+++ b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
@@ -126,15 +126,8 @@ c2.agent.heartbeat.period=5000
#(Optional) c2.rest.callTimeout=10 sec
#(Optional) c2.agent.identifier=123-456-789
c2.agent.class=agentClassName
-```
-3. Configure MiNiFi to recognize _config.yml_ changes
-```
-nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
-nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
-nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
-```
-4. Start MiNiFi
-5. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
+```
+3. Start MiNiFi
+4. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
**Note:** Flow definitions are class based. Each class has one flow defined for it. As a result, all the agents belonging to the same class will get the flow at update.<br>
**Note:** Compression can be turned on for C2 requests by setting `c2.request.compression=gzip`. Compression is turned off by default when the parameter is omitted, or when `c2.request.compression=none` is given. It can be beneficial to turn compression on to prevent network saturation.
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
@@ -3,6 +3,6 @@
"pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
},
{
- "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+ "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
}
]
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
@@ -3,6 +3,6 @@
"pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
},
{
- "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+ "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
}
]
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
@@ -3,6 +3,6 @@
"pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
},
{
- "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+ "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
}
]
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
index 810c920f25..71b68792bb 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
@@ -26,6 +26,11 @@ limitations under the License.
<artifactId>minifi-framework-core</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-commons-api</artifactId>
+ <version>1.20.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-framework-api</artifactId>
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java
new file mode 100644
index 0000000000..755df830c2
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.BOOLEAN_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.LONG_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.NON_NEGATIVE_INTEGER_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.PORT_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.TIME_PERIOD_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.VALID;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum MiNiFiProperties { // Catalog of MiNiFi property definitions; constructor arguments are (key, defaultValue, sensitive, modifiable, validator).
+ JAVA("java", "java", false, false, VALID),
+ RUN_AS("run.as", null, false, false, VALID),
+ LIB_DIR("lib.dir", "./lib", false, false, VALID),
+ CONF_DIR("conf.dir", "./conf", false, false, VALID),
+ GRACEFUL_SHUTDOWN_SECOND("graceful.shutdown.seconds", "20", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+ NIFI_MINIFI_CONFIG("nifi.minifi.config", "./conf/config.yml", false, true, VALID),
+ NIFI_MINIFI_SECURITY_KEYSTORE("nifi.minifi.security.keystore", null, false, false, VALID),
+ NIFI_MINIFI_SECURITY_KEYSTORE_TYPE("nifi.minifi.security.keystoreType", null, false, false, VALID),
+ NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD("nifi.minifi.security.keystorePasswd", null, true, false, VALID),
+ NIFI_MINIFI_SECURITY_KEY_PASSWD("nifi.minifi.security.keyPasswd", null, true, false, VALID),
+ NIFI_MINIFI_SECURITY_TRUSTSTORE("nifi.minifi.security.truststore", null, false, false, VALID),
+ NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE("nifi.minifi.security.truststoreType", null, false, false, VALID),
+ NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD("nifi.minifi.security.truststorePasswd", null, true, false, VALID),
+ NIFI_MINIFI_SECURITY_SSL_PROTOCOL("nifi.minifi.security.ssl.protocol", null, false, false, VALID),
+ NIFI_MINIFI_SENSITIVE_PROPS_KEY("nifi.minifi.sensitive.props.key", null, true, false, VALID),
+ NIFI_MINIFI_SENSITIVE_PROPS_ALGORITHM("nifi.minifi.sensitive.props.algorithm", null, true, false, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT("nifi.minifi.provenance.reporting.comment", null, false, true, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY("nifi.minifi.provenance.reporting.scheduling.strategy", null, false, true, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD("nifi.minifi.provenance.reporting.scheduling.period", null, false, true, TIME_PERIOD_VALIDATOR),
+ NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL("nifi.minifi.provenance.reporting.destination.url", null, false, true, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME("nifi.minifi.provenance.reporting.input.port.name", null, false, true, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL("nifi.minifi.provenance.reporting.instance.url", null, false, true, VALID),
+ NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE("nifi.minifi.provenance.reporting.batch.size", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+ NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT("nifi.minifi.provenance.reporting.communications.timeout", null, false, true, TIME_PERIOD_VALIDATOR),
+ NIFI_MINIFI_FLOW_USE_PARENT_SSL("nifi.minifi.flow.use.parent.ssl", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_RECEIVE_HTTP_PORT("nifi.minifi.notifier.ingestors.receive.http.port", null, false, true, PORT_VALIDATOR),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_HOSTNAME("nifi.minifi.notifier.ingestors.pull.http.hostname", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PORT("nifi.minifi.notifier.ingestors.pull.http.port", null, false, true, PORT_VALIDATOR),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PATH("nifi.minifi.notifier.ingestors.pull.http.path", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_QUERY("nifi.minifi.notifier.ingestors.pull.http.query", null, false, true, VALID),
+ NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PERIOD_MS("nifi.minifi.notifier.ingestors.pull.http.period.ms", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+ NIFI_MINIFI_STATUS_REPORTER_COMPONENTS("nifi.minifi.status.reporter.components", null, false, true, VALID),
+ NIFI_MINIFI_STATUS_REPORTER_LOG_QUERY("nifi.minifi.status.reporter.log.query", null, false, true, VALID),
+ NIFI_MINIFI_STATUS_REPORTER_LOG_LEVEL("nifi.minifi.status.reporter.log.level", null, false, true, VALID),
+ NIFI_MINIFI_STATUS_REPORTER_LOG_PERIOD("nifi.minifi.status.reporter.log.period", null, false, true, VALID),
+ JAVA_ARG_1("java.arg.1", null, false, true, VALID),
+ JAVA_ARG_2("java.arg.2", null, false, true, VALID),
+ JAVA_ARG_3("java.arg.3", null, false, true, VALID),
+ JAVA_ARG_4("java.arg.4", null, false, true, VALID),
+ JAVA_ARG_5("java.arg.5", null, false, true, VALID),
+ JAVA_ARG_6("java.arg.6", null, false, true, VALID),
+ JAVA_ARG_7("java.arg.7", null, false, true, VALID),
+ JAVA_ARG_8("java.arg.8", null, false, true, VALID),
+ JAVA_ARG_9("java.arg.9", null, false, true, VALID),
+ JAVA_ARG_10("java.arg.10", null, false, true, VALID),
+ JAVA_ARG_11("java.arg.11", null, false, true, VALID),
+ JAVA_ARG_12("java.arg.12", null, false, true, VALID),
+ JAVA_ARG_13("java.arg.13", null, false, true, VALID),
+ JAVA_ARG_14("java.arg.14", null, false, true, VALID),
+ C2_ENABLE("c2.enable", "false", false, true, BOOLEAN_VALIDATOR),
+ C2_REST_URL("c2.rest.url", "", false, true, VALID),
+ C2_REST_URL_ACK("c2.rest.url.ack", "", false, true, VALID),
+ C2_REST_CONNECTION_TIMEOUT("c2.rest.connectionTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
+ C2_REST_READ_TIMEOUT("c2.rest.readTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
+ C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR),
+ C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+ C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true, TIME_PERIOD_VALIDATOR),
+ C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false, true, LONG_VALIDATOR),
+ C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+ C2_CONFIG_DIRECTORY("c2.config.directory", "./conf", false, true, VALID),
+ C2_RUNTIME_MANIFEST_IDENTIFIER("c2.runtime.manifest.identifier", "", false, true, VALID),
+ C2_RUNTIME_TYPE("c2.runtime.type", "", false, true, VALID),
+ C2_AGENT_IDENTIFIER("c2.agent.identifier", null, false, true, VALID),
+ C2_FULL_HEARTBEAT("c2.full.heartbeat", "true", false, true, BOOLEAN_VALIDATOR),
+ C2_SECURITY_TRUSTSTORE_LOCATION("c2.security.truststore.location", "", false, false, VALID),
+ C2_SECURITY_TRUSTSTORE_PASSWORD("c2.security.truststore.password", "", true, false, VALID),
+ C2_SECURITY_TRUSTSTORE_TYPE("c2.security.truststore.type", "JKS", false, false, VALID),
+ C2_SECURITY_KEYSTORE_LOCATION("c2.security.keystore.location", "", false, false, VALID),
+ C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true, false, VALID),
+ C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID),
+ C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
+ C2_ASSET_DIRECTORY("c2.asset.directory", "./asset", false, true, VALID);
+ /**
+ * Every constant indexed by its {@link #getKey() key}.
+ * <p>
+ * {@code sorted()} orders by the enum's natural order, which is declaration order, so the map
+ * iterates in the order the constants are declared above. Should two constants ever share a
+ * key, the merge function {@code (x, y) -> y} keeps the later one.
+ * <p>
+ * NOTE(review): this is a public, mutable {@link LinkedHashMap}; nothing here prevents callers
+ * from modifying it.
+ */
+ public static final LinkedHashMap<String, MiNiFiProperties> PROPERTIES_BY_KEY = Arrays.stream(MiNiFiProperties.values())
+ .sorted()
+ .collect(Collectors.toMap(MiNiFiProperties::getKey, Function.identity(), (x, y) -> y, LinkedHashMap::new));
+
+ private final String key; // property name, e.g. "c2.enable"
+ private final String defaultValue; // may be null: many constants above declare no default
+ private final boolean sensitive; // true only on the password and sensitive-props entries above
+ private final boolean modifiable; // NOTE(review): presumably whether the update-properties command may change this property - confirm against its consumer
+ private final ValidatorNames validator; // name of the validation rule; the rule itself is not implemented in this enum
+
+ MiNiFiProperties(String key, String defaultValue, boolean sensitive, boolean modifiable, ValidatorNames validator) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+ this.sensitive = sensitive;
+ this.modifiable = modifiable;
+ this.validator = validator;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public boolean isSensitive() {
+ return sensitive;
+ }
+
+ public boolean isModifiable() {
+ return modifiable;
+ }
+
+ public ValidatorNames getValidator() {
+ return validator;
+ }
+ /**
+ * Identifiers of the validation rule attached to each property. Only the names are declared
+ * here; the code that interprets them is outside this enum.
+ */
+ public enum ValidatorNames {
+ VALID, BOOLEAN_VALIDATOR, LONG_VALIDATOR, NON_NEGATIVE_INTEGER_VALIDATOR, TIME_PERIOD_VALIDATOR, PORT_VALIDATOR
+ }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
deleted file mode 100644
index 71282c0ed5..0000000000
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.c2;
-
-import java.util.concurrent.TimeUnit;
-
-public class C2NiFiProperties {
-
- public static final String C2_PREFIX = "c2.";
-
- public static final String C2_ENABLE_KEY = C2_PREFIX + "enable";
- public static final String C2_AGENT_PROTOCOL_KEY = C2_PREFIX + "agent.protocol.class";
- public static final String C2_COAP_HOST_KEY = C2_PREFIX + "agent.coap.host";
- public static final String C2_COAP_PORT_KEY = C2_PREFIX + "agent.coap.port";
- public static final String C2_CONFIG_DIRECTORY_KEY = C2_PREFIX + "config.directory";
- public static final String C2_RUNTIME_MANIFEST_IDENTIFIER_KEY = C2_PREFIX + "runtime.manifest.identifier";
- public static final String C2_RUNTIME_TYPE_KEY = C2_PREFIX + "runtime.type";
- public static final String C2_REST_URL_KEY = C2_PREFIX + "rest.url";
- public static final String C2_REST_URL_ACK_KEY = C2_PREFIX + "rest.url.ack";
- public static final String C2_ROOT_CLASSES_KEY = C2_PREFIX + "root.classes";
- public static final String C2_AGENT_HEARTBEAT_PERIOD_KEY = C2_PREFIX + "agent.heartbeat.period";
- public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout";
- public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout";
- public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout";
- public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
- public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier";
- public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat";
- public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + "request.compression";
- public static final String C2_ASSET_DIRECTORY_KEY = C2_PREFIX + "asset.directory";
-
- public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions";
- public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
- public static final String C2_METRICS_METRICS_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics";
- public static final String C2_METRICS_METRICS_TYPED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.name";
- public static final String C2_METRICS_METRICS_QUEUED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.name";
- public static final String C2_METRICS_METRICS_QUEUE_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.classes";
- public static final String C2_METRICS_METRICS_TYPED_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.classes";
- public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.name";
- public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.classes";
-
- /* C2 Client Security Properties */
- private static final String C2_REST_SECURITY_BASE_KEY = C2_PREFIX + "security";
- public static final String TRUSTSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.location";
- public static final String TRUSTSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.password";
- public static final String TRUSTSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.type";
- public static final String KEYSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.location";
- public static final String KEYSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.password";
- public static final String KEYSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.type";
-
- // Defaults
- // Heartbeat period of 1 second
- public static final long C2_AGENT_DEFAULT_HEARTBEAT_PERIOD = TimeUnit.SECONDS.toMillis(1);
-
- // Connection timeout of 5 seconds
- public static final String C2_DEFAULT_CONNECTION_TIMEOUT = "5 sec";
- // Read timeout of 5 seconds
- public static final String C2_DEFAULT_READ_TIMEOUT = "5 sec";
- // Call timeout of 10 seconds
- public static final String C2_DEFAULT_CALL_TIMEOUT = "10 sec";
-
- // C2 request compression is turned off by default
- public static final String C2_REQUEST_COMPRESSION= "none";
-
- public static final String C2_ASSET_DIRECTORY = "./asset";
-}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index e280386e90..d56e0d7ce3 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -18,19 +18,48 @@
package org.apache.nifi.minifi.c2;
import static java.util.Optional.ofNullable;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_CLASS;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_IDENTIFIER;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ASSET_DIRECTORY;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_CONFIG_DIRECTORY;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_FULL_HEARTBEAT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REQUEST_COMPRESSION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CALL_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CONNECTION_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_READ_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL_ACK;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_RUNTIME_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_PASSWORD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.CONF_DIR;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.CONFIG_UPDATED_FILE_NAME;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.http.C2HttpClient;
import org.apache.nifi.c2.client.service.C2ClientService;
@@ -38,19 +67,24 @@ import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.client.service.ManifestHashProvider;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
-import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
-import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
+import org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHandler;
import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.controller.FlowController;
@@ -62,6 +96,11 @@ import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser;
import org.apache.nifi.manifest.RuntimeManifestService;
import org.apache.nifi.manifest.StandardRuntimeManifestService;
+import org.apache.nifi.minifi.c2.command.PropertiesPersister;
+import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
+import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
+import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
import org.apache.nifi.nar.ExtensionManagerHolder;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -70,28 +109,39 @@ import org.slf4j.LoggerFactory;
public class C2NifiClientService {
- private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class);
-
- private static final String DEFAULT_CONF_DIR = "./conf";
- private static final String TARGET_CONFIG_FILE = "/config-new.yml";
+ private static final Logger LOGGER = LoggerFactory.getLogger(C2NifiClientService.class);
+ private static final String TARGET_CONFIG_FILE = "/" + CONFIG_UPDATED_FILE_NAME;
private static final String ROOT_GROUP_ID = "root";
private static final Long INITIAL_DELAY = 10000L;
private static final Integer TERMINATION_WAIT = 5000;
+ private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60;
+ private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
+ private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000;
+ private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = getOperationStateMap();
+ private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000;
private final C2ClientService c2ClientService;
private final FlowController flowController;
private final String propertiesDir;
- private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ private final ScheduledThreadPoolExecutor heartbeatExecutorService = new ScheduledThreadPoolExecutor(1);
+ private final ScheduledThreadPoolExecutor bootstrapAcknowledgeExecutorService = new ScheduledThreadPoolExecutor(1);
private final ExtensionManifestParser extensionManifestParser = new JAXBExtensionManifestParser();
private final RuntimeManifestService runtimeManifestService;
private final SupportedOperationsProvider supportedOperationsProvider;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final BootstrapCommunicator bootstrapCommunicator;
+ private volatile boolean ackReceived = false;
+ private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+ private final PropertiesPersister propertiesPersister;
+ private final ObjectMapper objectMapper;
+
private final long heartbeatPeriod;
- public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
+ public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator) {
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
this.propertiesDir = niFiProperties.getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
@@ -109,58 +159,157 @@ public class C2NifiClientService {
OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory());
+ objectMapper = new ObjectMapper();
updateAssetCommandHelper.createAssetDirectory();
- C2OperationService c2OperationService = new C2OperationService(Arrays.asList(
+ this.bootstrapCommunicator = bootstrapCommunicator;
+ requestedOperationDAO = new FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), objectMapper);
+ String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file");
+ updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+ propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
+ C2OperationHandlerProvider c2OperationHandlerProvider = new C2OperationHandlerProvider(Arrays.asList(
new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider),
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider,
transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
- updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction)
+ updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction),
+ new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties)
));
- this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationService);
- this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers());
+ this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
+ this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
+ bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, (params, output) -> acknowledgeHandler(params));
}
+ /**
+ * Builds the C2 client configuration from the given properties. Every setting except the agent
+ * identifier falls back to the default value exposed by its property constant (getDefaultValue()).
+ * Timeouts and the keep-alive duration are converted to milliseconds.
+ */
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
return new C2ClientConfig.Builder()
- .agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, ""))
- .agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY))
- .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true")))
- .heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY,
- String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD))))
- .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT,
- C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS))
- .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT,
- C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS))
- .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
- C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
- .c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
- .c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
- .c2AssetDirectory(properties.getProperty(C2NiFiProperties.C2_ASSET_DIRECTORY_KEY, C2NiFiProperties.C2_ASSET_DIRECTORY))
- .confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
- .runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
- .runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))
- .c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, ""))
- .truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, ""))
- .truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, ""))
- .truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS"))
- .keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, ""))
- .keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, ""))
- .keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS"))
+ .agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(), C2_AGENT_CLASS.getDefaultValue()))
+ .agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey()))
+ .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue())))
+ .heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
+ C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
+ .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(),
+ C2_REST_CONNECTION_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+ .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(),
+ C2_REST_READ_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+ .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(),
+ C2_REST_CALL_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+ .maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
+ .keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
+ C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS))
+ .c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue()))
+ .c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue()))
+ .c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue()))
+ .confDirectory(properties.getProperty(C2_CONFIG_DIRECTORY.getKey(), C2_CONFIG_DIRECTORY.getDefaultValue()))
+ .runtimeManifestIdentifier(properties.getProperty(C2_RUNTIME_MANIFEST_IDENTIFIER.getKey(), C2_RUNTIME_MANIFEST_IDENTIFIER.getDefaultValue()))
+ .runtimeType(properties.getProperty(C2_RUNTIME_TYPE.getKey(), C2_RUNTIME_TYPE.getDefaultValue()))
+ .c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue()))
+ .truststoreFilename(properties.getProperty(C2_SECURITY_TRUSTSTORE_LOCATION.getKey(), C2_SECURITY_TRUSTSTORE_LOCATION.getDefaultValue()))
+ .truststorePassword(properties.getProperty(C2_SECURITY_TRUSTSTORE_PASSWORD.getKey(), C2_SECURITY_TRUSTSTORE_PASSWORD.getDefaultValue()))
+ .truststoreType(properties.getProperty(C2_SECURITY_TRUSTSTORE_TYPE.getKey(), C2_SECURITY_TRUSTSTORE_TYPE.getDefaultValue()))
+ .keystoreFilename(properties.getProperty(C2_SECURITY_KEYSTORE_LOCATION.getKey(), C2_SECURITY_KEYSTORE_LOCATION.getDefaultValue()))
+ .keystorePassword(properties.getProperty(C2_SECURITY_KEYSTORE_PASSWORD.getKey(), C2_SECURITY_KEYSTORE_PASSWORD.getDefaultValue()))
+ .keystoreType(properties.getProperty(C2_SECURITY_KEYSTORE_TYPE.getKey(), C2_SECURITY_KEYSTORE_TYPE.getDefaultValue()))
.build();
}
+ /**
+ * Processes any operation queue returned by the requested-operation DAO, then schedules the
+ * heartbeat at a fixed rate (first run after INITIAL_DELAY, then every heartbeatPeriod milliseconds).
+ * handleOngoingOperations() runs on the calling thread and may block while it waits for the
+ * bootstrap acknowledge.
+ */
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations();
+ heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+    /**
+     * Continues with the operations stored by the requested-operation DAO, or simply enables the
+     * heartbeat when nothing is stored. Any failure while processing the stored queue is logged and
+     * the heartbeat is enabled as well.
+     *
+     * Synchronized because it can be triggered both by acknowledgeHandler and by the acknowledge
+     * timeout task, and those must not run it in parallel.
+     */
+    private synchronized void handleOngoingOperations() {
+        Optional<OperationQueue> persistedQueue = requestedOperationDAO.load();
+        LOGGER.info("Handling ongoing operations: {}", persistedQueue);
+        if (!persistedQueue.isPresent()) {
+            c2ClientService.enableHeartbeat();
+            return;
+        }
+        try {
+            waitForAcknowledgeFromBootstrap();
+            c2ClientService.handleRequestedOperations(persistedQueue.get().getRemainingOperations());
+        } catch (Exception e) {
+            LOGGER.error("Failed to process c2 operations queue", e);
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    /**
+     * Polls the ackReceived flag every IS_ACK_RECEIVED_POLL_INTERVAL milliseconds until the bootstrap
+     * process has acknowledged, giving up with a warning once MAX_WAIT_FOR_BOOTSTRAP_ACK_MS has
+     * elapsed. If the waiting thread is interrupted, its interrupt status is restored and the wait
+     * ends immediately.
+     */
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while (!ackReceived) {
+            try {
+                Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+                // Restore the interrupt status for the caller and stop waiting: with the flag set,
+                // any further sleep() would fail immediately anyway.
+                Thread.currentThread().interrupt();
+                return;
+            }
+            currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Forwards an operation to the bootstrap process. The acknowledge flag is reset and the
+     * acknowledge timeout task is scheduled before the command is sent. The command is the operation
+     * name, followed by an underscore and the operand name when an operand is set.
+     *
+     * @param c2Operation the operation to send to bootstrap
+     * @throws UncheckedIOException if sending the command fails with an IOException
+     */
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask(c2Operation);
+            String command;
+            if (c2Operation.getOperand() == null) {
+                command = c2Operation.getOperation().name();
+            } else {
+                command = c2Operation.getOperation().name() + "_" + c2Operation.getOperand().name();
+            }
+            bootstrapCommunicator.sendCommand(command);
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Schedules a one-shot check that runs MINIFI_RESTART_TIMEOUT_SECONDS seconds from now. If no
+     * acknowledge has been received by then, the remaining operations are handled.
+     *
+     * @param c2Operation the operation awaiting acknowledge; only used in the log message
+     */
+    private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) {
+        Runnable ackTimeoutTask = () -> {
+            if (ackReceived) {
+                return;
+            }
+            LOGGER.info("Operation requiring restart is failed, and no restart/acknowledge is happened after {} seconds for {}. Handling remaining operations.",
+                MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation);
+            handleOngoingOperations();
+        };
+        bootstrapAcknowledgeExecutorService.schedule(ackTimeoutTask, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Handles the acknowledge message sent by the bootstrap process: marks the acknowledge as
+     * received, reports the resulting state of the current operation through the C2 client service
+     * and, when the reported state is NO_OPERATION or NOT_APPLIED_WITHOUT_RESTART, continues with the
+     * remaining operations.
+     *
+     * @param params message arguments; the first one must be the name of a MiNiFiCommandState constant
+     */
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> persistedQueue = requestedOperationDAO.load();
+        ackReceived = true;
+        if (!persistedQueue.isPresent()) {
+            LOGGER.error("Can not send acknowledge due to empty Operation Queue");
+            return;
+        }
+
+        C2Operation currentOperation = persistedQueue.get().getCurrentOperation();
+        C2OperationAck operationAck = new C2OperationAck();
+        operationAck.setOperationId(currentOperation.getIdentifier());
+
+        C2OperationState operationState = new C2OperationState();
+        MiNiFiCommandState commandState = MiNiFiCommandState.valueOf(params[0]);
+        operationState.setState(OPERATION_STATE_MAP.get(commandState));
+        operationAck.setOperationState(operationState);
+        c2ClientService.sendAcknowledge(operationAck);
+
+        boolean restartSkipped = commandState == MiNiFiCommandState.NO_OPERATION
+            || commandState == MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART;
+        if (restartSkipped) {
+            LOGGER.debug("No restart happened because of an error / the app was already in the desired state");
+            handleOngoingOperations();
+        }
+    }
+ /**
+ * Stops the C2 client: cancels any pending bootstrap acknowledge timeout task, then shuts down the
+ * heartbeat scheduler and waits up to TERMINATION_WAIT milliseconds for it to terminate.
+ */
public void stop() {
try {
- scheduledExecutorService.shutdown();
- scheduledExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS);
+ // Without this the acknowledge executor keeps its worker thread alive and a pending timeout
+ // task could still resume operation handling after the service has been stopped.
+ bootstrapAcknowledgeExecutorService.shutdownNow();
+ heartbeatExecutorService.shutdown();
+ heartbeatExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
- logger.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService");
+ LOGGER.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService");
}
}
@@ -215,14 +364,16 @@ public class C2NifiClientService {
}
+ /**
+ * Writes the received configuration bytes to the target config file.
+ *
+ * @param updateContent raw content of the updated configuration
+ * @return true if the file was written, false if writing failed with an IOException (which is logged)
+ */
private boolean updateFlowContent(byte[] updateContent) {
- logger.debug("Update content: \n{}", new String(updateContent, StandardCharsets.UTF_8));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Update content: \n{}", new String(updateContent, StandardCharsets.UTF_8));
+ }
Path path = getTargetConfigFile().toPath();
try {
Files.write(getTargetConfigFile().toPath(), updateContent);
- logger.info("Updated configuration was written to: {}", path);
+ LOGGER.info("Updated configuration was written to: {}", path);
return true;
} catch (IOException e) {
- logger.error("Configuration update failed. File creation was not successful targeting: {}", path, e);
+ LOGGER.error("Configuration update failed. File creation was not successful targeting: {}", path, e);
return false;
}
}
@@ -232,6 +383,15 @@ public class C2NifiClientService {
.map(File::new)
.map(File::getParent)
.map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE))
- .orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE));
+ .orElse(new File(CONF_DIR.getDefaultValue() + TARGET_CONFIG_FILE));
+ }
+
+    /**
+     * Builds the read-only lookup table that translates the command state reported by bootstrap into
+     * the operation state sent in the C2 acknowledge. Both NOT_APPLIED_* command states map to
+     * NOT_APPLIED.
+     */
+    private static Map<MiNiFiCommandState, OperationState> getOperationStateMap() {
+        Map<MiNiFiCommandState, OperationState> stateByCommandState = new HashMap<>();
+        stateByCommandState.put(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, OperationState.NOT_APPLIED);
+        stateByCommandState.put(MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, OperationState.NOT_APPLIED);
+        stateByCommandState.put(MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION);
+        stateByCommandState.put(MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED);
+        return Collections.unmodifiableMap(stateByCommandState);
     }
}
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
new file mode 100644
index 0000000000..80b1418c95
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RequestedOperationDAO implementation that keeps the operation queue in a single file
+ * (requestedOperations.data) inside the given run directory, (de)serialized with the supplied
+ * ObjectMapper.
+ */
+public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
+
+    protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+
+    private final File requestedOperationsFile;
+    private final ObjectMapper objectMapper;
+
+    /**
+     * @param runDir directory in which the operations file is stored
+     * @param objectMapper mapper used to write and read the operation queue
+     */
+    public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+        this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME);
+    }
+
+    /**
+     * Writes the operation queue to the operations file.
+     *
+     * @param operationQueue the queue to store
+     * @throws RuntimeException wrapping any exception raised while writing
+     */
+    public void save(OperationQueue operationQueue) {
+        LOGGER.info("Saving C2 operations to file");
+        LOGGER.debug("C2 Operation Queue: {}", operationQueue);
+        try {
+            objectMapper.writeValue(requestedOperationsFile, operationQueue);
+        } catch (Exception e) {
+            LOGGER.error("Failed to save requested c2 operations", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the operation queue from the operations file.
+     *
+     * @return the stored queue, or an empty Optional when the file does not exist or cannot be read
+     *         (read failures are logged, not propagated)
+     */
+    public Optional<OperationQueue> load() {
+        LOGGER.info("Reading queued c2 operations from file");
+        if (!requestedOperationsFile.exists()) {
+            LOGGER.info("There is no queued c2 operation");
+            return Optional.empty();
+        }
+        try {
+            OperationQueue queuedOperations = objectMapper.readValue(requestedOperationsFile, OperationQueue.class);
+            LOGGER.debug("Queued operations: {}", queuedOperations);
+            return Optional.of(queuedOperations);
+        } catch (Exception e) {
+            LOGGER.error("Failed to read queued operations file", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Deletes the operations file if it exists; a failed deletion is only logged.
+     */
+    public void cleanup() {
+        if (!requestedOperationsFile.exists()) {
+            return;
+        }
+        if (!requestedOperationsFile.delete()) {
+            LOGGER.error("Failed to delete requested operations file {}, it should be deleted manually", requestedOperationsFile);
+        }
+    }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java
new file mode 100644
index 0000000000..173e3fc52d
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.expression.ExpressionLanguageCompiler;
+
+/**
+ * Minimal ValidationContext used when validating agent property values. Only
+ * isExpressionLanguageSupported(String) is implemented (it always answers false); every other
+ * method throws UnsupportedOperationException.
+ */
+public class AgentPropertyValidationContext implements ValidationContext {
+
+    // ---- expression language ----
+
+    /** Expression language is reported as unsupported for every property. */
+    @Override
+    public boolean isExpressionLanguageSupported(String propertyName) {
+        return false;
+    }
+
+    @Override
+    public boolean isExpressionLanguagePresent(String value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
+        throw new UnsupportedOperationException();
+    }
+
+    // ---- properties ----
+
+    @Override
+    public PropertyValue getProperty(PropertyDescriptor descriptor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, String> getAllProperties() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PropertyValue newPropertyValue(String value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDependencySatisfied(PropertyDescriptor propertyDescriptor, Function<String, PropertyDescriptor> propertyDescriptorLookup) {
+        throw new UnsupportedOperationException();
+    }
+
+    // ---- controller services ----
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ValidationContext getControllerServiceValidationContext(ControllerService controllerService) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isValidationRequired(ControllerService service) {
+        throw new UnsupportedOperationException();
+    }
+
+    // ---- parameters ----
+
+    @Override
+    public Collection<String> getReferencedParameters(String propertyName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isParameterDefined(String parameterName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isParameterSet(String parameterName) {
+        throw new UnsupportedOperationException();
+    }
+
+    // ---- miscellaneous ----
+
+    @Override
+    public String getAnnotationData() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getProcessGroupIdentifier() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java
new file mode 100644
index 0000000000..3e68572b9b
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validates property updates requested through the C2 protocol and writes the resulting bootstrap
+ * configuration to a separate file (BOOTSTRAP_UPDATED_FILE_NAME) located next to the current
+ * bootstrap file. The current bootstrap file itself is only read.
+ */
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    /**
+     * @param updatePropertiesPropertyProvider source of the updatable properties, their current values and validators
+     * @param bootstrapConfigFileLocation location of the current bootstrap configuration file
+     */
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        // Use the parent/child constructor: concatenating getParentFile() into a string would produce
+        // "null/<name>" when the bootstrap location has no parent directory component.
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile(), BOOTSTRAP_UPDATED_FILE_NAME);
+    }
+
+    /**
+     * Validates the requested properties and, when at least one value differs from the current one,
+     * writes a copy of the bootstrap file with the requested values applied. Lines starting with
+     * "key=" or "#key=" are replaced by "key=value"; requested keys not found in the file are appended.
+     *
+     * @param propertiesToUpdate requested property names and values
+     * @return false when no requested value differs from the current one, true once the new file is written
+     * @throws IllegalArgumentException if a property is not updatable or a value fails validation
+     * @throws RuntimeException if the bootstrap file cannot be found
+     * @throws UncheckedIOException on any other I/O failure
+     */
+    public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+        int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+        if (propertyCountToUpdate == 0) {
+            return false;
+        }
+        Set<String> propertiesToUpdateKeys = new HashSet<>(propertiesToUpdate.keySet());
+
+        Set<String> updatedProperties = new HashSet<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(bootstrapFile));
+            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bootstrapNewFile, false))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String key : propertiesToUpdateKeys) {
+                    String prefix = key + EQUALS_SIGN;
+                    // A commented-out entry is replaced by an active one as well.
+                    if (line.startsWith(prefix) || line.startsWith(HASHMARK_SIGN + prefix)) {
+                        line = prefix + propertiesToUpdate.get(key);
+                        updatedProperties.add(key);
+                    }
+                }
+                bufferedWriter.write(line + System.lineSeparator());
+            }
+
+            // add new properties which has no values before
+            propertiesToUpdateKeys.removeAll(updatedProperties);
+            for (String key : propertiesToUpdateKeys) {
+                bufferedWriter.write(key + EQUALS_SIGN + propertiesToUpdate.get(key) + System.lineSeparator());
+            }
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return true;
+    }
+
+    /**
+     * Checks every requested property against the set of updatable properties and their validators.
+     *
+     * @return the number of requested properties whose value differs from the current one and is valid
+     * @throws IllegalArgumentException listing every validation error, if there is any
+     */
+    private int validateProperties(Map<String, String> propertiesToUpdate) {
+        Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+        Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
+        int propertyCountToUpdate = 0;
+        List<String> validationErrors = new ArrayList<>();
+        for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+            UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
+            if (updatableProperty == null) {
+                // String.format needs %s; the SLF4J-style {} placeholder would be emitted literally.
+                validationErrors.add(String.format("You can not update the %s property through C2 protocol", entry.getKey()));
+                continue;
+            }
+            if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
+                // A property without a resolvable validator is accepted as is.
+                if (!getValidator(updatableProperty.getValidator())
+                    .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
+                    .map(ValidationResult::isValid)
+                    .orElse(true)) {
+                    validationErrors.add(String.format("Invalid value for %s", entry.getKey()));
+                    continue;
+                }
+                propertyCountToUpdate++;
+            }
+        }
+        if (!validationErrors.isEmpty()) {
+            throw new IllegalArgumentException("The following validation errors happened during property update:\\n" + String.join("\\n", validationErrors));
+        }
+        return propertyCountToUpdate;
+    }
+
+    /**
+     * Looks up a validator by the name of a public static field of StandardValidators.
+     *
+     * @return the validator, or an empty Optional when no such field exists or it is not accessible
+     */
+    private Optional<Validator> getValidator(String validatorName) {
+        try {
+            Field validatorField = StandardValidators.class.getField(validatorName);
+            return Optional.of((Validator) validatorField.get(null));
+        } catch (NoSuchFieldException e) {
+            // "VALID" is the expected name for properties without a dedicated validator, so stay quiet.
+            if (!VALID.equals(validatorName)) {
+                LOGGER.warn("No validator present: {}", validatorName);
+            }
+        } catch (IllegalAccessException e) {
+            LOGGER.error("Illegal access of {}", validatorName);
+        }
+        return Optional.empty();
+    }
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java
new file mode 100644
index 0000000000..7850efb4db
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class UpdatableProperty implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String propertyName;
+ private final String propertyValue;
+ private final String validator;
+
+ public UpdatableProperty(String propertyName, String propertyValue, String validator) {
+ this.propertyName = propertyName;
+ this.propertyValue = propertyValue;
+ this.validator = validator;
+ }
+
+ public String getPropertyName() {
+ return propertyName;
+ }
+
+ public String getPropertyValue() {
+ return propertyValue;
+ }
+
+ public String getValidator() {
+ return validator;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UpdatableProperty that = (UpdatableProperty) o;
+ return Objects.equals(propertyName, that.propertyName) && Objects.equals(propertyValue, that.propertyValue) && Objects.equals(validator,
+ that.validator);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(propertyName, propertyValue, validator);
+ }
+
+ @Override
+ public String toString() {
+ return "UpdatableProperty{" +
+ "propertyName='" + propertyName + '\'' +
+ ", propertyValue='" + propertyValue + '\'' +
+ ", validator='" + validator + '\'' +
+ '}';
+ }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java
new file mode 100644
index 0000000000..7faece8daf
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * {@link OperandPropertiesProvider} that reports which properties can be updated, together with the values
+  * those properties currently have in the bootstrap configuration file.
+  */
+ public class UpdatePropertiesPropertyProvider implements OperandPropertiesProvider {
+
+     private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesPropertyProvider.class);
+     protected static final String AVAILABLE_PROPERTIES = "availableProperties";
+
+     private final String bootstrapConfigFileLocation;
+
+     /**
+      * @param bootstrapConfigFileLocation path of the bootstrap configuration file to read values from
+      */
+     public UpdatePropertiesPropertyProvider(String bootstrapConfigFileLocation) {
+         this.bootstrapConfigFileLocation = bootstrapConfigFileLocation;
+     }
+
+     /**
+      * Creates one {@link UpdatableProperty} for every non-sensitive, modifiable entry of {@code PROPERTIES_BY_KEY},
+      * using the value found in the bootstrap file (or {@code null} when the file has no such key).
+      *
+      * @return a single-entry map from {@link #AVAILABLE_PROPERTIES} to an unmodifiable set of those properties
+      */
+     @Override
+     public Map<String, Object> getProperties() {
+         Map<String, String> currentValues = getBootstrapProperties();
+
+         LinkedHashSet<UpdatableProperty> available = PROPERTIES_BY_KEY.values()
+             .stream()
+             .filter(candidate -> !candidate.isSensitive())
+             .filter(MiNiFiProperties::isModifiable)
+             .map(candidate -> {
+                 String key = candidate.getKey();
+                 return new UpdatableProperty(key, currentValues.get(key), candidate.getValidator().name());
+             })
+             .collect(Collectors.toCollection(LinkedHashSet::new));
+
+         return Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.unmodifiableSet(available));
+     }
+
+     /**
+      * Loads the bootstrap file into a map. A missing or unreadable file is only logged, in which case
+      * whatever was loaded up to that point (possibly nothing) is returned.
+      */
+     private Map<String, String> getBootstrapProperties() {
+         Properties loaded = new Properties();
+
+         try (FileInputStream in = new FileInputStream(new File(bootstrapConfigFileLocation))) {
+             loaded.load(in);
+         } catch (FileNotFoundException e) {
+             LOGGER.error("The bootstrap configuration file " + bootstrapConfigFileLocation + " doesn't exists", e);
+         } catch (IOException e) {
+             LOGGER.error("Failed to load properties from " + bootstrapConfigFileLocation, e);
+         }
+
+         // Everything in 'loaded' came from load(), so every key and value is a String
+         return loaded.stringPropertyNames().stream()
+             .collect(Collectors.toMap(name -> name, loaded::getProperty));
+     }
+
+ }
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
new file mode 100644
index 0000000000..962e98b2e9
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+ /**
+  * Unit tests for {@link FileBasedRequestedOperationDAO}, run against a mocked {@link ObjectMapper}
+  * and a JUnit-managed temporary directory.
+  */
+ @ExtendWith(MockitoExtension.class)
+ class FileBasedRequestedOperationDAOTest {
+
+ @Mock
+ private ObjectMapper objectMapper;
+
+ @TempDir
+ File tmpDir;
+
+ private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+ // Creates the DAO under test on top of the temporary directory and the mocked mapper
+ @BeforeEach
+ void setup() {
+ fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+ }
+
+ // save() is expected to hand the queue to ObjectMapper.writeValue together with some File
+ @Test
+ void shouldSaveRequestedOperationsToFile() throws IOException {
+ OperationQueue operationQueue = getOperationQueue();
+ fileBasedRequestedOperationDAO.save(operationQueue);
+
+ verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+ }
+
+ // save() is expected to surface a RuntimeException when writing fails
+ // NOTE(review): the stub matches anyList(), but save() is invoked with an OperationQueue here, so the
+ // stub may never match - confirm the exception really originates from it (any(OperationQueue.class)
+ // may have been intended)
+ @Test
+ void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+ doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+ assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+ }
+
+ // No file is created in the temporary directory, so load() is expected to return an empty Optional
+ @Test
+ void shouldGetReturnEmptyWhenFileDoesntExists() {
+ assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+ }
+
+ // The file exists but reading it throws, so load() is expected to return an empty Optional
+ @Test
+ void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+ new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+ doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+ assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+ }
+
+ // The file exists and the mapper returns a queue, so load() is expected to return that queue
+ @Test
+ void shouldGetRequestedOperations() throws IOException {
+ new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+ OperationQueue operationQueue = getOperationQueue();
+ when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+ assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.load());
+ }
+
+ // Builds a queue holding a current operation ("id2") and one remaining TRANSFER/DEBUG operation ("id")
+ private OperationQueue getOperationQueue() {
+ C2Operation c2Operation = new C2Operation();
+ c2Operation.setIdentifier("id");
+ c2Operation.setOperation(OperationType.TRANSFER);
+ c2Operation.setOperand(OperandType.DEBUG);
+ c2Operation.setArgs(Collections.singletonMap("key", "value"));
+
+ C2Operation currentOperation = new C2Operation();
+ currentOperation.setIdentifier("id2");
+
+ return new OperationQueue(currentOperation, Collections.singletonList(c2Operation));
+ }
+ }
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java
new file mode 100644
index 0000000000..e1d68df805
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ENABLE;
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class PropertiesPersisterTest {
+
+ private static final String EXTRA_PROPERTY_KEY = "anExtraPropertyWhichShouldNotBeModified";
+ private static final String EXTRA_PROPERTY_VALUE = "propertyValue";
+ private static final String FALSE = "false";
+ private static final String TRUE = "true";
+ private static final String BOOLEAN_VALIDATOR = "BOOLEAN_VALIDATOR";
+ private static final String UNKNOWN = "unknown";
+ private static final String VALID = "VALID";
+
+ @Mock
+ private UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+ @TempDir
+ private File tempDir;
+
+ private PropertiesPersister propertiesPersister;
+ private String bootstrapConfigFileLocation;
+ private String bootstrapNewConfigFileLocation;
+
+ @BeforeEach
+ void setup() {
+ bootstrapConfigFileLocation = tempDir.getAbsolutePath() + "/bootstrap.conf";
+ bootstrapNewConfigFileLocation = tempDir.getAbsolutePath() + "/" + BOOTSTRAP_UPDATED_FILE_NAME;
+ propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
+ }
+
+ @Test
+ void shouldPersistPropertiesThrowIllegalArgumentExceptionIfParameterContainsUnknownProperty() {
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty("propertyName",
+ EXTRA_PROPERTY_VALUE, VALID))));
+ assertThrows(IllegalArgumentException.class, () -> propertiesPersister.persistProperties(Collections.singletonMap(UNKNOWN, UNKNOWN)));
+ }
+
+ @Test
+ void shouldPersistPropertiesThrowIllegalArgumentExceptionIfParameterContainsInvalidPropertyValue() {
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(), null,
+ BOOLEAN_VALIDATOR))));
+ assertThrows(IllegalArgumentException.class, () ->
+ propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), UNKNOWN)));
+ }
+
+ @Test
+ void shouldPersistPropertiesReturnFalseIfThereIsNoNewPropertyValueInParameter() {
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+ TRUE, BOOLEAN_VALIDATOR))));
+
+ assertFalse(propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), TRUE)));
+ }
+
+ @Test
+ void shouldAddNewLinesForPropertiesWhichDidNotExistsBeforeInBootstrapConf() throws IOException {
+ new File(bootstrapConfigFileLocation).createNewFile();
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+ TRUE, BOOLEAN_VALIDATOR))));
+
+ propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+ Properties properties = readUpdatedProperties();
+ assertEquals(1, properties.stringPropertyNames().size());
+ assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+ }
+
+ @Test
+ void shouldModifyPropertiesForPreviouslyCommentedLines() {
+ writeBootstrapFile("#" + C2_ENABLE.getKey() + "=" + TRUE);
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+ TRUE, BOOLEAN_VALIDATOR))));
+
+ propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+ Properties properties = readUpdatedProperties();
+ assertEquals(2, properties.stringPropertyNames().size());
+ assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+ assertEquals(EXTRA_PROPERTY_VALUE, properties.getProperty(EXTRA_PROPERTY_KEY));
+ }
+
+ @Test
+ void shouldModifyProperties() {
+ writeBootstrapFile(C2_ENABLE.getKey() + "=" + TRUE);
+ when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+ TRUE, BOOLEAN_VALIDATOR))));
+
+ propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+ Properties properties = readUpdatedProperties();
+ assertEquals(2, properties.stringPropertyNames().size());
+ assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+ assertEquals(EXTRA_PROPERTY_VALUE, properties.getProperty(EXTRA_PROPERTY_KEY));
+ }
+
+ private void writeBootstrapFile(String property) {
+ try(BufferedWriter writer = new BufferedWriter(new FileWriter(bootstrapConfigFileLocation))) {
+ writer.write(property + System.lineSeparator());
+ writer.write(EXTRA_PROPERTY_KEY + "=" + EXTRA_PROPERTY_VALUE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Properties readUpdatedProperties() {
+ Properties props = new Properties();
+ try (FileInputStream fis = new FileInputStream(bootstrapNewConfigFileLocation)) {
+ props.load(fis);
+ } catch (Exception e) {
+ fail("Failed to read bootstrap file");
+ }
+ return props;
+ }
+}
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java
new file mode 100644
index 0000000000..6923e29734
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.GRACEFUL_SHUTDOWN_SECOND;
+import static org.apache.nifi.minifi.MiNiFiProperties.JAVA;
+import static org.apache.nifi.minifi.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+ /**
+  * Tests {@link UpdatePropertiesPropertyProvider} against a bootstrap file located in a temporary directory.
+  */
+ class UpdatePropertiesPropertyProviderTest {
+
+     @TempDir
+     private File tmpDir;
+
+     private UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+     private String bootstrapConfigFileLocation;
+
+     @BeforeEach
+     void setup() {
+         bootstrapConfigFileLocation = tmpDir + "/bootstrap.conf";
+         updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+     }
+
+     // Values stored in the bootstrap file must show up on the returned modifiable, non-sensitive properties
+     @Test
+     void shouldReturnModifiableNonSensitivePropertiesWithValues() throws IOException {
+         Properties bootstrapContent = new Properties();
+         bootstrapContent.setProperty(JAVA.getKey(), "java");
+         bootstrapContent.setProperty(GRACEFUL_SHUTDOWN_SECOND.getKey(), "20");
+         bootstrapContent.setProperty(NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(), "truststore");
+
+         try (FileOutputStream out = new FileOutputStream(bootstrapConfigFileLocation)) {
+             bootstrapContent.store(out, null);
+         }
+
+         Map<String, Object> expected = Collections.singletonMap(AVAILABLE_PROPERTIES, getUpdatableProperties(bootstrapContent));
+
+         assertEquals(expected, updatePropertiesPropertyProvider.getProperties());
+     }
+
+     // No bootstrap file is written, so every returned property is expected to carry a null value
+     @Test
+     void shouldGetReturnListWithEmptyValuesInCaseOfFileNotFoundException() {
+         Map<String, Object> expected = Collections.singletonMap(AVAILABLE_PROPERTIES, getUpdatableProperties(new Properties()));
+
+         assertEquals(expected, updatePropertiesPropertyProvider.getProperties());
+     }
+
+     // Builds the expected set the same way the provider does, taking the values from the given Properties
+     private static LinkedHashSet<UpdatableProperty> getUpdatableProperties(Properties props) {
+         return PROPERTIES_BY_KEY.values()
+             .stream()
+             .filter(candidate -> !candidate.isSensitive())
+             .filter(MiNiFiProperties::isModifiable)
+             .map(candidate -> new UpdatableProperty(candidate.getKey(), props.getProperty(candidate.getKey()), candidate.getValidator().name()))
+             .collect(Collectors.toCollection(LinkedHashSet::new));
+     }
+ }
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index b1925b6bad..57653e7773 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -161,7 +161,3 @@ java.arg.14=-Djava.awt.headless=true
#c2.security.keystore.password=
#c2.security.keystore.type=JKS
#c2.request.compression=none
-# The following ingestor configuration needs to be enabled in order to apply configuration updates coming from C2 server
-#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
-#nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
-#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
index 16aeba7871..62c0f00c17 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.nifi.headless.HeadlessNiFiServer;
import org.apache.nifi.minifi.bootstrap.BootstrapListener;
-import org.apache.nifi.minifi.c2.C2NiFiProperties;
import org.apache.nifi.minifi.c2.C2NifiClientService;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.minifi.status.StatusConfigReporter;
@@ -55,8 +54,8 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
initBootstrapListener();
initC2();
-
sendStartedStatus();
+ startHeartbeat();
}
@Override
@@ -79,16 +78,21 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
}
private void initC2() {
- if (Boolean.parseBoolean(props.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"))) {
+ if (Boolean.parseBoolean(props.getProperty(MiNiFiProperties.C2_ENABLE.getKey(), MiNiFiProperties.C2_ENABLE.getDefaultValue()))) {
logger.info("C2 enabled, creating a C2 client instance");
- c2NifiClientService = new C2NifiClientService(props, flowController);
- c2NifiClientService.start();
+ c2NifiClientService = new C2NifiClientService(props, flowController, bootstrapListener);
} else {
- logger.debug("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);
+ logger.debug("C2 Property [{}] missing or disabled: C2 client not created", MiNiFiProperties.C2_ENABLE.getKey());
c2NifiClientService = null;
}
}
+ private void startHeartbeat() {
+ if (c2NifiClientService != null) {
+ c2NifiClientService.start();
+ }
+ }
+
private void initBootstrapListener() {
String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if (bootstrapPort != null) {
@@ -101,7 +105,9 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
- } catch (NumberFormatException | IOException nfe) {
+ } catch(IOException e){
+ throw new UncheckedIOException("Failed to start MiNiFi because of Bootstrap listener initialization error", e);
+ } catch (NumberFormatException e) {
throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {