You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/30 09:14:32 UTC

[GitHub] [nifi] ferencerdei opened a new pull request, #6733: NIFI-10895 Update properties command for MiNiFi C2

ferencerdei opened a new pull request, #6733:
URL: https://github.com/apache/nifi/pull/6733

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   This change makes it possible to update bootstrap properties through the C2 protocol.
   
   [NIFI-10895](https://issues.apache.org/jira/browse/NIFI-10895)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067438327


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java:
##########
@@ -36,6 +36,8 @@ public class C2NiFiProperties {
     public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout";
     public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout";
     public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout";
+    public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX + "rest.maxIdleConnections";

Review Comment:
   Thanks that resolves a ton of duplication. I was hoping that it is doable :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] briansolo1985 commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065810905


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +120,30 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+    }
+
+    private static boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+        return !Optional.ofNullable(c2OperationAck)

Review Comment:
   This is equivalent to
   ```suggestion
           return Optional.ofNullable(c2OperationAck)
               .map(C2OperationAck::getOperationState)
               .map(C2OperationState::getState)
               .filter(FULLY_APPLIED::equals)
               .isPresent();
   ```



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+        doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.load());
+    }
+
+    private static OperationQueue getOperationQueue() {

Review Comment:
   No need to use static here



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +120,30 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+    }
+
+    private static boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {

Review Comment:
   No need to use static here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067441497


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/" + BOOTSTRAP_UPDATED_FILE_NAME);
+    }
+
+    public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+        int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+        if (propertyCountToUpdate == 0) {
+            return false;
+        }
+        Set<String> propertiesToUpdateKeys = new HashSet<>(propertiesToUpdate.keySet());
+
+        Set<String> updatedProperties = new HashSet<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(bootstrapFile));
+            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bootstrapNewFile, false))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String key : propertiesToUpdateKeys) {
+                    String prefix = key + EQUALS_SIGN;
+                    if (line.startsWith(prefix) || line.startsWith(HASHMARK_SIGN + prefix)) {
+                        line = prefix + propertiesToUpdate.get(key);
+                        updatedProperties.add(key);
+                    }
+                }
+                bufferedWriter.write(line + System.lineSeparator());
+            }
+
+            // add new properties which has no values before
+            propertiesToUpdateKeys.removeAll(updatedProperties);
+            for (String key : propertiesToUpdateKeys) {
+                bufferedWriter.write(key + EQUALS_SIGN + propertiesToUpdate.get(key) + System.lineSeparator());
+            }
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return true;
+    }
+
+    private int validateProperties(Map<String, String> propertiesToUpdate) {
+        Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+        Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
+        int propertyCountToUpdate = 0;
+        List<String> validationErrors = new ArrayList<>();
+        for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+            UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
+            if (updatableProperty == null) {
+                validationErrors.add(String.format("You can not update the {} property through C2 protocol", entry.getKey()));
+                continue;
+            }
+            if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
+                if (!getValidator(updatableProperty.getValidator())
+                    .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
+                    .map(ValidationResult::isValid)
+                    .orElse(true)) {
+                    validationErrors.add(String.format("Invalid value for %s", entry.getKey()));
+                    continue;
+                }
+                propertyCountToUpdate++;
+            }
+        }
+        if (!validationErrors.isEmpty()) {
+            throw new IllegalArgumentException("The following validation errors happened during property update:\\n" + validationErrors.stream().collect(Collectors.joining("\\n")));

Review Comment:
   I think this could be a simple 
   ```
   String.join("\\n", validationErrors)
   ```
   what do you think?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/bootstrap-updated.conf");
+    }
+
+    public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+        int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+        if (propertyCountToUpdate == 0) {
+            return false;
+        }
+        Set<String> propertiesToUpdateKeys = new HashSet<>(propertiesToUpdate.keySet());
+
+        Set<String> updatedProperties = new HashSet<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(bootstrapFile));
+            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bootstrapNewFile, false))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String key : propertiesToUpdateKeys) {
+                    String prefix = key + EQUALS_SIGN;
+                    if (line.startsWith(prefix) || line.startsWith(HASHMARK_SIGN + prefix)) {
+                        line = prefix + propertiesToUpdate.get(key);
+                        updatedProperties.add(key);
+                    }
+                }
+                bufferedWriter.write(line + System.lineSeparator());
+            }
+
+            // add new properties which has no values before
+            propertiesToUpdateKeys.removeAll(updatedProperties);
+            for (String key : propertiesToUpdateKeys) {
+                bufferedWriter.write(key + EQUALS_SIGN + propertiesToUpdate.get(key) + System.lineSeparator());
+            }
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return true;
+    }
+
+    private int validateProperties(Map<String, String> propertiesToUpdate) {
+        Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+        Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
+        int propertyCountToUpdate = 0;
+        for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+            UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
+            if (updatableProperty == null) {
+                throw new IllegalArgumentException("You can not update the requested property through C2 protocol");
+            }
+            if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
+                if (!getValidator(updatableProperty.getValidator())
+                    .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
+                    .map(ValidationResult::isValid)
+                    .orElse(true)) {
+                    throw new IllegalArgumentException(String.format("Invalid value for %s", entry.getKey()));

Review Comment:
   Thanks for the changes it will be really helpful if it comes to that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067429017


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {

Review Comment:
   ok, works for me I just thought if it is a queue and we treat it as a queue and we use it as a queue we should use queue :) but you are right list works as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067434531


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +120,30 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+    }
+
+    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+        return Optional.ofNullable(c2OperationAck)
+            .map(C2OperationAck::getOperationState)
+            .map(C2OperationState::getState)
+            .filter(FULLY_APPLIED::equals)
+            .isPresent();
+    }
+
+    private boolean initiateRestart(LinkedList<C2Operation> lRequestedOperations, C2Operation requestedOperation) {
+        try {
+            disableHeartbeat();
+            requestedOperationDAO.save(new OperationQueue(requestedOperation, lRequestedOperations));

Review Comment:
   Can we rename the variable similarly to earlier? Just lose the magic "l" prefix :)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties bootstrapProperties, String confDir, Proc
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, bootstrapProperties);
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+        DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file");
+        if (!swapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
+    private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()), confDir, bootstrapFileProvider.getBootstrapProperties());
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        if (!bootstrapSwapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");

Review Comment:
   ok, thanks



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.Optional;
+
+/**
+ * The purpose of this interface is to be able to persist operations between MiNiFi restarts.

Review Comment:
   I know the 2 things are bound but if we can let's not reference MiNiFi in the c2 upper level module



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067426303


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);

Review Comment:
   Ok thanks it is unfortunate that we can't get rid of it fully :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067432784


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java:
##########
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
      */
     Map<String, Object> getProperties();
 
+    /**
+     * Determines if the given operation requires to restart the MiNiFi process
+     * @return true if it requires restart, false otherwise
+     */
+    default boolean requiresRestart() {

Review Comment:
   Yeah so by client I mean any implementation of the c2 functionality that is not the NiFi one. As you stated this current implementation while we try to keep it "clean" makes assumptions about how NiFi works. Anyway this is the only implementation / usage at the moment so we can surely live with it just wanted to discuss.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] ferencerdei commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
ferencerdei commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067823906


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +120,30 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+    }
+
+    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+        return Optional.ofNullable(c2OperationAck)
+            .map(C2OperationAck::getOperationState)
+            .map(C2OperationState::getState)
+            .filter(FULLY_APPLIED::equals)
+            .isPresent();
+    }
+
+    private boolean initiateRestart(LinkedList<C2Operation> lRequestedOperations, C2Operation requestedOperation) {
+        try {
+            disableHeartbeat();
+            requestedOperationDAO.save(new OperationQueue(requestedOperation, lRequestedOperations));

Review Comment:
   sure, I missed this one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] briansolo1985 commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1061435071


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));

Review Comment:
   We shouldn't serialize c2Operation and pass it to sendCommand, as it's not used anymore.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)
+                .map(C2OperationAck::getOperationState)
+                .map(C2OperationState::getState)
+                .filter(state -> !FULLY_APPLIED.equals(state))
+                .isPresent();
+    }
+
+    private Optional<C2OperationAck> handleOperation(C2OperationHandler c2OperationHandler, C2Operation requestedOperation, List<C2Operation> requestedOperations) {
+        C2OperationAck c2OperationAck = c2OperationHandler.handle(requestedOperation);
+        if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+            handleRestartableOperation(requestedOperations, requestedOperation);
+            return Optional.empty();
+        } else {
+            return Optional.of(c2OperationAck);
+        }
+    }
+
+    private void handleRestartableOperation(List<C2Operation> remainingOperations, C2Operation requestedOperation) {
+        // need to stop heartbeats till the restart happens
+        heartbeatLocked = true;

Review Comment:
   We should introduce a disableHeartbeat method so it would be a consistent notation
   (hearbeatLocked = false; is enableHeartbeat everywhere)



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {

Review Comment:
   Can we make a log here that this means that bootstrap did not restart the process as the bootstrap side oepration has failed?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   Do we have any safety valve here to prevent bootstrap to accidentally restart MiNiFi even after the timeout? 



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -109,17 +131,25 @@ public C2NifiClientService(final NiFiProperties niFiProperties, final FlowContro
         OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
         TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
         UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory());
+        objectMapper = new ObjectMapper();
         updateAssetCommandHelper.createAssetDirectory();
-        C2OperationService c2OperationService = new C2OperationService(Arrays.asList(
+        this.bootstrapCommunicator = bootstrapCommunicator;
+        requestedOperationDAO = new FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), objectMapper);
+        String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file");
+        updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+        propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
+        C2OperationHandlerProvider c2OperationHandlerProvider = new C2OperationHandlerProvider(Arrays.asList(
             new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider),
             new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
             TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider,
                 transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
             UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
-                updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction)
+                updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction),
+            new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties)
         ));
-        this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationService);
-        this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers());
+        this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
+        this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
+        bootstrapCommunicator.registerMessageHandler("ACKNOWLEDGE_OPERATION", (params, output) -> acknowledgeHandler(params));

Review Comment:
   Can we move "ACKNOWLEDGE_OPERATION" to constant please?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");

Review Comment:
   ```suggestion
               String command = Optional.ofNullable(c2Operation.getOperand())
                   .map(operand -> c2Operation.getOperation().name() + "_" + operand.name())
                   .orElse(c2Operation.getOperation().name());
   ```
   But it's a matter of taste :)



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)
+                .map(C2OperationAck::getOperationState)
+                .map(C2OperationState::getState)
+                .filter(state -> !FULLY_APPLIED.equals(state))
+                .isPresent();
+    }
+
+    private Optional<C2OperationAck> handleOperation(C2OperationHandler c2OperationHandler, C2Operation requestedOperation, List<C2Operation> requestedOperations) {
+        C2OperationAck c2OperationAck = c2OperationHandler.handle(requestedOperation);
+        if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+            handleRestartableOperation(requestedOperations, requestedOperation);
+            return Optional.empty();
+        } else {
+            return Optional.of(c2OperationAck);
+        }
+    }
+
+    private void handleRestartableOperation(List<C2Operation> remainingOperations, C2Operation requestedOperation) {
+        // need to stop heartbeats till the restart happens
+        heartbeatLocked = true;
+
+        try {
+            requestedOperationDAO.save(new OperationQueue(requestedOperation, remainingOperations));
+            c2OperationRegister.accept(requestedOperation);
+        } catch (Exception e) {
+            if (remainingOperations.isEmpty()) {
+                enableHeartbeat();
+            } else {
+                // reset saved queue and continue with remaining operations
+                requestedOperationDAO.reset();
+                handleRequestedOperations(remainingOperations);

Review Comment:
   Maybe I'm missing something but recursive method calling is unnecessary here. By  handling the error in the caller method within the loop we could achieve the same result without the need of recursion, and this part would become more comprehensible (also applied some refactors to flatten the code)
   ```suggestion
       public void handleRequestedOperations(List<C2Operation> requestedOperations) {
           LinkedList<C2Operation> c2Operations = new LinkedList<>(requestedOperations);
           C2Operation requestedOperation;
           while ((requestedOperation = c2Operations.poll()) != null) {
               Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);
               if (!c2OperationHandler.isPresent()) {
                   logger.warn("No handler found for {} {} operation", requestedOperation.getOperation(), requestedOperation.getOperand());
                   continue;
               }
               C2OperationHandler operationHandler = c2OperationHandler.get();
               C2OperationAck c2OperationAck = operationHandler.handle(requestedOperation);
               if (requiresRestart(operationHandler, c2OperationAck) && initiateRestart(c2Operations, requestedOperation)) {
                   return;
               }
               sendAcknowledge(c2OperationAck);
           }
           enableHeartbeat();
           requestedOperationDAO.reset();
       }
   
       private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
           return c2OperationHandler.requiresRestart()
               && !Optional.ofNullable(c2OperationAck)
                   .map(C2OperationAck::getOperationState)
                   .map(C2OperationState::getState)
                   .filter(state -> !FULLY_APPLIED.equals(state))
                   .isPresent();
       }
   
       private boolean initiateRestart(LinkedList<C2Operation> lRequestedOperations, C2Operation requestedOperation) {
           try {
               // need to stop heartbeats till the restart happens
               heartbeatLocked = true;
               requestedOperationDAO.save(new OperationQueue(requestedOperation, lRequestedOperations));
               c2OperationRegister.accept(requestedOperation);
               return true;
           } catch (Exception e) {
               logger.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
               enableHeartbeat();
               // reset saved queue and continue with remaining operations
               requestedOperationDAO.reset();
           }
           return false;
       }
   ```



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);

Review Comment:
   We should reduce the scope of the try catch block to the else block only.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;

Review Comment:
   We should rename the member according to it's type



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();

Review Comment:
   We could change order here, and pass the command string to registerAcknowledgeTimeoutTask and log it there in case of timeout. That could help debugging as we could see which command's ack has timed out without scrolling the logs back.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;

Review Comment:
   This could be a constant or should be defined outside the loop



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);

Review Comment:
   Can we add that "Operation requiring restart is failed, and no restart is happened"?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {

Review Comment:
   At this point if we can't load the operationQueue it should be considered an error right? Should we log it at least?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {
+                handleOngoingOperations(optionalOperationQueue);

Review Comment:
   As second thought state != OperationState.FULLY_APPLIED can also mean that the restart happened but Minifi was unable to start and bootstrap had to revert the previous state, right?
   In this case can we distinguish between the states?
   We only need to call handleOngoingOperations when bootstrap did not restart minifi.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }

Review Comment:
   We should log the ack has been successfully received



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.Optional;
+
+public interface RequestedOperationDAO {
+
+    /**
+     * Persist the given requested operation list
+     * @param operationQueue the queue containing the current and remaining operations
+     */
+    void save(OperationQueue operationQueue);
+
+    /**
+     * Returns the saved Operations
+     *
+     * @return the C2 Operations queue with the actual operation
+     */
+    Optional<OperationQueue> get();

Review Comment:
   Minor but maybe we should call this load() the other direction of save



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {

Review Comment:
   I think we can get rid of the synchronized block if call this method from acknowledgeHandler only if a restart has not happened.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] ferencerdei commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
ferencerdei commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1062251850


##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+    protected static final String ID = "id";

Review Comment:
   no reason, changed them.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)

Review Comment:
   isEmpty is added only in java11, I'm not sure if we are ready to use it, but I moved it to a separate method.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java:
##########
@@ -36,6 +36,8 @@ public class C2NiFiProperties {
     public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout";
     public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout";
     public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout";
+    public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX + "rest.maxIdleConnections";

Review Comment:
   Merged them.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   This method was also added because of unexpected communication issues between bootstrap and the minifi process. I'm not sure if it's needed to apply more defensive programming here (but I'm open if you have any idea).



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {
+                handleOngoingOperations(optionalOperationQueue);

Review Comment:
   I can add more state to MiNiFiCommandState so we can handle these differences.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java:
##########
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
      */
     Map<String, Object> getProperties();
 
+    /**
+     * Determines if the given operation requires to restart the MiNiFi process
+     * @return true if it requires restart, false otherwise
+     */
+    default boolean requiresRestart() {

Review Comment:
   By client side what do you mean? It really depends on what the given handler modifies. Eg.: if a configuration file is modified it can't be applied without restarting. It also requires some internal Nifi knowledge about how bootstrapping works. For me, this was the most straightforward way to handle this, so if we add new operations we don't need to modify other parts of the code.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String UPDATED_CONFIG_FILE_NAME = "config-updated.yml";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.debug("Handling configuration update");
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try (FileInputStream configFile = new FileInputStream(getConfigFilePath().toFile())) {
+            ByteBuffer readOnlyNewConfig = ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+                    IOUtils.toByteArray(configFile), runMiNiFi.getConfigFileReference().get().duplicate(), bootstrapFileProvider.getBootstrapProperties());
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                miNiFiConfigurationChangeListener.handleChange(new ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+            } else {
+                logger.info("The given configuration does not contain any update. No operation required");
+                commandState = Optional.of(MiNiFiCommandState.NO_OPERATION);
+            }
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+            logger.error("Could not handle configuration update", e);
+        }
+        return commandState;
+    }
+
+    private Path getConfigFilePath() {
+        return ofNullable(safeGetPropertiesFilePath())
+            .map(File::new)
+            .map(File::getParent)
+            .map(parentDir -> new File(parentDir + UPDATED_CONFIG_FILE_NAME))
+            .orElse(new File("./conf/" + UPDATED_CONFIG_FILE_NAME)).toPath();

Review Comment:
   It's just a fallback here, I've extracted it to a static variable.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {

Review Comment:
   I think priorityque would not add any value here. LinkedList also implements the queue methods, and we use only that. Therefore I would keep this as it is if you don't mind.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {

Review Comment:
   added a comment



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);
+            if (c2OperationHandler.isPresent()) {
+                Optional<C2OperationAck> c2OperationAck = handleOperation(c2OperationHandler.get(), requestedOperation, lRequestedOperations);
+                if (c2OperationAck.isPresent()) {
+                    sendAcknowledge(c2OperationAck.get());
+                } else {
+                    return;

Review Comment:
   Added a comment



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties bootstrapProperties, String confDir, Proc
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, bootstrapProperties);
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+        DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file");
+        if (!swapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
+    private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()), confDir, bootstrapFileProvider.getBootstrapProperties());
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        if (!bootstrapSwapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");

Review Comment:
   added the swap file types into the logs. The 2 methods revertFlowConfig and revertBootstrapConfig are slightly different so extracting couply lines into common method would not make it cleaner in my opinion.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+        doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.get());
+    }
+

Review Comment:
   I would keep this approach, your suggestion would mean that we use the real implementation of the objectmapper what I would avoid.. 



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);

Review Comment:
   Moved the logging, but need to keep the isPresent because of the "return"



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }

Review Comment:
   It's logged in the acknowledge handler so not needed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba closed pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba closed pull request #6733: NIFI-10895 Update properties command for MiNiFi C2
URL: https://github.com/apache/nifi/pull/6733


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] ferencerdei commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
ferencerdei commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067820986


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+
+public class OperationQueue implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private C2Operation currentOperation;
+    private List<C2Operation> remainingOperations;
+
+    public OperationQueue() {
+    }
+
+    public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) {
+        this.currentOperation = currentOperation;
+        this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations;
+    }
+
+    public C2Operation getCurrentOperation() {
+        return currentOperation;
+    }
+
+    public List<C2Operation> getRemainingOperations() {
+        return remainingOperations;
+    }
+
+    private void setCurrentOperation(C2Operation currentOperation) {

Review Comment:
   Sorry, incorrect idea settings and I missed it. Changed to public



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067447139


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+
+public class OperationQueue implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private C2Operation currentOperation;
+    private List<C2Operation> remainingOperations;
+
+    public OperationQueue() {
+    }
+
+    public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) {
+        this.currentOperation = currentOperation;
+        this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations;
+    }
+
+    public C2Operation getCurrentOperation() {
+        return currentOperation;
+    }
+
+    public List<C2Operation> getRemainingOperations() {
+        return remainingOperations;
+    }
+
+    private void setCurrentOperation(C2Operation currentOperation) {

Review Comment:
   Why are these private setters needed? For serialisation? I mean they are private so I don't think they are helpful there?  I think the constructor takes care of that but maybe I'm missing something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] briansolo1985 commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809530


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {
+                handleOngoingOperations(optionalOperationQueue);

Review Comment:
   Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1061405522


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");

Review Comment:
   Maybe we could be less specific we don't know if this is restart. We could say "heartbeats are locked skipping sending for now" or something along these lines. What do you think?



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)

Review Comment:
   This could go te a dedicated function either here or on the C2OperationAck class, also you could lose the negation if you call isEmpty in the end.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);
+            if (c2OperationHandler.isPresent()) {
+                Optional<C2OperationAck> c2OperationAck = handleOperation(c2OperationHandler.get(), requestedOperation, lRequestedOperations);
+                if (c2OperationAck.isPresent()) {
+                    sendAcknowledge(c2OperationAck.get());
+                } else {
+                    return;

Review Comment:
   Can you add a little comment here or extract this limited logic to a well named function? Simply reading the code is not evident why we return in this case.



##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java:
##########
@@ -128,16 +148,110 @@ void testSendHeartbeatNotAckWhenOperationAckMissing() {
         C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
         hbResponse.setRequestedOperations(generateOperation(1));
         when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+        when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
 
         c2ClientService.sendHeartbeat(runtimeInfoWrapper);
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService).handleOperation(any());
         verify(client, times(0)).acknowledgeOperation(any());
     }
 
+    @Test
+    void shouldHeartbeatSendingNotPropagateExceptions() {
+        when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new RuntimeException());
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+    }
+
+    @Test
+    void shouldAckSendingNotPropagateExceptions() {
+        C2OperationAck operationAck = mock(C2OperationAck.class);
+        doThrow(new RuntimeException()).when(client).acknowledgeOperation(operationAck);
+
+        c2ClientService.sendAcknowledge(operationAck);
+    }
+
+    @Test
+    void shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState() {

Review Comment:
   It is a long name (or a short book :)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties bootstrapProperties, String confDir, Proc
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, bootstrapProperties);
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+        DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file");
+        if (!swapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
+    private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()), confDir, bootstrapFileProvider.getBootstrapProperties());
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        if (!bootstrapSwapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");

Review Comment:
   Don't we need to say which swap file? Maybe Logging the path as well would be helpful. All of this feels like duplication but I understand why you didn't abstract but half f this (and the previous) function should be easily extracted right?
   
   The delete swap file function from below can't be applied hetre?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java:
##########
@@ -63,45 +67,63 @@ public void communicate() throws IOException {
         }
 
         try {
-            processRequest(cmd, args);
+            processRequest(cmd, args, writer);
         } catch (InvalidCommandException exception) {
             throw new IOException("Received invalid command from MiNiFi: " + line, exception);
         }
     }
 
-    private void processRequest(String cmd, String[] args) throws InvalidCommandException, IOException {
+    private void processRequest(String cmd, String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
         switch (cmd) {
             case "PORT":
-                handlePortCommand(args);
+                handlePortCommand(args, writer);
                 break;
             case "STARTED":
-                handleStartedCommand(args);
+                handleStartedCommand(args, writer);
                 break;
             case "SHUTDOWN":
-                handleShutDownCommand();
+                handleShutDownCommand(writer);
                 break;
             case "RELOAD":
-                handleReloadCommand();
+                handleReloadCommand(writer);
+                break;
+            case "UPDATE_PROPERTIES":
+                handlePropertiesUpdateCommand(args, writer);
+                break;
+            case "UPDATE_CONFIGURATION":
+                handleUpdateConfigurationCommand(args, writer);
                 break;
             default:
                 throw new InvalidCommandException("Unknown command: " + cmd);
         }
     }
 
-    private void handleReloadCommand() throws IOException {
+    private void handleUpdateConfigurationCommand(String[] args, BufferedWriter writer) throws IOException {

Review Comment:
   args is not used here and in the other function right?



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java:
##########
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
      */
     Map<String, Object> getProperties();
 
+    /**
+     * Determines if the given operation requires to restart the MiNiFi process
+     * @return true if it requires restart, false otherwise
+     */
+    default boolean requiresRestart() {

Review Comment:
   Haven't really thought this through just a quick question. Doesn't this depend somewhat on the client implementation? With the current one it makes sense of course.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)
+                .map(C2OperationAck::getOperationState)
+                .map(C2OperationState::getState)
+                .filter(state -> !FULLY_APPLIED.equals(state))
+                .isPresent();
+    }
+
+    private Optional<C2OperationAck> handleOperation(C2OperationHandler c2OperationHandler, C2Operation requestedOperation, List<C2Operation> requestedOperations) {
+        C2OperationAck c2OperationAck = c2OperationHandler.handle(requestedOperation);
+        if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+            handleRestartableOperation(requestedOperations, requestedOperation);
+            return Optional.empty();
+        } else {
+            return Optional.of(c2OperationAck);
+        }
+    }
+
+    private void handleRestartableOperation(List<C2Operation> remainingOperations, C2Operation requestedOperation) {
+        // need to stop heartbeats till the restart happens
+        heartbeatLocked = true;
+
+        try {
+            requestedOperationDAO.save(new OperationQueue(requestedOperation, remainingOperations));
+            c2OperationRegister.accept(requestedOperation);
+        } catch (Exception e) {
+            if (remainingOperations.isEmpty()) {
+                enableHeartbeat();
+            } else {
+                // reset saved queue and continue with remaining operations
+                requestedOperationDAO.reset();

Review Comment:
   maybe cleanUp would be a more descriptive name instead of reset? 



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {

Review Comment:
   Wouldn't it be more "natural" to implement this with a PriorityQueue? It would help readability and would clean slightly this function as well (I think).



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)
+                .map(C2OperationAck::getOperationState)
+                .map(C2OperationState::getState)
+                .filter(state -> !FULLY_APPLIED.equals(state))
+                .isPresent();
+    }
+
+    private Optional<C2OperationAck> handleOperation(C2OperationHandler c2OperationHandler, C2Operation requestedOperation, List<C2Operation> requestedOperations) {
+        C2OperationAck c2OperationAck = c2OperationHandler.handle(requestedOperation);
+        if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+            handleRestartableOperation(requestedOperations, requestedOperation);
+            return Optional.empty();
+        } else {
+            return Optional.of(c2OperationAck);
+        }
+    }
+
+    private void handleRestartableOperation(List<C2Operation> remainingOperations, C2Operation requestedOperation) {
+        // need to stop heartbeats till the restart happens
+        heartbeatLocked = true;

Review Comment:
   We have enableHeartbeat function can we have a disableHeartbeat as well? With that I don't think you need the comment here with function name it is quite self explanatory.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation);

Review Comment:
   You could do the warning logging here on the service side in the get function I think it belongs there, that way you could lose the isPresent here and make the whole thing more fluent. What do you think?



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.Optional;
+
+public interface RequestedOperationDAO {

Review Comment:
   Maybe a class level comment / doc could be added to outline the purpose of the interface.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java:
##########
@@ -36,6 +36,8 @@ public class C2NiFiProperties {
     public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout";
     public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout";
     public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout";
+    public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX + "rest.maxIdleConnections";

Review Comment:
   This all feels somewhat duplicated can we utilise MiNiFiProperties here?



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java:
##########
@@ -22,32 +22,19 @@
 import java.util.Map;
 import java.util.Optional;
 import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
 import org.apache.nifi.c2.protocol.api.OperandType;
 import org.apache.nifi.c2.protocol.api.OperationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class C2OperationService {
-
-    private static final Logger logger = LoggerFactory.getLogger(C2OperationService.class);
+public class C2OperationHandlerProvider {
 
     private final Map<OperationType, Map<OperandType, C2OperationHandler>> handlerMap = new HashMap<>();
 
-    public C2OperationService(List<C2OperationHandler> handlers) {
+    public C2OperationHandlerProvider(List<C2OperationHandler> handlers) {
         for (C2OperationHandler handler : handlers) {
             handlerMap.computeIfAbsent(handler.getOperationType(), x -> new HashMap<>()).put(handler.getOperandType(), handler);
         }
     }
 
-    public Optional<C2OperationAck> handleOperation(C2Operation operation) {

Review Comment:
   Thanks, the class looks cleaner and serves a concrete purpose this way.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations);

Review Comment:
   I'm not sure whether the "l" prefix is intentional in the variable name (and stands for local?) or not but I would vote against it :) You could rename the parameter to something longer as it is used only once and simply use requestedOperations inside the function.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+        doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.get());
+    }
+

Review Comment:
   I know it is not fully unit test like but what about just calling write and get on the dao and asserting the get result against what was passed to write?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ENABLE;
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class PropertiesPersisterTest {

Review Comment:
   Nice!



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+
+public class OperationQueue implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private C2Operation currentOperation;
+    private List<C2Operation> remainingOperations;

Review Comment:
   As commented elsewhere I would use an actual queue instead of list.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesOperationHandler implements C2OperationHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class);
+
+    private final OperandPropertiesProvider operandPropertiesProvider;
+    private final Function<Map<String, String>, Boolean> persistProperties;
+
+    public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function<Map<String, String>, Boolean> persistProperties) {
+        this.operandPropertiesProvider = operandPropertiesProvider;
+        this.persistProperties = persistProperties;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return UPDATE;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return operandPropertiesProvider.getProperties();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        c2OperationAck.setOperationId(operation.getIdentifier());
+        C2OperationState operationState = new C2OperationState();
+        c2OperationAck.setOperationState(operationState);
+        try {
+            if (persistProperties.apply(operation.getArgs())) {
+                operationState.setState(FULLY_APPLIED);
+            } else {
+                LOGGER.info("Properties are already in desired state");
+                operationState.setState(NO_OPERATION);
+            }
+        } catch (IllegalArgumentException e) {
+            LOGGER.error(e.getMessage());
+            operationState.setState(NOT_APPLIED);
+            operationState.setDetails(e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Exception happened during persisting properties");

Review Comment:
   Can we log the exception as well, this way it doesn't really help the investigation.



##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+    protected static final String ID = "id";

Review Comment:
   Why are these protected?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties bootstrapProperties, String confDir, Proc
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");

Review Comment:
   Maybe we could also add here the extra info of Flow swap file exists (similarly to the bootstrap message)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiCommandState.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+public enum MiNiFiCommandState {

Review Comment:
   Can you add class level doc explaining their purpose (just to be sure it is not confused with the C2 Operation states)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java:
##########
@@ -63,45 +67,63 @@ public void communicate() throws IOException {
         }
 
         try {
-            processRequest(cmd, args);
+            processRequest(cmd, args, writer);
         } catch (InvalidCommandException exception) {
             throw new IOException("Received invalid command from MiNiFi: " + line, exception);
         }
     }
 
-    private void processRequest(String cmd, String[] args) throws InvalidCommandException, IOException {
+    private void processRequest(String cmd, String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
         switch (cmd) {
             case "PORT":
-                handlePortCommand(args);
+                handlePortCommand(args, writer);
                 break;
             case "STARTED":
-                handleStartedCommand(args);
+                handleStartedCommand(args, writer);
                 break;
             case "SHUTDOWN":
-                handleShutDownCommand();
+                handleShutDownCommand(writer);
                 break;
             case "RELOAD":
-                handleReloadCommand();
+                handleReloadCommand(writer);
+                break;
+            case "UPDATE_PROPERTIES":

Review Comment:
   What do you think about an Enum for these?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String UPDATED_CONFIG_FILE_NAME = "config-updated.yml";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.debug("Handling configuration update");

Review Comment:
   I think it is quite rare and important could be logged on info level to show what is happening.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String UPDATED_CONFIG_FILE_NAME = "config-updated.yml";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.debug("Handling configuration update");
+        Optional<MiNiFiCommandState> commandState = Optional.empty();

Review Comment:
   commandState doesn't need to be optional you can set it to null here and at return you could do Optional.ofNullable.



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java:
##########
@@ -172,13 +186,42 @@ void testCommunicateShouldHandleShutdownCommand() throws IOException {
     void testCommunicateShouldHandleReloadCommand() throws IOException {
         InputStream inputStream = new ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
 
-        PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
-        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
 
-        bootstrapCodec.communicate();
+        bootstrapCodec.communicate(inputStream, outputStream);
+
+        assertEquals(OK, outputStream.toString().trim());
+    }
+
+    @Test
+    void shouldHandleUpdateConfigurationCommand() throws IOException {

Review Comment:
   While I really like the "should" prefixed test naming for the sake of consistency we should follow the convention of the already existing tests.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String UPDATED_CONFIG_FILE_NAME = "config-updated.yml";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.debug("Handling configuration update");
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try (FileInputStream configFile = new FileInputStream(getConfigFilePath().toFile())) {
+            ByteBuffer readOnlyNewConfig = ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+                    IOUtils.toByteArray(configFile), runMiNiFi.getConfigFileReference().get().duplicate(), bootstrapFileProvider.getBootstrapProperties());
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                miNiFiConfigurationChangeListener.handleChange(new ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+            } else {
+                logger.info("The given configuration does not contain any update. No operation required");
+                commandState = Optional.of(MiNiFiCommandState.NO_OPERATION);
+            }
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+            logger.error("Could not handle configuration update", e);
+        }
+        return commandState;
+    }
+
+    private Path getConfigFilePath() {
+        return ofNullable(safeGetPropertiesFilePath())
+            .map(File::new)
+            .map(File::getParent)
+            .map(parentDir -> new File(parentDir + UPDATED_CONFIG_FILE_NAME))
+            .orElse(new File("./conf/" + UPDATED_CONFIG_FILE_NAME)).toPath();

Review Comment:
   Isn't /conf available as a constant somewhere so we wouldn't need to hard code it this deeply.



##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+    protected static final String ID = "id";
+    protected static final Map<String, String> ARGS = Collections.singletonMap("key", "value");
+    @Mock
+    private OperandPropertiesProvider operandPropertiesProvider;
+
+    @Mock
+    private Function<Map<String, String>, Boolean> persistProperties;
+
+    @InjectMocks
+    private UpdatePropertiesOperationHandler updatePropertiesOperationHandler;
+
+    @Test
+    void shouldReturnStaticSettings() {
+        assertEquals(UPDATE, updatePropertiesOperationHandler.getOperationType());
+        assertEquals(PROPERTIES, updatePropertiesOperationHandler.getOperandType());
+        assertTrue(updatePropertiesOperationHandler.requiresRestart());
+    }
+
+    @Test
+    void shouldReturnProperties() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("test", new Object());
+        when(operandPropertiesProvider.getProperties()).thenReturn(properties);
+
+        Map<String, Object> result = updatePropertiesOperationHandler.getProperties();
+
+        assertEquals(properties, result);
+    }
+
+    @Test
+    void shouldReturnAckWithFullyAppliedWhenPersistIsSuccessful() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenReturn(true);
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.FULLY_APPLIED), result);
+    }
+
+    @Test
+    void shouldReturnAckWithNoOperationWhenPersistReturnFalse() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenReturn(false);
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.NO_OPERATION), result);
+    }
+
+    @Test
+    void shouldReturnNotAppliedInCaseOfIllegalArgumentException() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenThrow(new IllegalArgumentException());
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.NOT_APPLIED), result);
+    }
+
+    @Test
+    void shouldReturnNotAppliedInCaseOfException() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenThrow(new RuntimeException());
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        C2OperationAck expected = getExpected(OperationState.NOT_APPLIED);
+        expected.getOperationState().setDetails("Failed to persist properties");
+        assertEquals(expected, result);
+    }
+    private C2OperationAck getExpected(OperationState operationState) {

Review Comment:
   An empty line would be nice.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.slf4j.Logger;
+
+public class UpdatePropertiesService {
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdatePropertiesService(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        Optional<MiNiFiCommandState> commandState;
+        try {
+            File bootstrapConfigFile = BootstrapFileProvider.getBootstrapConfFile();
+
+            File bootstrapSwapConfigFile = bootstrapFileProvider.getBootstrapConfSwapFile();
+            logger.info("Persisting old bootstrap configuration to {}", bootstrapSwapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new FileInputStream(bootstrapConfigFile)) {
+                Files.copy(configFileInputStream, bootstrapSwapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+
+            Files.copy(bootstrapFileProvider.getBootstrapConfNewFile().toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+
+            // already from new
+            commandState = generateConfigfilesBasedOnNewProperties(bootstrapConfigFile, bootstrapSwapConfigFile, bootstrapFileProvider.getBootstrapProperties());
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+            logger.error("Failed to load new bootstrap properties", e);
+        }
+        return commandState;
+    }
+
+    private Optional<MiNiFiCommandState> generateConfigfilesBasedOnNewProperties(File bootstrapConfigFile, File bootstrapSwapConfigFile, Properties bootstrapProperties)
+        throws IOException, ConfigurationChangeException {
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try {
+            ByteBuffer byteBuffer = generateConfigFiles(asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()),
+                bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+            restartInstance();
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+            // reverting config file
+            try (FileInputStream swapConfigFileStream = new FileInputStream(bootstrapSwapConfigFile)) {
+                Files.copy(swapConfigFileStream, bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+            // read reverted properties
+            bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+            ByteBuffer byteBuffer = generateConfigFiles(
+                asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()), bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+
+            logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+            if (!bootstrapSwapConfigFile.delete()) {
+                logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");

Review Comment:
   We could log the swap file location to be deleted as well.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+    protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data";
+
+    private final ObjectMapper objectMapper;
+    private final File requestedOperationsFile;
+
+    public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) {
+        this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME);
+        this.objectMapper = objectMapper;
+    }
+
+    public void save(OperationQueue operationQueue) {
+        LOGGER.info("Saving C2 operations to file");
+        LOGGER.debug("C2 Operation Queue: {}", operationQueue);
+        try {
+            objectMapper.writeValue(requestedOperationsFile, operationQueue);
+        } catch (Exception e) {
+            LOGGER.error("Failed to save requested c2 operations", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Optional<OperationQueue> get() {
+        LOGGER.info("Reading queued c2 operations from file");
+        if (requestedOperationsFile.exists()) {
+            try {
+                OperationQueue operationQueue = objectMapper.readValue(requestedOperationsFile, OperationQueue.class);
+                LOGGER.debug("Queued operations: {}", operationQueue);
+                return Optional.of(operationQueue);
+            } catch (Exception e) {
+                LOGGER.error("Failed to read queued operations file", e);
+            }
+        } else {
+            LOGGER.info("There is no queued c2 operation");
+        }
+        return Optional.empty();
+    }
+
+    public void reset() {
+        if(requestedOperationsFile.exists() && !requestedOperationsFile.delete()) {
+            LOGGER.error("Failed to delete requested operations file");

Review Comment:
   We could also log the location of the file and also say that it should be deleted manually or later operations potentially will be lost as it won't be persisted. Regardless some messaging with urgency would be needed here I think.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;

Review Comment:
   We could extract this to class level or at least outside of the loop :) Maybe it could be renamed to sleepIncrement or similar.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/bootstrap-updated.conf");

Review Comment:
   We will need a shared constant to reference "bootstrap-updated.conf"



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/bootstrap-updated.conf");
+    }
+
+    public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+        int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+        if (propertyCountToUpdate == 0) {
+            return false;
+        }
+        Set<String> propertiesToUpdateKeys = new HashSet<>(propertiesToUpdate.keySet());
+
+        Set<String> updatedProperties = new HashSet<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(bootstrapFile));
+            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bootstrapNewFile, false))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String key : propertiesToUpdateKeys) {
+                    String prefix = key + EQUALS_SIGN;
+                    if (line.startsWith(prefix) || line.startsWith(HASHMARK_SIGN + prefix)) {
+                        line = prefix + propertiesToUpdate.get(key);
+                        updatedProperties.add(key);
+                    }
+                }
+                bufferedWriter.write(line + System.lineSeparator());
+            }
+
+            // add new properties which has no values before
+            propertiesToUpdateKeys.removeAll(updatedProperties);
+            for (String key : propertiesToUpdateKeys) {
+                bufferedWriter.write(key + EQUALS_SIGN + propertiesToUpdate.get(key) + System.lineSeparator());
+            }
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return true;
+    }
+
+    private int validateProperties(Map<String, String> propertiesToUpdate) {
+        Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+        Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
+        int propertyCountToUpdate = 0;
+        for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+            UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
+            if (updatableProperty == null) {
+                throw new IllegalArgumentException("You can not update the requested property through C2 protocol");
+            }
+            if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
+                if (!getValidator(updatableProperty.getValidator())
+                    .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
+                    .map(ValidationResult::isValid)
+                    .orElse(true)) {
+                    throw new IllegalArgumentException(String.format("Invalid value for %s", entry.getKey()));

Review Comment:
   With this fail fast approach there could be multiple retries while everything is sorted out what do you think about collecting all the issues and throwing exception only at the ned with all the relevant information?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesPropertyProvider implements OperandPropertiesProvider {

Review Comment:
   nice name :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] briansolo1985 commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809117


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   This approach solves the other direction, and prevents to stuck in a forever waiting loop.
   The other direction is: ack wait loop times out and MiNiFi continues the process remainder operations. Meanwhile bootstrap acks back and starts processing the same list of operations. Even the previously omitted synchronized keyword wouldn't prevent this, just would delay the failure.
   This is highly unlikely to happen, but if this happens it will result in a very cryptic behavior. Maybe we should create another state variable `isAckTimedOut` besides `ackReceived` and fill in respectively. So in the `acknowledgeHandler` method we would check it's value, and if the operation is timed out we would simply log that the acknowledge has arrived, but we dropped it as it had already been timed out.
   Wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bejancsaba commented on a diff in pull request #6733: NIFI-10895 Update properties command for MiNiFi C2

Posted by GitBox <gi...@apache.org>.
bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067444143


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+        doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.get());
+    }
+

Review Comment:
   Maybe it is a matter of taste. I know his way it is "by the book" unit test but that way (with an actual objectmapper) it is simply a more cleaner and more readable (a better test in my mind). It gets the job done this way as well so if you prefer it I'm ok with leaving it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org