Posted to issues@nifi.apache.org by "briansolo1985 (via GitHub)" <gi...@apache.org> on 2023/06/06 10:54:21 UTC

[GitHub] [nifi] briansolo1985 opened a new pull request, #7344: NIFI-11514 Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

briansolo1985 opened a new pull request, #7344:
URL: https://github.com/apache/nifi/pull/7344

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-11514](https://issues.apache.org/jira/browse/NIFI-11514)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [x] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] ferencerdei commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "ferencerdei (via GitHub)" <gi...@apache.org>.
ferencerdei commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1263350692


##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java:
##########
@@ -93,25 +79,47 @@ public enum MiNiFiProperties {
     C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR),
     C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
     C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true, TIME_PERIOD_VALIDATOR),
-    C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false, true, LONG_VALIDATOR),
-    C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+    C2_REST_HTTP_HEADERS("c2.rest.http.headers", "", false, true, VALID),

Review Comment:
   What do you think about setting the default value to "Accept:text/json"?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),

Review Comment:
   The string concatenation breaks ProcessBuilder's behaviour: each argument needs to be a separate entry in the list, as before (addAll was used previously).
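
A minimal sketch of the point above, not the PR's code: `ProcessBuilder` hands each list element to the child process as exactly one argument, so several JVM options joined into one string would reach `java` as a single malformed option. The class and method names (`ExecCommandSketch`, `buildCommand`) and the sample arguments are hypothetical; only the system property name and the main class are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the launch command with one list entry per argument.
final class ExecCommandSketch {

    static List<String> buildCommand(String javaCommand, String classPath,
                                     List<String> additionalArgs, int listenPort) {
        List<String> cmd = new ArrayList<>();
        cmd.add(javaCommand);
        cmd.add("-classpath");
        cmd.add(classPath);
        // addAll keeps every additional JVM argument as its own entry
        cmd.addAll(additionalArgs);
        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
        cmd.add("org.apache.nifi.minifi.MiNiFi");
        // Immutable copy, comparable to what List.of would have returned
        return List.copyOf(cmd);
    }
}
```

The result can be passed straight to `new ProcessBuilder(cmd)`; the immutable copy keeps the spirit of the `List.of` refactor without merging arguments.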



##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -186,6 +202,9 @@ public long getKeepAliveDuration() {
      */
     public static class Builder {
 
+        private static final String HTTP_HEADERS_SEPARATOR = ",";

Review Comment:
   It's not uncommon for a header value to contain a comma or a colon. I'm not sure whether we need to add escaping for these or simply document that such values are not supported at the moment.
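
To illustrate the concern, here is a rough sketch (an assumption about one possible mitigation, not what the PR does): splitting each entry on the first colon only, via `split(":", 2)`, lets values such as URLs keep their colons. Commas inside a value would still be cut apart, so that case would need escaping or documentation. `HeaderParsingSketch` and `parse` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a "Name:value,Name:value" header property.
final class HeaderParsingSketch {

    static Map<String, String> parse(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return headers;
        }
        for (String entry : raw.split(",")) {
            // Limit of 2: split on the first colon only, so the value may contain colons
            String[] keyValue = entry.trim().split(":", 2);
            if (keyValue.length == 2 && !keyValue[0].isBlank()) {
                headers.put(keyValue[0].trim(), keyValue[1].trim());
            }
            // Entries without a colon (for example the tail of a value that
            // contained a comma) are silently dropped in this sketch
        }
        return headers;
    }
}
```

With this approach `Referer:http://example.com/x` survives intact, while `Accept:text/json,text/plain` is still truncated to `text/json`, which is exactly the limitation that would have to be documented.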



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException, InterruptedException {
 
         Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
         String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-        initConfigFiles(bootstrapProperties, confDir);
 
-        Process process = startMiNiFi();
+        DEFAULT_LOGGER.debug("Generating minifi.properties from bootstrap.conf");

Review Comment:
   Can we move the logging into the initConfigFile method? We could also rename the method to generateMinifiProperties if that is really all it generates.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -74,12 +71,14 @@ public class StartRunner implements CommandRunner {
     private volatile ShutdownHook shutdownHook;
     private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
     private final ConfigurationChangeListener configurationChangeListener;
+    private final MiNiFiPropertiesGenerator miNiFiPropertiesGenerator;
 
     private int listenPort;
 
     public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
-        PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
-        RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ConfigurationChangeListener configurationChangeListener) {
+                       PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters,
+                       File bootstrapConfigFile,

Review Comment:
   This parameter looks a little strange alone on this line.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = "nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", listenerHandleResult, ex);
+            return listenerHandleResult;
+        }
     }
 
-    /**
-     * Provide the mechanism by which listeners are notified
-     */
-    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) {
-        LOGGER.info("Notifying Listeners of a change");
+    private void initialize() throws IOException {
+        closeIngestors();
 
-        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            ListenerHandleResult result;
-            try {
-                listener.handleChange(new ByteBufferInputStream(newConfig.duplicate()));
-                result = new ListenerHandleResult(listener);
-            } catch (ConfigurationChangeException ex) {
-                result = new ListenerHandleResult(listener, ex);
-            }
-            listenerHandleResults.add(result);
-            LOGGER.info("Listener notification result: {}", result);
-        }
-        return listenerHandleResults;
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+        ofNullable(bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY))
+            .filter(not(String::isBlank))
+            .map(ingestors -> ingestors.split(COMMA))
+            .map(Stream::of)

Review Comment:
   you can use .flatMap(Stream::of)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java:
##########
@@ -29,59 +27,31 @@
 
 public abstract class WholeConfigDifferentiator {
 
-
-    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
-
     public static final String WHOLE_CONFIG_KEY = "Whole Config";
 
-    volatile ConfigurationFileHolder configurationFileHolder;
-
-    boolean compareInputStreamToConfigFile(InputStream inputStream) throws IOException {
-        logger.debug("Checking if change is different");
-        AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
-        ByteBuffer currentConfigFile = currentConfigFileReference.get();
-        ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
-        DataInputStream dataInputStream = new DataInputStream(inputStream);
-        try {
-            dataInputStream.readFully(byteBuffer.array());
-        } catch (EOFException e) {
-            logger.debug("New config is shorter than the current. Must be different.");
-            return true;
-        }
-        logger.debug("Read the input");
+    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
 
-        if (dataInputStream.available() != 0) {
-            return true;
-        } else {
-            return byteBuffer.compareTo(currentConfigFile) != 0;
-        }
-    }
+    protected volatile ConfigurationFileHolder configurationFileHolder;
 
     public void initialize(ConfigurationFileHolder configurationFileHolder) {
         this.configurationFileHolder = configurationFileHolder;
     }
 
-
-    public static class InputStreamInput extends WholeConfigDifferentiator implements Differentiator<InputStream> {
-        public boolean isNew(InputStream inputStream) throws IOException {
-            return compareInputStreamToConfigFile(inputStream);
-        }
-    }
-
-    public static class ByteBufferInput extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
-        public boolean isNew(ByteBuffer inputBuffer) {
-            AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
-            ByteBuffer currentConfigFile = currentConfigFileReference.get();
-            return inputBuffer.compareTo(currentConfigFile) != 0;
+    public static class ByteBufferInputDifferentiator extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+        public boolean isNew(ByteBuffer newFlowConfig) {
+            AtomicReference<ByteBuffer> currentFlowConfigReference = configurationFileHolder.getConfigFileReference();
+            ByteBuffer currentFlowConfig = currentFlowConfigReference.get();
+            logger.debug("Comparing byte buffers:\n newFlow={}\n existingFlow={}", newFlowConfig, currentFlowConfig);

Review Comment:
   This log line doesn't add any value in my opinion, so I would remove it. It currently prints:
   Comparing byte buffers:
    newFlow=java.nio.HeapByteBuffer[pos=0 lim=921 cap=921]
    existingFlow=java.nio.HeapByteBuffer[pos=0 lim=921 cap=921]
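
A small sketch of why the output above is uninformative: `ByteBuffer.toString()` reports only position, limit and capacity, so two buffers of equal length with different content log identically even though `compareTo` tells them apart. `ByteBufferLogSketch` and its methods are hypothetical names used for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper contrasting what gets logged with what gets compared.
final class ByteBufferLogSketch {

    // True when both buffers would produce the same text in a "{}" log placeholder
    static boolean sameDescription(ByteBuffer first, ByteBuffer second) {
        return first.toString().equals(second.toString());
    }

    // Same content check the differentiator in the diff relies on
    static boolean isNew(ByteBuffer candidate, ByteBuffer current) {
        return candidate.compareTo(current) != 0;
    }
}
```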



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties) {
         logger.info("HTTPS Connector added for Host [{}] and Port [{}]", https.getHost(), https.getPort());
     }
 
-    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+    private Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String differentiator) {

Review Comment:
   Even if it's possible, I would rather minimise further refactoring in this PR.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();

Review Comment:
   @exceptionfactory, could you help me out with the review here? You are more familiar with the NiFi core functionality, so I would like to double-check with you whether this is the correct way to stop ongoing processing / reload and restart the flow. Thank you in advance.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java:
##########
@@ -15,17 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.minifi.c2.provider.nifi.rest;
+package org.apache.nifi.c2.client.service.operation;
 
-import java.io.IOException;
+@FunctionalInterface
+public interface UpdateConfigurationStrategy {

Review Comment:
   Could you add some class / method level javadoc about the intention of this class?
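
One possible shape for the requested Javadoc, as a sketch only. The `update(byte[] rawFlow)` signature is taken from `DefaultUpdateConfigurationStrategy` elsewhere in this PR; the wording of the comments, including the meaning of the return value, is an assumption about the intended contract and not the text that was merged.

```java
/**
 * Strategy for applying a new flow configuration received by the C2 client.
 * Implementations decide how the running agent persists and reloads the flow.
 * (Assumed description; adjust to the actual intent.)
 */
@FunctionalInterface
interface UpdateConfigurationStrategy {

    /**
     * Attempts to replace the current flow with the given one.
     *
     * @param rawFlow the new flow definition as raw bytes
     * @return {@code true} if the flow was applied, {@code false} otherwise
     *         (assumed semantics)
     */
    boolean update(byte[] rawFlow);
}
```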



##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -321,6 +343,21 @@ public Builder connectTimeout(long connectTimeout) {
             return this;
         }
 
+        public Builder httpHeaders(String httpHeaders) {
+            this.httpHeaders = ofNullable(httpHeaders)
+                .filter(StringUtils::isNotBlank)
+                .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+                .map(Arrays::stream)
+                .orElseGet(Stream::of)

Review Comment:
   This can be simplified with .stream().flatMap(Arrays::stream)
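
The suggested simplification could look roughly like this, assuming Java 9 or later for `Optional.stream()`. `OptionalSplitSketch` and `splitTrimmed` are hypothetical names; the trimming step is added only to make the sketch self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper: null-safe split of a separated property value.
final class OptionalSplitSketch {

    static List<String> splitTrimmed(String raw, String separator) {
        return Optional.ofNullable(raw)
            .filter(value -> !value.isBlank())
            .map(value -> value.split(separator))
            // Optional<String[]> becomes a stream of zero or one arrays...
            .stream()
            // ...which is flattened into a stream of the individual entries
            .flatMap(Arrays::stream)
            .map(String::trim)
            .collect(Collectors.toList());
    }
}
```

Compared with `.map(Arrays::stream).orElseGet(Stream::of)`, the empty case needs no explicit fallback, because an empty `Optional` already yields an empty stream.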



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
     public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
     public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";
+    public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+    private static final Logger logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+    private static final String DOUBLE_QUOTES = "\"";
+    private static final String ETAG_HEADER = "ETag";
+    private static final String PROXY_AUTHORIZATION_HEADER = "Proxy-Authorization";
+    private static final String DEFAULT_PATH = "/";
+    private static final int BAD_REQUEST_STATUS_CODE = 400;
+    private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+    private static final String HTTP_HEADERS_SEPARATOR = ",";
+    private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
 
     private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
     private final AtomicReference<Integer> portReference = new AtomicReference<>();
     private final AtomicReference<String> hostReference = new AtomicReference<>();
     private final AtomicReference<String> pathReference = new AtomicReference<>();
     private final AtomicReference<String> queryReference = new AtomicReference<>();
+    private final AtomicReference<Map<String, String>> httpHeadersReference = new AtomicReference<>();
+
     private volatile Differentiator<ByteBuffer> differentiator;
     private volatile String connectionScheme;
     private volatile String lastEtag = "";
     private volatile boolean useEtag = false;
 
-    public PullHttpChangeIngestor() {
-        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
-    }
-
     @Override
     public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
         super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
 
-        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD_MILLISECONDS)));
         if (pollingPeriodMS.get() < 1) {
-            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
-        }
-
-        final String host = properties.getProperty(HOST_KEY);
-        if (host == null || host.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
-        }
-
-        final String path = properties.getProperty(PATH_KEY, "/");
-        final String query = properties.getProperty(QUERY_KEY, "");
-
-        final String portString = (String) properties.get(PORT_KEY);
-        final Integer port;
-        if (portString == null) {
-            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
-        } else {
-            port = Integer.parseInt(portString);
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer");
         }
 
-        portReference.set(port);
+        String host = ofNullable(properties.getProperty(HOST_KEY))
+            .filter(StringUtils::isNotBlank)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+        String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+        String query = properties.getProperty(QUERY_KEY, EMPTY);
+        Map<String, String> httpHeaders = ofNullable(properties.getProperty(HTTP_HEADERS))
+            .filter(StringUtils::isNotBlank)
+            .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+            .map(Arrays::stream)
+            .orElseGet(Stream::of)
+            .map(String::trim)
+            .map(header -> header.split(HTTP_HEADER_KEY_VALUE_SEPARATOR))
+            .filter(split -> split.length == 2)
+            .collect(toMap(split -> ofNullable(split[0]).map(String::trim).orElse(EMPTY), split -> ofNullable(split[1]).map(String::trim).orElse(EMPTY)));
+        logger.debug("Configured HTTP headers: {}", httpHeaders);
+
+        ofNullable(properties.get(PORT_KEY))
+            .map(rawPort -> (String) rawPort)

Review Comment:
   String.class::cast
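
For reference, a sketch of the method-reference form being suggested in place of the `rawPort -> (String) rawPort` lambda. `PortLookupSketch` and `port` are hypothetical names, and the parsing step is an assumption about what follows in the real code.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical lookup showing String.class::cast instead of a casting lambda.
final class PortLookupSketch {

    static Optional<Integer> port(Properties properties, String key) {
        return Optional.ofNullable(properties.get(key)) // Properties.get returns Object
            .map(String.class::cast)                    // same effect as (String) rawPort
            .map(Integer::parseInt);
    }
}
```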



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException, InterruptedException {
 
         Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
         String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-        initConfigFiles(bootstrapProperties, confDir);
 
-        Process process = startMiNiFi();
+        DEFAULT_LOGGER.debug("Generating minifi.properties from bootstrap.conf");
+        initConfigFile(bootstrapProperties, confDir);
+        Path flowConfigFile = Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();

Review Comment:
   Could you extract lines 128-134 into a meaningfully named method? Also, the System Admin Guide needs to be updated with this new approach. It still contains the YAML.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -150,17 +107,11 @@ java.arg.14=-Djava.awt.headless=true
 #c2.rest.connectionTimeout=5 sec
 #c2.rest.readTimeout=5 sec
 #c2.rest.callTimeout=10 sec
-## heartbeat in milliseconds
-#c2.agent.heartbeat.period=5000
-## define parameters about your agent
-#c2.agent.class=
+# Comma separated list of HTTP headers, eg: Accept:text/json
+#c2.rest.http.headers=text/json

Review Comment:
   It seems the value is incorrect here, as it doesn't contain the "Accept:" part.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
  */
 public class FileChangeIngestor implements Runnable, ChangeIngestor {
 
-    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
-    static {
-        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
-        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
 
-        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
-    }
+    static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
 
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
 
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+    private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+    private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
 
-    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
-    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
 
-    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
-    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+    private ScheduledExecutorService executorService;
 
     private Path configFilePath;
     private WatchService watchService;
     private long pollingSeconds;
-    private volatile Differentiator<ByteBuffer> differentiator;
-    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
-    private volatile ConfigurationFileHolder configurationFileHolder;
-    private volatile Properties properties;
-    private ScheduledExecutorService executorService;
 
-    protected static WatchService initializeWatcher(Path filePath) {
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        Path configFile = ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+            .filter(not(String::isBlank))
+            .map(Path::of)
+            .map(Path::toAbsolutePath)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
         try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+            this.configurationChangeNotifier = configurationChangeNotifier;
+            this.configFilePath = configFile;
+            this.pollingSeconds = ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+                .map(Long::parseLong)
+                .filter(duration -> duration > 0)
+                .map(duration -> SECONDS.convert(duration, DEFAULT_POLLING_PERIOD_UNIT))
+                .orElseThrow(() -> new IllegalArgumentException("Cannot specify a polling period with duration <=0"));
+            this.watchService = initializeWatcher(configFile);
+            this.differentiator = ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+                .filter(not(String::isBlank))
+                .map(differentiator -> ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+                    .map(Supplier::get)
+                    .orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+                .orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+            this.differentiator.initialize(configurationFileHolder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier", e);
+        }
 
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        if (Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile)) {

Review Comment:
   This should check flow.json.raw, if I'm not mistaken.
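
A minimal sketch of what such a check could look like, assuming the raw flow file sits next to the flow config file and is named by replacing the last extension with `.raw` (so `flow.json.gz` maps to `flow.json.raw`). The class, method names, and the `.raw` constant are hypothetical and only illustrate the idea; the PR's actual `RAW_EXTENSION` value is not shown in this hunk.

```java
import java.nio.file.Path;

final class RawFlowPathCheckSketch {

    // Assumption: the raw flow is stored as <base name of flow config> + ".raw".
    static final String ASSUMED_RAW_EXTENSION = ".raw";

    // Derives the sibling raw flow path; assumes the argument names a file, not a root directory.
    static Path rawFlowPathOf(Path flowConfigFile) {
        Path absolute = flowConfigFile.toAbsolutePath().normalize();
        String fileName = absolute.getFileName().toString();
        int lastDot = fileName.lastIndexOf('.');
        String baseName = lastDot > 0 ? fileName.substring(0, lastDot) : fileName;
        return absolute.resolveSibling(baseName + ASSUMED_RAW_EXTENSION);
    }

    // Rejects an ingestor config path that collides with the flow config or its raw counterpart.
    static void requireDistinctFromFlowFiles(Path ingestorConfigFile, Path flowConfigFile) {
        Path candidate = ingestorConfigFile.toAbsolutePath().normalize();
        Path flow = flowConfigFile.toAbsolutePath().normalize();
        if (candidate.equals(flow) || candidate.equals(rawFlowPathOf(flow))) {
            throw new IllegalStateException(
                "File ingestor config file must differ from the flow config file and its raw counterpart: " + candidate);
        }
    }

    public static void main(String[] args) {
        Path flow = Path.of("conf", "flow.json.gz");
        System.out.println(rawFlowPathOf(flow));
        requireDistinctFromFlowFiles(Path.of("conf", "new-flow.json"), flow);
    }
}
```

Comparing normalized absolute paths avoids treating `conf/../conf/flow.json.raw` and `conf/flow.json.raw` as different files.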



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {

Review Comment:
   Nice refactor, thanks!



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
  */
 public class FileChangeIngestor implements Runnable, ChangeIngestor {
 
-    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
-    static {
-        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
-        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
 
-        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
-    }
+    static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
 
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
 
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+    private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+    private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
 
-    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
-    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
 
-    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
-    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+    private ScheduledExecutorService executorService;
 
     private Path configFilePath;
     private WatchService watchService;
     private long pollingSeconds;
-    private volatile Differentiator<ByteBuffer> differentiator;
-    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
-    private volatile ConfigurationFileHolder configurationFileHolder;
-    private volatile Properties properties;
-    private ScheduledExecutorService executorService;
 
-    protected static WatchService initializeWatcher(Path filePath) {
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        Path configFile = ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+            .filter(not(String::isBlank))
+            .map(Path::of)
+            .map(Path::toAbsolutePath)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
         try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+            this.configurationChangeNotifier = configurationChangeNotifier;
+            this.configFilePath = configFile;
+            this.pollingSeconds = ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+                .map(Long::parseLong)
+                .filter(duration -> duration > 0)
+                .map(duration -> SECONDS.convert(duration, DEFAULT_POLLING_PERIOD_UNIT))
+                .orElseThrow(() -> new IllegalArgumentException("Cannot specify a polling period with duration <=0"));
+            this.watchService = initializeWatcher(configFile);
+            this.differentiator = ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+                .filter(not(String::isBlank))
+                .map(differentiator -> ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+                    .map(Supplier::get)
+                    .orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+                .orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+            this.differentiator.initialize(configurationFileHolder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier", e);
+        }
 
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        if (Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile)) {
+            throw new IllegalStateException("File ingestor config file (" + CONFIG_FILE_PATH_KEY
+                + ") must point to a different file than MiNiFi flow config file (" + MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey() + ")");
         }
     }
 
-    protected boolean targetChanged() {
-        boolean targetChanged;
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, runnable -> {
+            Thread notifierThread = Executors.defaultThreadFactory().newThread(runnable);
+            notifierThread.setName("File Change Notifier Thread");
+            notifierThread.setDaemon(true);
+            return notifierThread;
+        });
+        executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void run() {
+        logger.debug("Checking for a change in {}", configFilePath);
+        if (targetFileChanged()) {
+            logger.debug("Target file changed, checking if it's different than current flow");
+            try (FileInputStream flowCandidateInputStream = new FileInputStream(configFilePath.toFile())) {
+                ByteBuffer newFlowConfig = wrap(toByteArray(flowCandidateInputStream));
+                if (differentiator.isNew(newFlowConfig)) {
+                    logger.debug("Current flow and new flow is different, notifying listener");
+                    configurationChangeNotifier.notifyListeners(newFlowConfig);
+                    logger.debug("Listeners have been notified");
+                }
+            } catch (Exception e) {
+                logger.error("Could not successfully notify listeners.", e);
+            }
+        } else {
+            logger.debug("No change detected in {}", configFilePath);
+        }
+    }
 
-        Optional<WatchKey> watchKey = Optional.ofNullable(watchService.poll());
+    @Override
+    public void close() {
+        if (executorService != null) {
+            executorService.shutdownNow();
+        }
+    }
 
-        targetChanged = watchKey
+    boolean targetFileChanged() {
+        logger.debug("Attempting to acquire watch key");

Review Comment:
   Are these watch key acquisition logs important at debug level? I would move them to trace; I just tried it, and the debug log is a little bit noisy with them.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -14,115 +14,146 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.minifi.bootstrap.service;
 
+import static java.nio.ByteBuffer.wrap;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
-import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
-
-import java.io.File;
-import java.io.FileInputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.toByteArray;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Properties;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.io.IOUtils;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.eclipse.jetty.io.RuntimeIOException;
 import org.slf4j.Logger;
 
 public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
 
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
     private final RunMiNiFi runner;
     private final Logger logger;
     private final BootstrapFileProvider bootstrapFileProvider;
+    private final FlowEnrichService flowEnrichService;
 
-    private static final ReentrantLock handlingLock = new ReentrantLock();
-
-    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) {
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider, FlowEnrichService flowEnrichService) {
         this.runner = runner;
         this.logger = logger;
         this.bootstrapFileProvider = bootstrapFileProvider;
+        this.flowEnrichService = flowEnrichService;
     }
 
     @Override
-    public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
+    public String getDescriptor() {
+        return "MiNiFiConfigurationChangeListener";
+    }
+
+    @Override
+    public void handleChange(InputStream flowConfigInputStream) throws ConfigurationChangeException {
         logger.info("Received notification of a change");
 
         if (!handlingLock.tryLock()) {
             throw new ConfigurationChangeException("Instance is already handling another change");
         }
-        // Store the incoming stream as a byte array to be shared among components that need it
+
+        Path currentFlowConfigFile = null;
+        Path backupFlowConfigFile = null;
+        Path currentRawFlowConfigFile = null;
+        Path backupRawFlowConfigFile = null;
         try {
             Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
-            File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
 
-            File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
-            logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
+            currentFlowConfigFile = Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
+            backupFlowConfigFile = Path.of(currentFlowConfigFile + BACKUP_EXTENSION);
+            String currentFlowConfigFileBaseName = FilenameUtils.getBaseName(currentFlowConfigFile.toString());
+            currentRawFlowConfigFile = currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName + RAW_EXTENSION);
+            backupRawFlowConfigFile = currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName + RAW_EXTENSION + BACKUP_EXTENSION);
 
-            try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
-                Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
-            }
+            backup(currentFlowConfigFile, backupFlowConfigFile);
+            backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
 
-            // write out new config to file
-            Files.copy(configInputStream, configFile.toPath(), REPLACE_EXISTING);
-
-            // Create an input stream to feed to the config transformer
-            try (FileInputStream newConfigIs = new FileInputStream(configFile)) {
-                try {
-                    String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                    transformConfigurationFiles(confDir, newConfigIs, configFile, swapConfigFile);
-                } catch (Exception e) {
-                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
-                    if (!swapConfigFile.delete()) {
-                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
-                    }
-                    throw e;
-                }
-            }
+            byte[] rawFlow = toByteArray(flowConfigInputStream);
+            byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow);
+            persist(enrichedFlow, currentFlowConfigFile, true);
+            restartInstance();
+            setActiveFlowReference(wrap(rawFlow));
+            persist(rawFlow, currentRawFlowConfigFile, false);

Review Comment:
   If this throws an exception, setActiveFlowReference won't be reverted, if I see correctly.
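
One way to address this concern, sketched under assumptions: remember the previous reference before swapping it, and restore it if a later step fails. The class below is hypothetical and self-contained; it does not use the PR's `setActiveFlowReference` or `persist` methods, only the same ordering of "swap reference, then persist".

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

final class ActiveFlowRollbackSketch {

    // Hypothetical stand-in for a step that may fail, such as persisting the raw flow.
    interface FailableStep {
        void run() throws Exception;
    }

    private final AtomicReference<ByteBuffer> activeFlow = new AtomicReference<>();

    ActiveFlowRollbackSketch(ByteBuffer initialFlow) {
        activeFlow.set(initialFlow);
    }

    ByteBuffer activeFlow() {
        return activeFlow.get();
    }

    // Swaps in the new flow, runs the follow-up step, and restores the old flow on failure.
    void applyNewFlow(ByteBuffer newFlow, FailableStep followUpStep) throws Exception {
        ByteBuffer previousFlow = activeFlow.getAndSet(newFlow);
        try {
            followUpStep.run();
        } catch (Exception e) {
            activeFlow.set(previousFlow); // revert so the reference matches what is on disk
            throw e;
        }
    }

    public static void main(String[] args) {
        ByteBuffer oldFlow = ByteBuffer.wrap(new byte[] {1});
        ActiveFlowRollbackSketch sketch = new ActiveFlowRollbackSketch(oldFlow);
        try {
            sketch.applyNewFlow(ByteBuffer.wrap(new byte[] {2}), () -> {
                throw new java.io.IOException("simulated persist failure");
            });
        } catch (Exception e) {
            System.out.println("Reverted to old flow: " + (sketch.activeFlow() == oldFlow));
        }
    }
}
```

An alternative with the same effect would be to persist first and only update the reference once every step has succeeded.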



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
     public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
     public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";
+    public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+    private static final Logger logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+    private static final String DOUBLE_QUOTES = "\"";
+    private static final String ETAG_HEADER = "ETag";
+    private static final String PROXY_AUTHORIZATION_HEADER = "Proxy-Authorization";
+    private static final String DEFAULT_PATH = "/";
+    private static final int BAD_REQUEST_STATUS_CODE = 400;
+    private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+    private static final String HTTP_HEADERS_SEPARATOR = ",";
+    private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
 
     private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
     private final AtomicReference<Integer> portReference = new AtomicReference<>();
     private final AtomicReference<String> hostReference = new AtomicReference<>();
     private final AtomicReference<String> pathReference = new AtomicReference<>();
     private final AtomicReference<String> queryReference = new AtomicReference<>();
+    private final AtomicReference<Map<String, String>> httpHeadersReference = new AtomicReference<>();
+
     private volatile Differentiator<ByteBuffer> differentiator;
     private volatile String connectionScheme;
     private volatile String lastEtag = "";
     private volatile boolean useEtag = false;
 
-    public PullHttpChangeIngestor() {
-        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
-    }
-
     @Override
     public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
         super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
 
-        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD_MILLISECONDS)));
         if (pollingPeriodMS.get() < 1) {
-            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
-        }
-
-        final String host = properties.getProperty(HOST_KEY);
-        if (host == null || host.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
-        }
-
-        final String path = properties.getProperty(PATH_KEY, "/");
-        final String query = properties.getProperty(QUERY_KEY, "");
-
-        final String portString = (String) properties.get(PORT_KEY);
-        final Integer port;
-        if (portString == null) {
-            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
-        } else {
-            port = Integer.parseInt(portString);
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer");
         }
 
-        portReference.set(port);
+        String host = ofNullable(properties.getProperty(HOST_KEY))
+            .filter(StringUtils::isNotBlank)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+        String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+        String query = properties.getProperty(QUERY_KEY, EMPTY);
+        Map<String, String> httpHeaders = ofNullable(properties.getProperty(HTTP_HEADERS))
+            .filter(StringUtils::isNotBlank)
+            .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+            .map(Arrays::stream)

Review Comment:
   .stream().flatMap(Arrays::stream)
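
The suggestion above can be sketched as follows: `Optional.stream()` (Java 9+) turns the optional array into a stream of zero or one arrays, and `flatMap(Arrays::stream)` flattens it into individual header entries. The class and method names are hypothetical; the separators mirror the `,` and `:` constants from the diff. Entries without a `name:` part, such as a bare `text/json`, are skipped here, which is an assumption about the desired behavior rather than what the PR does.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

final class HttpHeaderParsingSketch {

    private static final String HEADERS_SEPARATOR = ",";
    private static final String KEY_VALUE_SEPARATOR = ":";

    // Parses "Accept:text/json,X-Custom:1" into a map; null or blank input yields an empty map.
    // Note: Collectors.toMap throws IllegalStateException if the same header name appears twice.
    static Map<String, String> parseHeaders(String rawHeaders) {
        return Optional.ofNullable(rawHeaders)
            .filter(value -> !value.isBlank())
            .map(value -> value.split(HEADERS_SEPARATOR))
            .stream()                                   // Optional<String[]> -> Stream<String[]>
            .flatMap(Arrays::stream)                    // Stream<String[]> -> Stream<String>
            .map(header -> header.split(KEY_VALUE_SEPARATOR, 2))
            .filter(parts -> parts.length == 2)         // skip entries that lack a "name:" part
            .collect(Collectors.toMap(parts -> parts[0].trim(), parts -> parts[1].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("Accept:text/json, X-Request-Id:42"));
        System.out.println(parseHeaders("text/json")); // no header name, so nothing is parsed
    }
}
```

Splitting each entry with a limit of 2 keeps any further `:` characters inside the header value intact.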



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] briansolo1985 commented on pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "briansolo1985 (via GitHub)" <gi...@apache.org>.
briansolo1985 commented on PR #7344:
URL: https://github.com/apache/nifi/pull/7344#issuecomment-1677451500

   Thanks for the valuable comments. I incorporated all of them and left a note on each comment to flag it as ready. I reran the unit and integration tests, and all of them passed.
   I also created https://issues.apache.org/jira/browse/NIFI-11948 to cover the System Admin Guide update.




[GitHub] [nifi] briansolo1985 commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "briansolo1985 (via GitHub)" <gi...@apache.org>.
briansolo1985 commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1288112783


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java:
##########
@@ -80,7 +83,7 @@ public Map<String, Object> getProperties() {
 
     @Override
     public boolean requiresRestart() {
-        return true;
+        return false;

Review Comment:
   Thanks





[GitHub] [nifi] bejancsaba commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "bejancsaba (via GitHub)" <gi...@apache.org>.
bejancsaba commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1235433806


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java:
##########
@@ -80,7 +83,7 @@ public Map<String, Object> getProperties() {
 
     @Override
     public boolean requiresRestart() {
-        return true;
+        return false;

Review Comment:
   nice :)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGenerator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.String.join;
+import static java.lang.System.getProperty;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider.getBootstrapConfFile;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_APP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_BOOTSTRAP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_DIR;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.LOG_DIR;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.getMiNiFiPropertiesPath;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Path;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.util.OrderedProperties;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+
+public class MiNiFiPropertiesGenerator {
+
+    public static final String PROPERTIES_FILE_APACHE_2_0_LICENSE =
+        " Licensed to the Apache Software Foundation (ASF) under one or more\n" +
+            "# contributor license agreements.  See the NOTICE file distributed with\n" +
+            "# this work for additional information regarding copyright ownership.\n" +
+            "# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
+            "# (the \"License\"); you may not use this file except in compliance with\n" +
+            "# the License.  You may obtain a copy of the License at\n" +
+            "#\n" +
+            "#     http://www.apache.org/licenses/LICENSE-2.0\n" +
+            "#\n" +
+            "# Unless required by applicable law or agreed to in writing, software\n" +
+            "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
+            "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
+            "# See the License for the specific language governing permissions and\n" +
+            "# limitations under the License.\n" +
+            "\n";
+
+    static final List<Triple<String, String, String>> NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS = List.of(

Review Comment:
   Thanks for collecting all of these. It will be really useful to have everything in place in the future.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()
+            .map(RemoteProcessGroup::stopTransmitting)
+            .forEach(future -> {
+                try {
+                    future.get(5000, TimeUnit.MICROSECONDS);
+                } catch (Exception e) {
+                    LOGGER.warn("Unable to stop remote process group", e);
+                }
+            });
+        rootProcessGroup.stopProcessing();
+    }
+
+    private void backup(Path current, Path backup) throws IOException {

Review Comment:
   There is quite a bit of overlap in logic between this class and MiNiFiConfigurationChangeListener. What do you think about extracting the shared logic to the commons module? I see there is a commons-api; maybe a commons-service would make sense? Or are there dependencies that don't allow such an extraction?
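
To make the extraction idea concrete, here is a minimal sketch of a shared helper. The class name `FlowFileBackupSketch` is hypothetical. The method bodies are also assumptions, because the hunk above is cut off before the real `backup`, `revert` and `removeIfExists` implementations; only the method names and the `java.nio.file` calls come from the diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical shared utility; the name and the exact semantics are assumptions, not code from the PR.
final class FlowFileBackupSketch {

    private FlowFileBackupSketch() {
    }

    // Copies the current file to the backup location, but only if the current file exists.
    static void backup(Path current, Path backup) throws IOException {
        if (Files.exists(current)) {
            Files.copy(current, backup, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    // Restores the current file from the backup, if a backup was taken.
    static void revert(Path backup, Path current) {
        if (Files.exists(backup)) {
            try {
                Files.copy(backup, current, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to revert " + current, e);
            }
        }
    }

    // Deletes the given file if present; a missing file is not an error.
    static void removeIfExists(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to remove " + path, e);
        }
    }
}
```

Both classes could then call these static methods instead of keeping their own private copies, provided the module dependencies allow it.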



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {

Review Comment:
   What do you think about adding usage information to the readme as well?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,
+            "-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName,
+            "-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension,
+            "-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName,
+            "-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension,
+            "org.apache.nifi.minifi.MiNiFi");
+    }
+
+    private File getFile(String filename, File workingDir) {
+        File file = new File(filename);
+        return file.isAbsolute() ? file : new File(workingDir, filename).getAbsoluteFile();
     }
 
-    private String getJavaCommand(Properties props) {
-        String javaCmd = props.getProperty("java");
-        if (javaCmd == null) {
-            javaCmd = DEFAULT_JAVA_CMD;
-        }
-        if (javaCmd.equals(DEFAULT_JAVA_CMD)) {
-            Optional.ofNullable(System.getenv("JAVA_HOME"))
-                .map(javaHome -> getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
-                    .orElseGet(() -> getJavaCommandBasedOnExtension(javaHome, "").orElse(DEFAULT_JAVA_CMD)));
-        }
-        return javaCmd;
+    private String getJavaCommand(Properties bootstrapProperties) {
+        String javaCommand = bootstrapProperties.getProperty(JAVA_COMMAND_KEY, DEFAULT_JAVA_CMD);
+        return javaCommand.equals(DEFAULT_JAVA_CMD)
+            ? ofNullable(System.getenv(JAVA_HOME_ENVIRONMENT_VARIABLE))
+            .flatMap(javaHome ->
+                getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
+                    .or(() -> getJavaCommandBasedOnExtension(javaHome, LINUX_FILE_EXTENSION)))
+            .orElse(DEFAULT_JAVA_CMD)
+            : javaCommand;
     }
 
     private Optional<String> getJavaCommandBasedOnExtension(String javaHome, String extension) {
-        String javaCmd = null;
-        File javaFile = new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension);
-        if (javaFile.exists() && javaFile.canExecute()) {
-            javaCmd = javaFile.getAbsolutePath();
-        }
-        return Optional.ofNullable(javaCmd);
+        return Optional.of(new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension))

Review Comment:
   Very minor, and I know it was just refactored, but would this be the JAVA_COMMAND_KEY?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = "nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", listenerHandleResult, ex);

Review Comment:
   Shouldn't this be logged at error or warning level?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties) {
         logger.info("HTTPS Connector added for Host [{}] and Port [{}]", https.getHost(), https.getPort());
     }
 
-    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+    private Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String differentiator) {

Review Comment:
   I saw this across all ChangeIngestors. Would it be worth extracting the shared logic to an abstract class? I'm not sure whether there is other common logic to extract, so I will leave it up to you.
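
As a rough illustration of the abstract-class idea, a sketch follows. Everything here is hypothetical: the class names, the `resolveDifferentiator` method, the `"whole-content"` key and the exception message are placeholders. Only the `unableToFindDifferentiatorExceptionSupplier(String)` signature is taken from the hunk above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical base class; the name and method shapes are illustrative only.
abstract class AbstractChangeIngestorSketch {

    // Shared lookup: resolves a differentiator by its configured name or fails with a uniform exception.
    protected <T> T resolveDifferentiator(String name, Map<String, Supplier<T>> knownDifferentiators) {
        return Optional.ofNullable(name)
            .map(knownDifferentiators::get)
            .map(Supplier::get)
            .orElseThrow(unableToFindDifferentiatorExceptionSupplier(name));
    }

    // Same signature as in the diff; the message text is a placeholder.
    protected Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String differentiator) {
        return () -> new IllegalArgumentException("Unable to find differentiator: " + differentiator);
    }
}

// Hypothetical concrete ingestor that only supplies its own table of differentiators.
final class ExampleIngestorSketch extends AbstractChangeIngestorSketch {

    String configure(String differentiatorName) {
        Map<String, Supplier<String>> known = Map.of("whole-content", () -> "whole-content-differentiator");
        return resolveDifferentiator(differentiatorName, known);
    }
}
```

With this shape each ingestor keeps only its own differentiator table, and the lookup and error handling live in one place.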



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,

Review Comment:
   Again, it was just a refactor, but readability could be improved by extracting a pattern. What do you think? I mean the "-D" + key + "=" + value pattern.
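
A minimal sketch of what such a helper could look like. The `systemProperty` method and the class name are hypothetical, and the port value is a placeholder chosen for illustration.

```java
import java.util.List;

final class SystemPropertyArgSketch {

    private SystemPropertyArgSketch() {
    }

    // Hypothetical helper, not part of the PR: formats one JVM system property argument.
    static String systemProperty(String key, Object value) {
        return "-D" + key + "=" + value;
    }

    public static void main(String[] args) {
        // Placeholder keys and values for illustration only.
        List<String> command = List.of(
            systemProperty("nifi.bootstrap.listen.port", 9999),
            systemProperty("app", "MiNiFi"));
        command.forEach(System.out::println);
    }
}
```

The command list would then read as a series of `systemProperty(KEY, value)` calls rather than repeated string concatenation.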



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(

Review Comment:
   Extremely minor, but you could use Stream.of instead of List.of().stream().
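
For illustration, the two forms side by side. The element values and the class name are placeholders, not taken from the test. Both methods build the same stream, and `Stream.of` simply skips the intermediate list.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamOfSketch {

    private StreamOfSketch() {
    }

    // Builds an intermediate immutable list first, then streams it.
    static List<String> viaList() {
        return List.of("a", "b", "c").stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList());
    }

    // Streams the same elements directly, without the intermediate list.
    static List<String> viaStream() {
        return Stream.of("a", "b", "c")
            .map(String::toUpperCase)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaList().equals(viaStream()));
    }
}
```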



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {

Review Comment:
   Thanks for the thorough test coverage.



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(
+                MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD.getKey())
+            .stream()
+            .collect(toMap(Function.identity(), __ -> randomUUID().toString()))
+        );
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        Properties miNiFiProperties = loadMiNiFiProperties();
+        MINIFI_TO_NIFI_PROPERTY_MAPPING.entrySet().stream()
+            .allMatch(entry -> Objects.equals(bootstrapProperties.getProperty(entry.getKey()), miNiFiProperties.getProperty(entry.getValue())));

Review Comment:
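   I think an assertion is needed here as well, right? `allMatch` only returns a boolean, and that result is discarded, so the test passes even if a mapping is wrong.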
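
   To illustrate the point, here is a minimal, self-contained sketch in plain Java without JUnit. The class name `AllMatchAssertionSketch`, the helper `mappingApplied` and the sample keys are made up for illustration and are not part of the PR. It shows that a bare `allMatch` call cannot fail anything, while the same predicate can be checked once its result is actually used.

   ```java
   import java.util.Map;
   import java.util.Objects;
   import java.util.Properties;

   // Hypothetical sketch, not code from the PR.
   public class AllMatchAssertionSketch {

       // Returns true only if every source key has the same value under its mapped target key.
       static boolean mappingApplied(Map<String, String> mapping, Properties source, Properties target) {
           return mapping.entrySet().stream()
               .allMatch(entry -> Objects.equals(
                   source.getProperty(entry.getKey()),
                   target.getProperty(entry.getValue())));
       }

       public static void main(String[] args) {
           Map<String, String> mapping = Map.of("minifi.keystore", "nifi.keystore");
           Properties source = new Properties();
           source.setProperty("minifi.keystore", "/tmp/keystore.jks");
           Properties target = new Properties(); // deliberately missing the mapped value

           // Result discarded: this statement can never fail a test.
           mappingApplied(mapping, source, target);

           // Result used: in a JUnit test this is where assertTrue(...) would go.
           System.out.println("mapping applied: " + mappingApplied(mapping, source, target));
       }
   }
   ```

   In the test itself, the equivalent change would be wrapping the existing `allMatch` expression in `assertTrue(...)`, which the test class already imports statically.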



##########
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/files/raspi3/config.test.json.v1:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    "encodingVersion": {
+        "majorVersion": 2,
+        "minorVersion": 0
+    },
+    "maxTimerDrivenThreadCount": 10,
+    "maxEventDrivenThreadCount": 1,
+    "registries": [],
+    "parameterContexts": [],
+    "parameterProviders": [],
+    "controllerServices": [],
+    "reportingTasks": [],
+    "templates": [],
+    "rootGroup": {
+        "identifier": "c1b4e586-2011-3f81-a11e-8d669f084d1c",
+        "instanceIdentifier": "29db3dbc-0188-1000-7025-4cab8b52d278",
+        "name": "NiFi Flow",

Review Comment:
   I suppose it doesn't really matter, but could this be "MiNiFi Flow"?



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {
+
+    static final String COMMON_SSL_CONTEXT_SERVICE_NAME = "SSL-Context-Service";
+    static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context Service";
+    static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME = "Site-To-Site-Provenance-Reporting";
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlowEnrichService.class);
+
+    private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi";
+    private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE = "org.apache.nifi.ssl.StandardRestrictedSSLContextService";
+    private static final String RESTRICTED_SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.RestrictedSSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.SSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_NAR = "nifi-ssl-context-service-nar";
+    private static final String STANDARD_SERVICES_API_NAR_ARTIFACT = "nifi-standard-services-api-nar";
+    private static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
+    private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
+    private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
+    private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
+
+    private final ReadableProperties minifiProperties;
+
+    public FlowEnrichService(ReadableProperties minifiProperties) {
+        this.minifiProperties = minifiProperties;
+    }
+
+    public byte[] enrichFlow(byte[] flowCandidate) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Enriching flow with content: \n{}", new String(flowCandidate, UTF_8));
+        }
+
+        VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate);
+
+        Optional<Integer> maxConcurrentThreads = ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey()))
+            .map(Integer::parseInt);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxEventDrivenThreadCount);
+
+        VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
+        if (rootGroup.getIdentifier() == null) {
+            rootGroup.setIdentifier(UUID.randomUUID().toString());
+        }
+        if (rootGroup.getInstanceIdentifier() == null) {
+            rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+        }
+
+        Optional<VersionedControllerService> commonSslControllerService = createCommonSslControllerService();
+        commonSslControllerService
+            .ifPresent(sslControllerService -> {
+                List<VersionedControllerService> currentControllerServices = ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new);
+                currentControllerServices.add(sslControllerService);
+                versionedDataflow.setControllerServices(currentControllerServices);
+            });
+
+        commonSslControllerService
+            .filter(__ -> parseBoolean(minifiProperties.getProperty(NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey())))
+            .map(VersionedComponent::getInstanceIdentifier)
+            .ifPresent(commonSslControllerServiceInstanceId -> overrideProcessorsSslControllerService(rootGroup, commonSslControllerServiceInstanceId));
+
+        createProvenanceReportingTask(commonSslControllerService)

Review Comment:
   What do you think about "resolving" the Optional here instead of passing it around as an argument? That way only the instance identifier is passed, not the whole controller service Optional:
   ```
   commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY)
   ```
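
   As a rough sketch of the shape I have in mind (everything here is hypothetical: `ResolveOptionalSketch`, the nested `Service` class standing in for `VersionedControllerService`, and both helper methods), the caller resolves the Optional once and the callee only ever sees a plain `String`:

   ```java
   import java.util.Optional;

   // Hypothetical sketch, not code from the PR.
   public class ResolveOptionalSketch {

       // Stand-in for VersionedControllerService; only the instance identifier matters here.
       static final class Service {
           private final String instanceIdentifier;

           Service(String instanceIdentifier) {
               this.instanceIdentifier = instanceIdentifier;
           }

           String getInstanceIdentifier() {
               return instanceIdentifier;
           }
       }

       // Resolves the Optional at the call site; an empty string means "no SSL service".
       static String resolveInstanceId(Optional<Service> service) {
           return service.map(Service::getInstanceIdentifier).orElse("");
       }

       // The callee takes a plain String and does not need to know about Optional.
       static String describeReportingTask(String sslServiceInstanceId) {
           return sslServiceInstanceId.isEmpty()
               ? "reporting task without SSL"
               : "reporting task using SSL service " + sslServiceInstanceId;
       }

       public static void main(String[] args) {
           Optional<Service> present = Optional.of(new Service("abc-123"));
           Optional<Service> absent = Optional.empty();
           System.out.println(describeReportingTask(resolveInstanceId(present)));
           System.out.println(describeReportingTask(resolveInstanceId(absent)));
       }
   }
   ```

   The trade-off is that the empty string becomes a sentinel value, so the callee has to treat it as "not configured".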



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()

Review Comment:
   What do you think about doing this in parallel? If I'm not mistaken, `future.get` is blocking, so this stops the remote process groups one after another, each with a 5-second timeout.
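
   A minimal sketch of what I mean, assuming the stop call returns a `Future` (as the `future.get` suggests). The remote process groups are simulated with an executor and a sleep, and `ParallelStopSketch`, `stopAll` and `simulateStop` are hypothetical names, not NiFi API. The idea is to trigger every stop first and only then start waiting, so the total wait is bounded by roughly one timeout instead of one timeout per group.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Hypothetical sketch, not code from the PR.
   public class ParallelStopSketch {

       // Simulates one remote process group taking stopMillis to stop.
       static void simulateStop(long stopMillis) {
           try {
               Thread.sleep(stopMillis);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }

       // Triggers all stops first, then waits for each; returns the elapsed milliseconds.
       static long stopAll(int groupCount, long stopMillis, long timeoutMillis) {
           ExecutorService executor = Executors.newFixedThreadPool(groupCount);
           try {
               long start = System.nanoTime();
               // Collect every Future before the first blocking get(). A lazy
               // stream().map(stop).forEach(get) pipeline would instead trigger
               // and wait for one element at a time.
               List<Future<?>> futures = new ArrayList<>();
               for (int i = 0; i < groupCount; i++) {
                   futures.add(executor.submit(() -> simulateStop(stopMillis)));
               }
               for (Future<?> future : futures) {
                   future.get(timeoutMillis, TimeUnit.MILLISECONDS);
               }
               return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
           } catch (InterruptedException | ExecutionException | TimeoutException e) {
               throw new IllegalStateException("Stopping remote process groups failed", e);
           } finally {
               executor.shutdownNow();
           }
       }

       public static void main(String[] args) {
           // Four groups at 200 ms each finish in roughly 200 ms rather than 800 ms.
           System.out.println("elapsed ms: " + stopAll(4, 200, 5000));
       }
   }
   ```

   If the real stop call already runs asynchronously, collecting the futures into a list before calling `get` may be all that is needed, with no extra executor.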



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);

Review Comment:
   Very minor, but these could be `final`, right?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;
+
+    @BeforeEach
+    public void setup() {
+        flowConfigurationFile = Path.of(tempDir.getAbsolutePath(), FLOW_CONFIG_FILE_NAME).toAbsolutePath();
+        backupFlowConfigurationFile = Path.of(flowConfigurationFile + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFile.toString());
+        rawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        backupRawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+
+        testUpdateConfiguratinStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, flowConfigurationFile.toString());
+
+        writeGzipFile(flowConfigurationFile, ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
+        writePlainTextFile(rawFlowConfigurationFile, ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
+    }
+
+    @Test
+    public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertTrue(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(1)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(1)).startProcessing();
+    }
+
+    @Test
+    public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+        doThrow(new IOException()).when(mockFlowService).load(null);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertFalse(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(0)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(0)).startProcessing();
+    }
+
+    private void writeGzipFile(Path path, byte[] content) {
+        try (ByteArrayInputStream inputStream = new ByteArrayInputStream(content);
+             OutputStream outputStream = new GZIPOutputStream(newOutputStream(path, WRITE, CREATE, TRUNCATE_EXISTING))) {
+            inputStream.transferTo(outputStream);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private byte[] readGzipFile(Path path) {
+        try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(path));
+             ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            inputStream.transferTo(outputStream);
+            outputStream.flush();
+            return outputStream.toByteArray();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void writePlainTextFile(Path path, byte[] content) {

Review Comment:
   As these are "just" tests, you could declare the exception in the method signature instead of wrapping it, which would make these helpers much shorter. This is minor, so I will leave it to you.
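
   A minimal sketch of what that could look like, assuming the helpers only need to propagate `IOException` to the test method. The class name `GzipHelperSketch`, the static methods, and the `main` method are illustrative only and are not part of the PR:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-alone version of the test helpers; names are assumptions.
final class GzipHelperSketch {

    // IOException is declared instead of being wrapped in UncheckedIOException.
    static void writeGzipFile(Path path, byte[] content) throws IOException {
        // Files.newOutputStream defaults to CREATE, TRUNCATE_EXISTING and WRITE.
        try (OutputStream outputStream = new GZIPOutputStream(Files.newOutputStream(path))) {
            outputStream.write(content);
        }
    }

    static byte[] readGzipFile(Path path) throws IOException {
        try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(path))) {
            return inputStream.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flow", ".json.gz");
        try {
            byte[] content = "original_enriched_content".getBytes(StandardCharsets.UTF_8);
            writeGzipFile(file, content);
            // Round trip: what was written is what is read back.
            System.out.println(Arrays.equals(content, readGzipFile(file)));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

   The calling test methods would then need `throws IOException` as well, which `testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp` already declares.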



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/flow.json.raw:
##########
@@ -0,0 +1,38 @@
+{
+  "encodingVersion": {
+    "majorVersion": 2,
+    "minorVersion": 0
+  },
+  "maxTimerDrivenThreadCount": 1,
+  "maxEventDrivenThreadCount": 1,
+  "registries": [],
+  "parameterContexts": [],
+  "parameterProviders": [],
+  "controllerServices": [],
+  "reportingTasks": [],
+  "templates": [],
+  "rootGroup": {
+    "name": "NiFi Flow",

Review Comment:
   I know I commented on this earlier, and there are a ton of references to this JSON, so maybe it isn't worth rewriting everywhere, but what do you think about renaming it to MiNiFi Flow here?



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {

Review Comment:
   As far as I can see, this is the heart and soul of the transformation logic. Thanks for keeping it as clean and understandable as possible, even with the given constraints.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ComponentPropertyProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.util.Objects.nonNull;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithId;
+
+public class ComponentPropertyProvider {

Review Comment:
   Thanks for the clear code here. However, given the complexity, could you please add a few sentences of Javadoc to describe the intention?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;

Review Comment:
   There is a missing "o" here: "testUpdateConfiguratinStrategy" should be "testUpdateConfigurationStrategy".



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);

Review Comment:
   Does this mean that the position information is not available in the source? We usually don't expect MiNiFi flows to be visualised, so I suppose it doesn't make much difference. I was thinking about adding an incrementing constant to both x and y for each element, which still wouldn't look nice but would at least show the volume of processors. I will leave it to you; it possibly isn't worth the effort.
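
   To illustrate the incrementing idea, here is a rough sketch. `Point` is a local stand-in for `org.apache.nifi.flow.Position` so that the example compiles on its own, and `PositionSketch`, `OFFSET`, and `next()` are hypothetical names that do not exist in the PR:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: hands out diagonal positions instead of a fixed (0, 0).
final class PositionSketch {

    // Local stand-in for org.apache.nifi.flow.Position (assumption for self-containment).
    static final class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public String toString() {
            return "(" + x + ", " + y + ")";
        }
    }

    // Arbitrary step between consecutive components; the value is an assumption.
    static final double OFFSET = 50.0;

    private final AtomicInteger counter = new AtomicInteger();

    // Each call shifts both coordinates by OFFSET relative to the previous call.
    Point next() {
        int index = counter.getAndIncrement();
        return new Point(index * OFFSET, index * OFFSET);
    }

    public static void main(String[] args) {
        PositionSketch positions = new PositionSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println(positions.next());
        }
    }
}
```

   In the transformer, one such provider per process group would replace the shared `DEFAULT_POSITION` constant, so components would no longer be stacked on a single point when the flow is opened in a UI.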



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {
+
+    public static final String TRANSFORM_YML = "transform-yml";
+
+    private static final String COMMAND_DESCRIPTION = "Transform MiNiFi config YAML into NiFi flow JSON format";
+    private static final String PROPERTY_KEY_VALUE_DELIMITER = "=";
+
+    private final PathInputStreamFactory pathInputStreamFactory;
+    private final PathOutputStreamFactory pathOutputStreamFactory;
+
+    public TransformYamlCommandFactory(PathInputStreamFactory pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) {
+        this.pathInputStreamFactory = pathInputStreamFactory;
+        this.pathOutputStreamFactory = pathOutputStreamFactory;
+    }
+
+    public ConfigMain.Command create() {
+        return new ConfigMain.Command(this::transformYamlToJson, COMMAND_DESCRIPTION);
+    }
+
+    private int transformYamlToJson(String[] args) {
+        if (args.length != 5) {
+            printTransformYmlUsage();
+            return ConfigMain.ERR_INVALID_ARGS;
+        }
+
+        String sourceMiNiFiConfigPath = args[1];

Review Comment:
   Just a minor thing: what about calling this sourceMiNiFiConfigYamlPath? It could help with readability.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);
+
+    private final ConfigSchema configSchema;
+    private final ComponentPropertyProvider componentPropertyProvider;
+
+    public ConfigSchemaToVersionedDataFlowTransformer(ConfigSchema configSchema) {
+        this.configSchema = configSchema;
+        this.componentPropertyProvider = new ComponentPropertyProvider(configSchema);
+    }
+
+    public Map<String, String> extractProperties() {
+        CorePropertiesSchema coreProperties = configSchema.getCoreProperties();
+        FlowFileRepositorySchema flowFileRepositoryProperties = configSchema.getFlowfileRepositoryProperties();
+        ContentRepositorySchema contentRepositoryProperties = configSchema.getContentRepositoryProperties();
+        ProvenanceRepositorySchema provenanceRepositoryProperties = configSchema.getProvenanceRepositorySchema();
+        ComponentStatusRepositorySchema componentStatusRepositoryProperties = configSchema.getComponentStatusRepositoryProperties();
+        SwapSchema swapProperties = configSchema.getFlowfileRepositoryProperties().getSwapProperties();
+
+        return Stream.concat(
+                Stream.of(
+                    entry(NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey(), coreProperties.getMaxConcurrentThreads().toString()),
+                    entry(FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD, coreProperties.getFlowControllerGracefulShutdownPeriod()),
+                    entry(WRITE_DELAY_INTERVAL, coreProperties.getFlowServiceWriteDelayInterval()),
+                    entry(ADMINISTRATIVE_YIELD_DURATION, coreProperties.getAdministrativeYieldDuration()),
+                    entry(BORED_YIELD_DURATION, coreProperties.getBoredYieldDuration()),
+                    entry(VARIABLE_REGISTRY_PROPERTIES, coreProperties.getVariableRegistryProperties()),
+                    entry(FLOWFILE_REPOSITORY_IMPLEMENTATION, flowFileRepositoryProperties.getFlowFileRepository()),
+                    entry(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, flowFileRepositoryProperties.getCheckpointInterval()),
+                    entry(FLOWFILE_REPOSITORY_ALWAYS_SYNC, Boolean.toString(flowFileRepositoryProperties.getAlwaysSync())),
+                    entry(CONTENT_REPOSITORY_IMPLEMENTATION, contentRepositoryProperties.getContentRepository()),
+                    entry(MAX_APPENDABLE_CLAIM_SIZE, contentRepositoryProperties.getContentClaimMaxAppendableSize()),
+                    entry(CONTENT_ARCHIVE_MAX_RETENTION_PERIOD, contentRepositoryProperties.getContentRepoArchiveMaxRetentionPeriod()),
+                    entry(CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE, contentRepositoryProperties.getContentRepoArchiveMaxUsagePercentage()),
+                    entry(CONTENT_ARCHIVE_ENABLED, Boolean.toString(contentRepositoryProperties.getContentRepoArchiveEnabled())),
+                    entry(PROVENANCE_REPO_IMPLEMENTATION_CLASS, provenanceRepositoryProperties.getProvenanceRepository()),
+                    entry(PROVENANCE_ROLLOVER_TIME, provenanceRepositoryProperties.getProvenanceRepoRolloverTimeKey()),
+                    entry(PROVENANCE_INDEX_SHARD_SIZE, provenanceRepositoryProperties.getProvenanceRepoIndexShardSize()),
+                    entry(PROVENANCE_MAX_STORAGE_SIZE, provenanceRepositoryProperties.getProvenanceRepoMaxStorageSize()),
+                    entry(PROVENANCE_MAX_STORAGE_TIME, provenanceRepositoryProperties.getProvenanceRepoMaxStorageTime()),
+                    entry(COMPONENT_STATUS_SNAPSHOT_FREQUENCY, componentStatusRepositoryProperties.getSnapshotFrequency()),
+                    entry(QUEUE_SWAP_THRESHOLD, swapProperties.getThreshold().toString())
+                ),
+                ofNullable(configSchema.getNifiPropertiesOverrides().entrySet()).orElse(Set.of()).stream()

Review Comment:
   The entrySet of a map can't be null, so I think there is no need for the `ofNullable` wrapper here.
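
   To illustrate the point: the JDK `Map` implementations return a (possibly empty) set from `entrySet()`, never null, so the only thing worth guarding is the map reference itself. A minimal sketch, assuming the overrides map may be null; `OverridesExample` and `merge` are hypothetical names and not part of this PR:

```java
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OverridesExample {

    // Hypothetical helper: combines base entries with optional overrides.
    // The null guard is placed on the map, not on entrySet().
    static Map<String, String> merge(Map<String, String> base, Map<String, String> overrides) {
        return Stream.concat(
                base.entrySet().stream(),
                Optional.ofNullable(overrides).orElse(Map.of()).entrySet().stream())
            // on duplicate keys the later (override) value wins
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> second));
    }
}
```

   If the getter is known to return a non-null map, the guard can be dropped entirely and `entrySet().stream()` called directly.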



##########
nifi-commons/nifi-kubernetes-client/src/test/java/org/apache/nifi/kubernetes/client/StandardKubernetesClientProviderTest.java:
##########
@@ -31,7 +31,7 @@ void setProvider() {
         provider = new StandardKubernetesClientProvider();
     }
 
-    @Timeout(5)
+    @Timeout(10)

Review Comment:
   Was it unstable with 5? :)



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {

Review Comment:
   As this is an actual service, what do you think about moving it out of commons-api? I see there is no commons-service module; maybe it would be worth creating one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] ferencerdei commented on pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "ferencerdei (via GitHub)" <gi...@apache.org>.
ferencerdei commented on PR #7344:
URL: https://github.com/apache/nifi/pull/7344#issuecomment-1683618041

   Thank you for the effort you put into this change. It's a really huge milestone for MiNiFi.
   The change looks good to me. I also tested it with various scenarios and everything worked fine.
   
   +1 from my side




[GitHub] [nifi] ferencerdei closed pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "ferencerdei (via GitHub)" <gi...@apache.org>.
ferencerdei closed pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON
URL: https://github.com/apache/nifi/pull/7344




[GitHub] [nifi] briansolo1985 commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "briansolo1985 (via GitHub)" <gi...@apache.org>.
briansolo1985 commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1288113651


##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = "nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", listenerHandleResult, ex);

Review Comment:
   Thanks, fixed
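
   For context on the `newFlowConfig.duplicate()` call in the hunk above: `ByteBuffer.duplicate()` shares the underlying content but gives the copy its own position and limit, so each listener can read the whole flow without affecting the next one. A minimal sketch of that behaviour; `DuplicateBufferExample`, `drain` and `notifyAll` are hypothetical stand-ins, not classes from this PR:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DuplicateBufferExample {

    // Hypothetical stand-in for a listener: reads everything remaining in the buffer.
    static String drain(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hands every "listener" its own duplicate, so the shared buffer's position never moves.
    static List<String> notifyAll(ByteBuffer flow, int listenerCount) {
        return IntStream.range(0, listenerCount)
            .mapToObj(index -> drain(flow.duplicate()))
            .collect(Collectors.toList());
    }
}
```

   Passing the original buffer instead of a duplicate would leave it fully consumed after the first listener.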





[GitHub] [nifi] briansolo1985 commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "briansolo1985 (via GitHub)" <gi...@apache.org>.
briansolo1985 commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1288560897


##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,
+            "-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName,
+            "-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension,
+            "-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName,
+            "-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension,
+            "org.apache.nifi.minifi.MiNiFi");
+    }
+
+    private File getFile(String filename, File workingDir) {
+        File file = new File(filename);
+        return file.isAbsolute() ? file : new File(workingDir, filename).getAbsoluteFile();
     }
 
-    private String getJavaCommand(Properties props) {
-        String javaCmd = props.getProperty("java");
-        if (javaCmd == null) {
-            javaCmd = DEFAULT_JAVA_CMD;
-        }
-        if (javaCmd.equals(DEFAULT_JAVA_CMD)) {
-            Optional.ofNullable(System.getenv("JAVA_HOME"))
-                .map(javaHome -> getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
-                    .orElseGet(() -> getJavaCommandBasedOnExtension(javaHome, "").orElse(DEFAULT_JAVA_CMD)));
-        }
-        return javaCmd;
+    private String getJavaCommand(Properties bootstrapProperties) {
+        String javaCommand = bootstrapProperties.getProperty(JAVA_COMMAND_KEY, DEFAULT_JAVA_CMD);
+        return javaCommand.equals(DEFAULT_JAVA_CMD)
+            ? ofNullable(System.getenv(JAVA_HOME_ENVIRONMENT_VARIABLE))
+            .flatMap(javaHome ->
+                getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
+                    .or(() -> getJavaCommandBasedOnExtension(javaHome, LINUX_FILE_EXTENSION)))
+            .orElse(DEFAULT_JAVA_CMD)
+            : javaCommand;
     }
 
     private Optional<String> getJavaCommandBasedOnExtension(String javaHome, String extension) {
-        String javaCmd = null;
-        File javaFile = new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension);
-        if (javaFile.exists() && javaFile.canExecute()) {
-            javaCmd = javaFile.getAbsolutePath();
-        }
-        return Optional.ofNullable(javaCmd);
+        return Optional.of(new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension))

Review Comment:
   Eliminated the hardcoded strings
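
   A note on the hunk above: in the removed lines the `Optional` chain inside `getJavaCommand` is never assigned back to `javaCmd`, so the `JAVA_HOME` lookup had no effect there; the new version returns the chain's result. A minimal sketch of that lookup, assuming the truncated `Optional.of(new File(...))` line continues with an exists/executable filter (an assumption, since the rest of the line is not quoted). `JavaCommandResolver` and its method names are hypothetical:

```java
import java.io.File;
import java.util.Optional;

final class JavaCommandResolver {

    static final String DEFAULT_JAVA_CMD = "java";

    // Returns the absolute path of <javaHome>/bin/java<extension> if it exists and is executable.
    static Optional<String> javaCommandWithExtension(String javaHome, String extension) {
        return Optional.of(new File(javaHome, "bin" + File.separator + "java" + extension))
            .filter(file -> file.exists() && file.canExecute())
            .map(File::getAbsolutePath);
    }

    // Tries the Windows executable first, then the extension-less one, then falls back to "java".
    static String resolve(String javaHome) {
        return Optional.ofNullable(javaHome)
            .flatMap(home -> javaCommandWithExtension(home, ".exe")
                .or(() -> javaCommandWithExtension(home, "")))
            .orElse(DEFAULT_JAVA_CMD);
    }
}
```

   `Optional.or` requires Java 9 or later, which matches its use in the diff.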



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,

Review Comment:
   Refactored to make this more readable
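
   The refactored version is not quoted in this message. For readers following along, one possible shape for combining fixed arguments with a variable-length list of extra JVM arguments is sketched below. `ExecCommandExample` and `buildCommand` are hypothetical and only illustrate the technique; they are not the code that was committed:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ExecCommandExample {

    // Concatenates fixed and variable argument groups into a single flat command list.
    static List<String> buildCommand(String javaCommand, String classPath, List<String> additionalArgs, int listenPort) {
        return Stream.of(
                Stream.of(javaCommand, "-classpath", classPath),
                additionalArgs.stream(),
                Stream.of("-Dnifi.bootstrap.listen.port=" + listenPort, "-Dapp=MiNiFi", "org.apache.nifi.minifi.MiNiFi"))
            .flatMap(group -> group)
            .collect(Collectors.toList());
    }
}
```

   Grouping the arguments this way keeps the declarative style of `List.of` while still allowing a nested list to be flattened in place.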



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties) {
         logger.info("HTTPS Connector added for Host [{}] and Port [{}]", https.getHost(), https.getPort());
     }
 
-    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+    private Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String differentiator) {

Review Comment:
   I left it as it is for the time being



##########
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/files/raspi3/config.test.json.v1:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    "encodingVersion": {
+        "majorVersion": 2,
+        "minorVersion": 0
+    },
+    "maxTimerDrivenThreadCount": 10,
+    "maxEventDrivenThreadCount": 1,
+    "registries": [],
+    "parameterContexts": [],
+    "parameterProviders": [],
+    "controllerServices": [],
+    "reportingTasks": [],
+    "templates": [],
+    "rootGroup": {
+        "identifier": "c1b4e586-2011-3f81-a11e-8d669f084d1c",
+        "instanceIdentifier": "29db3dbc-0188-1000-7025-4cab8b52d278",
+        "name": "NiFi Flow",

Review Comment:
   Updated



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(
+                MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD.getKey())
+            .stream()
+            .collect(toMap(Function.identity(), __ -> randomUUID().toString()))
+        );
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        Properties miNiFiProperties = loadMiNiFiProperties();
+        MINIFI_TO_NIFI_PROPERTY_MAPPING.entrySet().stream()
+            .allMatch(entry -> Objects.equals(bootstrapProperties.getProperty(entry.getKey()), miNiFiProperties.getProperty(entry.getValue())));

Review Comment:
   Good point
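
   The original review comment is not quoted here, but one thing visible in the hunk is that the boolean returned by `allMatch` is not passed to any assertion, so the check cannot fail the test. A minimal sketch of the difference, assuming that was the concern; `MappingAssertionExample` and its methods are hypothetical, and a plain `AssertionError` stands in for JUnit's `assertTrue` to keep the example self-contained:

```java
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

final class MappingAssertionExample {

    // Returns true only if every source key's value equals the value of its mapped target key.
    static boolean allMapped(Map<String, String> mapping, Properties source, Properties target) {
        return mapping.entrySet().stream()
            .allMatch(entry -> Objects.equals(source.getProperty(entry.getKey()), target.getProperty(entry.getValue())));
    }

    // The result must be checked explicitly; calling allMapped alone never fails anything.
    static void assertAllMapped(Map<String, String> mapping, Properties source, Properties target) {
        if (!allMapped(mapping, source, target)) {
            throw new AssertionError("Not every property was mapped to its expected target key");
        }
    }
}
```

   In the JUnit test itself the equivalent would be wrapping the stream expression in `assertTrue(...)`, which is already statically imported in the quoted file.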



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {
+
+    static final String COMMON_SSL_CONTEXT_SERVICE_NAME = "SSL-Context-Service";
+    static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context Service";
+    static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME = "Site-To-Site-Provenance-Reporting";
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlowEnrichService.class);
+
+    private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi";
+    private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE = "org.apache.nifi.ssl.StandardRestrictedSSLContextService";
+    private static final String RESTRICTED_SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.RestrictedSSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.SSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_NAR = "nifi-ssl-context-service-nar";
+    private static final String STANDARD_SERVICES_API_NAR_ARTIFACT = "nifi-standard-services-api-nar";
+    private static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
+    private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
+    private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
+    private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
+
+    private final ReadableProperties minifiProperties;
+
+    public FlowEnrichService(ReadableProperties minifiProperties) {
+        this.minifiProperties = minifiProperties;
+    }
+
+    public byte[] enrichFlow(byte[] flowCandidate) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Enriching flow with content: \n{}", new String(flowCandidate, UTF_8));
+        }
+
+        VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate);
+
+        Optional<Integer> maxConcurrentThreads = ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey()))
+            .map(Integer::parseInt);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxEventDrivenThreadCount);
+
+        VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
+        if (rootGroup.getIdentifier() == null) {
+            rootGroup.setIdentifier(UUID.randomUUID().toString());
+        }
+        if (rootGroup.getInstanceIdentifier() == null) {
+            rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+        }
+
+        Optional<VersionedControllerService> commonSslControllerService = createCommonSslControllerService();
+        commonSslControllerService
+            .ifPresent(sslControllerService -> {
+                List<VersionedControllerService> currentControllerServices = ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new);
+                currentControllerServices.add(sslControllerService);
+                versionedDataflow.setControllerServices(currentControllerServices);
+            });
+
+        commonSslControllerService
+            .filter(__ -> parseBoolean(minifiProperties.getProperty(NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey())))
+            .map(VersionedComponent::getInstanceIdentifier)
+            .ifPresent(commonSslControllerServiceInstanceId -> overrideProcessorsSslControllerService(rootGroup, commonSslControllerServiceInstanceId));
+
+        createProvenanceReportingTask(commonSslControllerService)

Review Comment:
   Good suggestion, thank you



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;

Review Comment:
   Fixed



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(

Review Comment:
   Sure



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {

Review Comment:
   Yes, I didn't feel comfortable putting this into an API module, but I also felt that creating a separate module just for this would be overkill.
   On second thought it seems reasonable though, so I moved it to a framework module.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);

Review Comment:
   Yes, the YAML source had no position-related information.
   Position is a required attribute, but for MiNiFi it does not make much sense: these properties are for the UI, and MiNiFi does not have one.
   If for some reason somebody loads the flow into NiFi, they can still arrange the processors manually.
   If you don't mind, I would keep it as is for now.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),

Review Comment:
   Thanks for finding this issue, really appreciate it!
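
   A note on the `List.of(...)` form in the hunk above: the removed code used `cmd.addAll(getJavaAdditionalArgs(props))`, so that helper returned a collection, and a collection passed as one element of `List.of` is not flattened into the surrounding list. Assuming the helper still returns a list, one way to keep the declarative style is to concatenate streams. The sketch below uses a hypothetical `additionalArgs()` stand-in and made-up argument values; it is not the code that was merged.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ExecCommandSketch {

    // Hypothetical stand-in for a helper that returns several JVM arguments.
    static List<String> additionalArgs() {
        return List.of("-Xms12m", "-Xmx24m");
    }

    static List<String> buildCommand() {
        // Treat the fixed arguments and the variable-length list as streams, then flatten them in order.
        return Stream.of(
                Stream.of("java", "-classpath", "conf:lib/*"),
                additionalArgs().stream(),
                Stream.of("-Dapp=MiNiFi", "org.apache.nifi.minifi.MiNiFi"))
            .flatMap(part -> part)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(buildCommand());
    }
}
```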



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java:
##########
@@ -93,25 +79,47 @@ public enum MiNiFiProperties {
     C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR),
     C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
     C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true, TIME_PERIOD_VALIDATOR),
-    C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false, true, LONG_VALIDATOR),
-    C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+    C2_REST_HTTP_HEADERS("c2.rest.http.headers", "", false, true, VALID),

Review Comment:
   Set it to `Accept:application/json`



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -150,17 +107,11 @@ java.arg.14=-Djava.awt.headless=true
 #c2.rest.connectionTimeout=5 sec
 #c2.rest.readTimeout=5 sec
 #c2.rest.callTimeout=10 sec
-## heartbeat in milliseconds
-#c2.agent.heartbeat.period=5000
-## define parameters about your agent
-#c2.agent.class=
+# Comma separated list of HTTP headers, eg: Accept:text/json
+#c2.rest.http.headers=text/json

Review Comment:
   Thanks, I added the missing `Accept` part and also replaced text/json with application/json everywhere
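
For readers following the thread, here is a minimal sketch of how a value such as `Accept:application/json` could be turned into header name/value pairs, assuming the comma-separated `name:value` format described in the `bootstrap.conf` comment. The class `HttpHeaderParser` and its `parse` method are hypothetical names used only for illustration; they are not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: it only illustrates the
// "name:value,name:value" property format discussed in this thread.
final class HttpHeaderParser {

    private static final String HEADERS_SEPARATOR = ",";
    private static final String KEY_VALUE_SEPARATOR = ":";

    static Map<String, String> parse(String rawHeaders) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (rawHeaders == null || rawHeaders.isBlank()) {
            // A missing or blank property yields no extra headers.
            return headers;
        }
        for (String header : rawHeaders.split(HEADERS_SEPARATOR)) {
            // Split on the first colon only, so a value may itself contain colons.
            String[] keyValue = header.split(KEY_VALUE_SEPARATOR, 2);
            if (keyValue.length == 2 && !keyValue[0].isBlank()) {
                headers.put(keyValue[0].trim(), keyValue[1].trim());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parse("Accept:application/json"));
    }
}
```

One limitation of this format, at least as sketched here: a header value that itself contains a comma would be split into separate entries.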



##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -321,6 +343,21 @@ public Builder connectTimeout(long connectTimeout) {
             return this;
         }
 
+        public Builder httpHeaders(String httpHeaders) {
+            this.httpHeaders = ofNullable(httpHeaders)
+                .filter(StringUtils::isNotBlank)
+                .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+                .map(Arrays::stream)
+                .orElseGet(Stream::of)

Review Comment:
   Thanks, replaced



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java:
##########
@@ -15,17 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.minifi.c2.provider.nifi.rest;
+package org.apache.nifi.c2.client.service.operation;
 
-import java.io.IOException;
+@FunctionalInterface
+public interface UpdateConfigurationStrategy {

Review Comment:
   Sure, added



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException, InterruptedException {
 
         Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
         String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-        initConfigFiles(bootstrapProperties, confDir);
 
-        Process process = startMiNiFi();
+        DEFAULT_LOGGER.debug("Generating minifi.properties from bootstrap.conf");
+        initConfigFile(bootstrapProperties, confDir);
+        Path flowConfigFile = Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();

Review Comment:
   Extracted. Updated the docs as well



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = "nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", listenerHandleResult, ex);
+            return listenerHandleResult;
+        }
     }
 
-    /**
-     * Provide the mechanism by which listeners are notified
-     */
-    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) {
-        LOGGER.info("Notifying Listeners of a change");
+    private void initialize() throws IOException {
+        closeIngestors();
 
-        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            ListenerHandleResult result;
-            try {
-                listener.handleChange(new ByteBufferInputStream(newConfig.duplicate()));
-                result = new ListenerHandleResult(listener);
-            } catch (ConfigurationChangeException ex) {
-                result = new ListenerHandleResult(listener, ex);
-            }
-            listenerHandleResults.add(result);
-            LOGGER.info("Listener notification result: {}", result);
-        }
-        return listenerHandleResults;
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+        ofNullable(bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY))
+            .filter(not(String::isBlank))
+            .map(ingestors -> ingestors.split(COMMA))
+            .map(Stream::of)

Review Comment:
   Fixed



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()
+            .map(RemoteProcessGroup::stopTransmitting)
+            .forEach(future -> {
+                try {
+                    future.get(5000, TimeUnit.MICROSECONDS);
+                } catch (Exception e) {
+                    LOGGER.warn("Unable to stop remote process group", e);
+                }
+            });
+        rootProcessGroup.stopProcessing();
+    }
+
+    private void backup(Path current, Path backup) throws IOException {

Review Comment:
   I extracted the common logic into the newly created minifi-commons-framework module
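
As a rough illustration of the backup/revert pattern that `update` relies on in the hunk above, here is a minimal sketch built on `java.nio.file.Files`. The class name `FlowFileBackup` and the method bodies are assumptions made for this example; the extracted code in the new module is not quoted in this thread and may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, not the extracted PR code: copy the current file aside
// before an update, restore it on failure, and clean up the copy afterwards.
final class FlowFileBackup {

    static void backup(Path current, Path backup) throws IOException {
        // Nothing to back up on a first start, when no flow file exists yet.
        if (Files.exists(current)) {
            Files.copy(current, backup, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    static void revert(Path backup, Path current) {
        if (Files.exists(backup)) {
            try {
                Files.copy(backup, current, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to revert " + current, e);
            }
        }
    }

    static void removeIfExists(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to remove " + path, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flow-backup-demo");
        Path flow = dir.resolve("flow.json.gz");
        Path backup = dir.resolve("flow.json.gz.backup");

        Files.writeString(flow, "previous flow");
        backup(flow, backup);
        Files.writeString(flow, "broken flow");   // simulate a failed update
        revert(backup, flow);
        removeIfExists(backup);

        System.out.println(Files.readString(flow));
    }
}
```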



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()

Review Comment:
   Done



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);

Review Comment:
   Yes, marked them as final



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
     public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
     public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";
+    public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+    private static final Logger logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+    private static final String DOUBLE_QUOTES = "\"";
+    private static final String ETAG_HEADER = "ETag";
+    private static final String PROXY_AUTHORIZATION_HEADER = "Proxy-Authorization";
+    private static final String DEFAULT_PATH = "/";
+    private static final int BAD_REQUEST_STATUS_CODE = 400;
+    private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+    private static final String HTTP_HEADERS_SEPARATOR = ",";
+    private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
 
     private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
     private final AtomicReference<Integer> portReference = new AtomicReference<>();
     private final AtomicReference<String> hostReference = new AtomicReference<>();
     private final AtomicReference<String> pathReference = new AtomicReference<>();
     private final AtomicReference<String> queryReference = new AtomicReference<>();
+    private final AtomicReference<Map<String, String>> httpHeadersReference = new AtomicReference<>();
+
     private volatile Differentiator<ByteBuffer> differentiator;
     private volatile String connectionScheme;
     private volatile String lastEtag = "";
     private volatile boolean useEtag = false;
 
-    public PullHttpChangeIngestor() {
-        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
-    }
-
     @Override
     public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
         super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
 
-        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD_MILLISECONDS)));
         if (pollingPeriodMS.get() < 1) {
-            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
-        }
-
-        final String host = properties.getProperty(HOST_KEY);
-        if (host == null || host.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
-        }
-
-        final String path = properties.getProperty(PATH_KEY, "/");
-        final String query = properties.getProperty(QUERY_KEY, "");
-
-        final String portString = (String) properties.get(PORT_KEY);
-        final Integer port;
-        if (portString == null) {
-            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
-        } else {
-            port = Integer.parseInt(portString);
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer");
         }
 
-        portReference.set(port);
+        String host = ofNullable(properties.getProperty(HOST_KEY))
+            .filter(StringUtils::isNotBlank)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+        String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+        String query = properties.getProperty(QUERY_KEY, EMPTY);
+        Map<String, String> httpHeaders = ofNullable(properties.getProperty(HTTP_HEADERS))
+            .filter(StringUtils::isNotBlank)
+            .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+            .map(Arrays::stream)

Review Comment:
   Fixed



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
  */
 public class FileChangeIngestor implements Runnable, ChangeIngestor {
 
-    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
-    static {
-        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
-        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
 
-        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
-    }
+    static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
 
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
 
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+    private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+    private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
 
-    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
-    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
 
-    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
-    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+    private ScheduledExecutorService executorService;
 
     private Path configFilePath;
     private WatchService watchService;
     private long pollingSeconds;
-    private volatile Differentiator<ByteBuffer> differentiator;
-    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
-    private volatile ConfigurationFileHolder configurationFileHolder;
-    private volatile Properties properties;
-    private ScheduledExecutorService executorService;
 
-    protected static WatchService initializeWatcher(Path filePath) {
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        Path configFile = ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+            .filter(not(String::isBlank))
+            .map(Path::of)
+            .map(Path::toAbsolutePath)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
         try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+            this.configurationChangeNotifier = configurationChangeNotifier;
+            this.configFilePath = configFile;
+            this.pollingSeconds = ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+                .map(Long::parseLong)
+                .filter(duration -> duration > 0)
+                .map(duration -> SECONDS.convert(duration, DEFAULT_POLLING_PERIOD_UNIT))
+                .orElseThrow(() -> new IllegalArgumentException("Cannot specify a polling period with duration <=0"));
+            this.watchService = initializeWatcher(configFile);
+            this.differentiator = ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+                .filter(not(String::isBlank))
+                .map(differentiator -> ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+                    .map(Supplier::get)
+                    .orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+                .orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+            this.differentiator.initialize(configurationFileHolder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier", e);
+        }
 
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        if (Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile)) {

Review Comment:
   Good catch. I updated the criteria and extracted the check into a separate method.
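
The updated criteria are not shown in this thread, so the following is only a minimal sketch of what an extracted check might look like. The class name, method name, and property key constant are hypothetical (the PR itself reads the key from `MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey()`). Unlike the inline check in the hunk above, this version assumes a missing or blank property should count as "not the flow config" rather than fail, and it normalizes both paths before comparing them.

```java
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper for illustration only; none of these names come from the PR.
final class FlowConfigPathCheck {

    // Placeholder key; the PR obtains the real key from MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey().
    static final String FLOW_CONFIG_KEY = "example.flow.config.path";

    private FlowConfigPathCheck() {
    }

    // Returns true only when the configured flow file and the candidate resolve to the same absolute path.
    static boolean isSameAsFlowConfig(Properties properties, Path candidate) {
        String configured = properties.getProperty(FLOW_CONFIG_KEY);
        if (configured == null || configured.isBlank()) {
            // Assumption: an unset property means there is nothing to collide with.
            return false;
        }
        Path flowConfigPath = Path.of(configured).toAbsolutePath().normalize();
        return flowConfigPath.equals(candidate.toAbsolutePath().normalize());
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(FLOW_CONFIG_KEY, "conf/flow.json.gz");
        System.out.println(isSameAsFlowConfig(properties, Path.of("conf/./flow.json.gz")));
        System.out.println(isSameAsFlowConfig(properties, Path.of("conf/other.json")));
    }
}
```

Keeping the comparison in one place means the null handling and normalization are stated once, and the caller in `initialize` only has to decide what to do when the paths collide.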



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -14,115 +14,146 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.minifi.bootstrap.service;
 
+import static java.nio.ByteBuffer.wrap;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
-import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
-
-import java.io.File;
-import java.io.FileInputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.toByteArray;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Properties;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.io.IOUtils;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.eclipse.jetty.io.RuntimeIOException;
 import org.slf4j.Logger;
 
 public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
 
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
     private final RunMiNiFi runner;
     private final Logger logger;
     private final BootstrapFileProvider bootstrapFileProvider;
+    private final FlowEnrichService flowEnrichService;
 
-    private static final ReentrantLock handlingLock = new ReentrantLock();
-
-    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) {
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider, FlowEnrichService flowEnrichService) {
         this.runner = runner;
         this.logger = logger;
         this.bootstrapFileProvider = bootstrapFileProvider;
+        this.flowEnrichService = flowEnrichService;
     }
 
     @Override
-    public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
+    public String getDescriptor() {
+        return "MiNiFiConfigurationChangeListener";
+    }
+
+    @Override
+    public void handleChange(InputStream flowConfigInputStream) throws ConfigurationChangeException {
         logger.info("Received notification of a change");
 
         if (!handlingLock.tryLock()) {
             throw new ConfigurationChangeException("Instance is already handling another change");
         }
-        // Store the incoming stream as a byte array to be shared among components that need it
+
+        Path currentFlowConfigFile = null;
+        Path backupFlowConfigFile = null;
+        Path currentRawFlowConfigFile = null;
+        Path backupRawFlowConfigFile = null;
         try {
             Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
-            File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
 
-            File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
-            logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
+            currentFlowConfigFile = Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
+            backupFlowConfigFile = Path.of(currentFlowConfigFile + BACKUP_EXTENSION);
+            String currentFlowConfigFileBaseName = FilenameUtils.getBaseName(currentFlowConfigFile.toString());
+            currentRawFlowConfigFile = currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName + RAW_EXTENSION);
+            backupRawFlowConfigFile = currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName + RAW_EXTENSION + BACKUP_EXTENSION);
 
-            try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
-                Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
-            }
+            backup(currentFlowConfigFile, backupFlowConfigFile);
+            backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
 
-            // write out new config to file
-            Files.copy(configInputStream, configFile.toPath(), REPLACE_EXISTING);
-
-            // Create an input stream to feed to the config transformer
-            try (FileInputStream newConfigIs = new FileInputStream(configFile)) {
-                try {
-                    String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                    transformConfigurationFiles(confDir, newConfigIs, configFile, swapConfigFile);
-                } catch (Exception e) {
-                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
-                    if (!swapConfigFile.delete()) {
-                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
-                    }
-                    throw e;
-                }
-            }
+            byte[] rawFlow = toByteArray(flowConfigInputStream);
+            byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow);
+            persist(enrichedFlow, currentFlowConfigFile, true);
+            restartInstance();
+            setActiveFlowReference(wrap(rawFlow));
+            persist(rawFlow, currentRawFlowConfigFile, false);

Review Comment:
   Good point. I moved `setActiveFlowReference` so that it is now the last step.
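
The reasoning behind the reordering can be sketched as follows: the in-memory reference to the active flow is swapped only after every step that can fail (persisting, restarting) has completed, so a failure never leaves the reference pointing at a flow that was not applied. This is a simplified illustration, not the listener's code; `ActiveFlowUpdateSketch`, `Step`, and `applyChange` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; none of these names come from the PR.
final class ActiveFlowUpdateSketch {

    // A step that may fail, such as persisting a file or restarting the instance.
    interface Step {
        void run() throws Exception;
    }

    private final AtomicReference<ByteBuffer> activeFlow = new AtomicReference<>();

    ActiveFlowUpdateSketch(byte[] initialFlow) {
        activeFlow.set(ByteBuffer.wrap(initialFlow));
    }

    ByteBuffer getActiveFlow() {
        return activeFlow.get().asReadOnlyBuffer();
    }

    // Runs all fallible steps first and updates the reference last.
    boolean applyChange(byte[] rawFlow, Step... fallibleSteps) {
        try {
            for (Step step : fallibleSteps) {
                step.run();
            }
        } catch (Exception e) {
            // The reference still points at the previous flow; file rollback would happen here.
            return false;
        }
        activeFlow.set(ByteBuffer.wrap(rawFlow));
        return true;
    }

    public static void main(String[] args) {
        ActiveFlowUpdateSketch sketch = new ActiveFlowUpdateSketch("old".getBytes(StandardCharsets.UTF_8));
        boolean applied = sketch.applyChange("new".getBytes(StandardCharsets.UTF_8), () -> {
            throw new IllegalStateException("restart failed");
        });
        System.out.println("applied=" + applied);
    }
}
```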



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {

Review Comment:
   Sure, will add.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;
+
+    @BeforeEach
+    public void setup() {
+        flowConfigurationFile = Path.of(tempDir.getAbsolutePath(), FLOW_CONFIG_FILE_NAME).toAbsolutePath();
+        backupFlowConfigurationFile = Path.of(flowConfigurationFile + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFile.toString());
+        rawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        backupRawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+
+        testUpdateConfiguratinStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, flowConfigurationFile.toString());
+
+        writeGzipFile(flowConfigurationFile, ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
+        writePlainTextFile(rawFlowConfigurationFile, ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
+    }
+
+    @Test
+    public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertTrue(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(1)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(1)).startProcessing();
+    }
+
+    @Test
+    public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+        doThrow(new IOException()).when(mockFlowService).load(null);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertFalse(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(0)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(0)).startProcessing();
+    }
+
+    private void writeGzipFile(Path path, byte[] content) {
+        try (ByteArrayInputStream inputStream = new ByteArrayInputStream(content);
+             OutputStream outputStream = new GZIPOutputStream(newOutputStream(path, WRITE, CREATE, TRUNCATE_EXISTING))) {
+            inputStream.transferTo(outputStream);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private byte[] readGzipFile(Path path) {
+        try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(path));
+             ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            inputStream.transferTo(outputStream);
+            outputStream.flush();
+            return outputStream.toByteArray();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void writePlainTextFile(Path path, byte[] content) {

Review Comment:
   Sure, fixed



##########
nifi-commons/nifi-kubernetes-client/src/test/java/org/apache/nifi/kubernetes/client/StandardKubernetesClientProviderTest.java:
##########
@@ -31,7 +31,7 @@ void setProvider() {
         provider = new StandardKubernetesClientProvider();
     }
 
-    @Timeout(5)
+    @Timeout(10)

Review Comment:
   Yes, I will revert this. I only needed it while development was in progress.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/flow.json.raw:
##########
@@ -0,0 +1,38 @@
+{
+  "encodingVersion": {
+    "majorVersion": 2,
+    "minorVersion": 0
+  },
+  "maxTimerDrivenThreadCount": 1,
+  "maxEventDrivenThreadCount": 1,
+  "registries": [],
+  "parameterContexts": [],
+  "parameterProviders": [],
+  "controllerServices": [],
+  "reportingTasks": [],
+  "templates": [],
+  "rootGroup": {
+    "name": "NiFi Flow",

Review Comment:
   Renamed



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ComponentPropertyProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.util.Objects.nonNull;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithId;
+
+public class ComponentPropertyProvider {

Review Comment:
   Added



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {
+
+    public static final String TRANSFORM_YML = "transform-yml";
+
+    private static final String COMMAND_DESCRIPTION = "Transform MiNiFi config YAML into NiFi flow JSON format";
+    private static final String PROPERTY_KEY_VALUE_DELIMITER = "=";
+
+    private final PathInputStreamFactory pathInputStreamFactory;
+    private final PathOutputStreamFactory pathOutputStreamFactory;
+
+    public TransformYamlCommandFactory(PathInputStreamFactory pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) {
+        this.pathInputStreamFactory = pathInputStreamFactory;
+        this.pathOutputStreamFactory = pathOutputStreamFactory;
+    }
+
+    public ConfigMain.Command create() {
+        return new ConfigMain.Command(this::transformYamlToJson, COMMAND_DESCRIPTION);
+    }
+
+    private int transformYamlToJson(String[] args) {
+        if (args.length != 5) {
+            printTransformYmlUsage();
+            return ConfigMain.ERR_INVALID_ARGS;
+        }
+
+        String sourceMiNiFiConfigPath = args[1];

Review Comment:
   Updated



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);
+
+    private final ConfigSchema configSchema;
+    private final ComponentPropertyProvider componentPropertyProvider;
+
+    public ConfigSchemaToVersionedDataFlowTransformer(ConfigSchema configSchema) {
+        this.configSchema = configSchema;
+        this.componentPropertyProvider = new ComponentPropertyProvider(configSchema);
+    }
+
+    public Map<String, String> extractProperties() {
+        CorePropertiesSchema coreProperties = configSchema.getCoreProperties();
+        FlowFileRepositorySchema flowFileRepositoryProperties = configSchema.getFlowfileRepositoryProperties();
+        ContentRepositorySchema contentRepositoryProperties = configSchema.getContentRepositoryProperties();
+        ProvenanceRepositorySchema provenanceRepositoryProperties = configSchema.getProvenanceRepositorySchema();
+        ComponentStatusRepositorySchema componentStatusRepositoryProperties = configSchema.getComponentStatusRepositoryProperties();
+        SwapSchema swapProperties = configSchema.getFlowfileRepositoryProperties().getSwapProperties();
+
+        return Stream.concat(
+                Stream.of(
+                    entry(NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey(), coreProperties.getMaxConcurrentThreads().toString()),
+                    entry(FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD, coreProperties.getFlowControllerGracefulShutdownPeriod()),
+                    entry(WRITE_DELAY_INTERVAL, coreProperties.getFlowServiceWriteDelayInterval()),
+                    entry(ADMINISTRATIVE_YIELD_DURATION, coreProperties.getAdministrativeYieldDuration()),
+                    entry(BORED_YIELD_DURATION, coreProperties.getBoredYieldDuration()),
+                    entry(VARIABLE_REGISTRY_PROPERTIES, coreProperties.getVariableRegistryProperties()),
+                    entry(FLOWFILE_REPOSITORY_IMPLEMENTATION, flowFileRepositoryProperties.getFlowFileRepository()),
+                    entry(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, flowFileRepositoryProperties.getCheckpointInterval()),
+                    entry(FLOWFILE_REPOSITORY_ALWAYS_SYNC, Boolean.toString(flowFileRepositoryProperties.getAlwaysSync())),
+                    entry(CONTENT_REPOSITORY_IMPLEMENTATION, contentRepositoryProperties.getContentRepository()),
+                    entry(MAX_APPENDABLE_CLAIM_SIZE, contentRepositoryProperties.getContentClaimMaxAppendableSize()),
+                    entry(CONTENT_ARCHIVE_MAX_RETENTION_PERIOD, contentRepositoryProperties.getContentRepoArchiveMaxRetentionPeriod()),
+                    entry(CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE, contentRepositoryProperties.getContentRepoArchiveMaxUsagePercentage()),
+                    entry(CONTENT_ARCHIVE_ENABLED, Boolean.toString(contentRepositoryProperties.getContentRepoArchiveEnabled())),
+                    entry(PROVENANCE_REPO_IMPLEMENTATION_CLASS, provenanceRepositoryProperties.getProvenanceRepository()),
+                    entry(PROVENANCE_ROLLOVER_TIME, provenanceRepositoryProperties.getProvenanceRepoRolloverTimeKey()),
+                    entry(PROVENANCE_INDEX_SHARD_SIZE, provenanceRepositoryProperties.getProvenanceRepoIndexShardSize()),
+                    entry(PROVENANCE_MAX_STORAGE_SIZE, provenanceRepositoryProperties.getProvenanceRepoMaxStorageSize()),
+                    entry(PROVENANCE_MAX_STORAGE_TIME, provenanceRepositoryProperties.getProvenanceRepoMaxStorageTime()),
+                    entry(COMPONENT_STATUS_SNAPSHOT_FREQUENCY, componentStatusRepositoryProperties.getSnapshotFrequency()),
+                    entry(QUEUE_SWAP_THRESHOLD, swapProperties.getThreshold().toString())
+                ),
+                ofNullable(configSchema.getNifiPropertiesOverrides().entrySet()).orElse(Set.of()).stream()

Review Comment:
   Good catch, the intention here was to defend against NifiPropertiesOverrides being null. Updated the code accordingly.
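
   For readers following the thread: in the quoted line, `ofNullable` wraps the result of `entrySet()`, so a null map would still throw before the `Optional` is created. Below is a minimal sketch of one null-safe variant. The class name `NullSafeOverrides` and the helper `copyOverrides` are hypothetical and used only for illustration; this is not necessarily the change that was pushed to the PR.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class NullSafeOverrides {

    // Hypothetical helper: copies the entries of a map that may itself be null.
    // The Optional wraps the map, so entrySet() is only called on a non-null map.
    static Map<String, String> copyOverrides(Map<String, String> overrides) {
        return Optional.ofNullable(overrides)
            .map(Map::entrySet)
            .orElse(Set.of())
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // A null map yields an empty result instead of a NullPointerException.
        System.out.println(copyOverrides(null));
        System.out.println(copyOverrides(Map.of("nifi.example.key", "value")));
    }
}
```

   The key point is where the null check sits: wrapping the map rather than the value returned by `entrySet()`.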



##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -186,6 +202,9 @@ public long getKeepAliveDuration() {
      */
     public static class Builder {
 
+        private static final String HTTP_HEADERS_SEPARATOR = ",";

Review Comment:
   I changed HTTP_HEADERS_SEPARATOR to `#`. I did a quick check and it seems no header uses `#`.
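
   To illustrate why a separator other than `,` helps: header values such as `Accept` can themselves contain commas. The sketch below parses a `#`-separated list of `key:value` headers. The class `HeaderParsingSketch` and the method `parseHeaders` are hypothetical names, and splitting each header with a limit of 2 (so that values containing `:` survive) is an assumption of this sketch, not something the PR is confirmed to do.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class HeaderParsingSketch {

    private static final String HTTP_HEADERS_SEPARATOR = "#";
    private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";

    // Hypothetical helper: parses "Key1:value1#Key2:value2" into a map.
    // Assumption: header names are unique; Collectors.toMap throws on duplicate keys.
    static Map<String, String> parseHeaders(String rawHeaders) {
        if (rawHeaders == null || rawHeaders.isBlank()) {
            return Map.of();
        }
        return Arrays.stream(rawHeaders.split(HTTP_HEADERS_SEPARATOR))
            .map(String::trim)
            // Limit of 2 keeps any further ':' characters inside the value.
            .map(header -> header.split(HTTP_HEADER_KEY_VALUE_SEPARATOR, 2))
            .filter(split -> split.length == 2)
            .collect(Collectors.toMap(split -> split[0].trim(), split -> split[1].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("Accept:text/html,application/json#X-Source:http://host:8080"));
    }
}
```

   With `,` as the separator, the `Accept` value above would be cut in two and the second half dropped because it has no `:`.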



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -74,12 +71,14 @@ public class StartRunner implements CommandRunner {
     private volatile ShutdownHook shutdownHook;
     private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
     private final ConfigurationChangeListener configurationChangeListener;
+    private final MiNiFiPropertiesGenerator miNiFiPropertiesGenerator;
 
     private int listenPort;
 
     public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
-        PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
-        RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ConfigurationChangeListener configurationChangeListener) {
+                       PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters,
+                       File bootstrapConfigFile,

Review Comment:
   Agree, fixed



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java:
##########
@@ -29,59 +27,31 @@
 
 public abstract class WholeConfigDifferentiator {
 
-
-    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
-
     public static final String WHOLE_CONFIG_KEY = "Whole Config";
 
-    volatile ConfigurationFileHolder configurationFileHolder;
-
-    boolean compareInputStreamToConfigFile(InputStream inputStream) throws IOException {
-        logger.debug("Checking if change is different");
-        AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
-        ByteBuffer currentConfigFile = currentConfigFileReference.get();
-        ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
-        DataInputStream dataInputStream = new DataInputStream(inputStream);
-        try {
-            dataInputStream.readFully(byteBuffer.array());
-        } catch (EOFException e) {
-            logger.debug("New config is shorter than the current. Must be different.");
-            return true;
-        }
-        logger.debug("Read the input");
+    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
 
-        if (dataInputStream.available() != 0) {
-            return true;
-        } else {
-            return byteBuffer.compareTo(currentConfigFile) != 0;
-        }
-    }
+    protected volatile ConfigurationFileHolder configurationFileHolder;
 
     public void initialize(ConfigurationFileHolder configurationFileHolder) {
         this.configurationFileHolder = configurationFileHolder;
     }
 
-
-    public static class InputStreamInput extends WholeConfigDifferentiator implements Differentiator<InputStream> {
-        public boolean isNew(InputStream inputStream) throws IOException {
-            return compareInputStreamToConfigFile(inputStream);
-        }
-    }
-
-    public static class ByteBufferInput extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
-        public boolean isNew(ByteBuffer inputBuffer) {
-            AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
-            ByteBuffer currentConfigFile = currentConfigFileReference.get();
-            return inputBuffer.compareTo(currentConfigFile) != 0;
+    public static class ByteBufferInputDifferentiator extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+        public boolean isNew(ByteBuffer newFlowConfig) {
+            AtomicReference<ByteBuffer> currentFlowConfigReference = configurationFileHolder.getConfigFileReference();
+            ByteBuffer currentFlowConfig = currentFlowConfigReference.get();
+            logger.debug("Comparing byte buffers:\n newFlow={}\n existingFlow={}", newFlowConfig, currentFlowConfig);

Review Comment:
   You are right, I was using it for debugging. It no longer adds much value, so I will remove it.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
  */
 public class FileChangeIngestor implements Runnable, ChangeIngestor {
 
-    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
-    static {
-        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
-        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
 
-        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
-    }
+    static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
 
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
 
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+    private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+    private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
 
-    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
-    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
 
-    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
-    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+    private ScheduledExecutorService executorService;
 
     private Path configFilePath;
     private WatchService watchService;
     private long pollingSeconds;
-    private volatile Differentiator<ByteBuffer> differentiator;
-    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
-    private volatile ConfigurationFileHolder configurationFileHolder;
-    private volatile Properties properties;
-    private ScheduledExecutorService executorService;
 
-    protected static WatchService initializeWatcher(Path filePath) {
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        Path configFile = ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+            .filter(not(String::isBlank))
+            .map(Path::of)
+            .map(Path::toAbsolutePath)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
         try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+            this.configurationChangeNotifier = configurationChangeNotifier;
+            this.configFilePath = configFile;
+            this.pollingSeconds = ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+                .map(Long::parseLong)
+                .filter(duration -> duration > 0)
+                .map(duration -> SECONDS.convert(duration, DEFAULT_POLLING_PERIOD_UNIT))
+                .orElseThrow(() -> new IllegalArgumentException("Cannot specify a polling period with duration <=0"));
+            this.watchService = initializeWatcher(configFile);
+            this.differentiator = ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+                .filter(not(String::isBlank))
+                .map(differentiator -> ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+                    .map(Supplier::get)
+                    .orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+                .orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+            this.differentiator.initialize(configurationFileHolder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier", e);
+        }
 
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        if (Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile)) {
+            throw new IllegalStateException("File ingestor config file (" + CONFIG_FILE_PATH_KEY
+                + ") must point to a different file than MiNiFi flow config file (" + MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey() + ")");
         }
     }
 
-    protected boolean targetChanged() {
-        boolean targetChanged;
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, runnable -> {
+            Thread notifierThread = Executors.defaultThreadFactory().newThread(runnable);
+            notifierThread.setName("File Change Notifier Thread");
+            notifierThread.setDaemon(true);
+            return notifierThread;
+        });
+        executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void run() {
+        logger.debug("Checking for a change in {}", configFilePath);
+        if (targetFileChanged()) {
+            logger.debug("Target file changed, checking if it's different than current flow");
+            try (FileInputStream flowCandidateInputStream = new FileInputStream(configFilePath.toFile())) {
+                ByteBuffer newFlowConfig = wrap(toByteArray(flowCandidateInputStream));
+                if (differentiator.isNew(newFlowConfig)) {
+                    logger.debug("Current flow and new flow is different, notifying listener");
+                    configurationChangeNotifier.notifyListeners(newFlowConfig);
+                    logger.debug("Listeners have been notified");
+                }
+            } catch (Exception e) {
+                logger.error("Could not successfully notify listeners.", e);
+            }
+        } else {
+            logger.debug("No change detected in {}", configFilePath);
+        }
+    }
 
-        Optional<WatchKey> watchKey = Optional.ofNullable(watchService.poll());
+    @Override
+    public void close() {
+        if (executorService != null) {
+            executorService.shutdownNow();
+        }
+    }
 
-        targetChanged = watchKey
+    boolean targetFileChanged() {
+        logger.debug("Attempting to acquire watch key");

Review Comment:
   Changed the log level from debug to trace.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException, InterruptedException {
 
         Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
         String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-        initConfigFiles(bootstrapProperties, confDir);
 
-        Process process = startMiNiFi();
+        DEFAULT_LOGGER.debug("Generating minifi.properties from bootstrap.conf");

Review Comment:
   Done



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
     public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
     public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";
+    public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+    private static final Logger logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+        WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator
+    );
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+    private static final String DOUBLE_QUOTES = "\"";
+    private static final String ETAG_HEADER = "ETag";
+    private static final String PROXY_AUTHORIZATION_HEADER = "Proxy-Authorization";
+    private static final String DEFAULT_PATH = "/";
+    private static final int BAD_REQUEST_STATUS_CODE = 400;
+    private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+    private static final String HTTP_HEADERS_SEPARATOR = ",";
+    private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
 
     private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
     private final AtomicReference<Integer> portReference = new AtomicReference<>();
     private final AtomicReference<String> hostReference = new AtomicReference<>();
     private final AtomicReference<String> pathReference = new AtomicReference<>();
     private final AtomicReference<String> queryReference = new AtomicReference<>();
+    private final AtomicReference<Map<String, String>> httpHeadersReference = new AtomicReference<>();
+
     private volatile Differentiator<ByteBuffer> differentiator;
     private volatile String connectionScheme;
     private volatile String lastEtag = "";
     private volatile boolean useEtag = false;
 
-    public PullHttpChangeIngestor() {
-        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
-    }
-
     @Override
     public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
         super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
 
-        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD_MILLISECONDS)));
         if (pollingPeriodMS.get() < 1) {
-            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
-        }
-
-        final String host = properties.getProperty(HOST_KEY);
-        if (host == null || host.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
-        }
-
-        final String path = properties.getProperty(PATH_KEY, "/");
-        final String query = properties.getProperty(QUERY_KEY, "");
-
-        final String portString = (String) properties.get(PORT_KEY);
-        final Integer port;
-        if (portString == null) {
-            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
-        } else {
-            port = Integer.parseInt(portString);
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer");
         }
 
-        portReference.set(port);
+        String host = ofNullable(properties.getProperty(HOST_KEY))
+            .filter(StringUtils::isNotBlank)
+            .orElseThrow(() -> new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+        String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+        String query = properties.getProperty(QUERY_KEY, EMPTY);
+        Map<String, String> httpHeaders = ofNullable(properties.getProperty(HTTP_HEADERS))
+            .filter(StringUtils::isNotBlank)
+            .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+            .map(Arrays::stream)
+            .orElseGet(Stream::of)
+            .map(String::trim)
+            .map(header -> header.split(HTTP_HEADER_KEY_VALUE_SEPARATOR))
+            .filter(split -> split.length == 2)
+            .collect(toMap(split -> ofNullable(split[0]).map(String::trim).orElse(EMPTY), split -> ofNullable(split[1]).map(String::trim).orElse(EMPTY)));
+        logger.debug("Configured HTTP headers: {}", httpHeaders);
+
+        ofNullable(properties.get(PORT_KEY))
+            .map(rawPort -> (String) rawPort)

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1265295102


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -150,17 +107,11 @@ java.arg.14=-Djava.awt.headless=true
 #c2.rest.connectionTimeout=5 sec
 #c2.rest.readTimeout=5 sec
 #c2.rest.callTimeout=10 sec
-## heartbeat in milliseconds
-#c2.agent.heartbeat.period=5000
-## define parameters about your agent
-#c2.agent.class=
+# Comma separated list of HTTP headers, eg: Accept:text/json
+#c2.rest.http.headers=text/json

Review Comment:
   Is there a reason this has `text/json` instead of the standard `application/json`?


