You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "bejancsaba (via GitHub)" <gi...@apache.org> on 2023/06/24 18:29:34 UTC

[GitHub] [nifi] bejancsaba commented on a diff in pull request #7344: NIFI-11514 MiNiFi Flow JSON support and deprecating YAML format. Migration tool from YAML to JSON

bejancsaba commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1235433806


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java:
##########
@@ -80,7 +83,7 @@ public Map<String, Object> getProperties() {
 
     @Override
     public boolean requiresRestart() {
-        return true;
+        return false;

Review Comment:
   nice :)



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGenerator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.String.join;
+import static java.lang.System.getProperty;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider.getBootstrapConfFile;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_APP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_BOOTSTRAP_LOG_FILE_NAME;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_DIR;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_FILE_EXTENSION;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.LOG_DIR;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.getMiNiFiPropertiesPath;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Path;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.util.OrderedProperties;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+
+public class MiNiFiPropertiesGenerator {
+
+    public static final String PROPERTIES_FILE_APACHE_2_0_LICENSE =
+        " Licensed to the Apache Software Foundation (ASF) under one or more\n" +
+            "# contributor license agreements.  See the NOTICE file distributed with\n" +
+            "# this work for additional information regarding copyright ownership.\n" +
+            "# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
+            "# (the \"License\"); you may not use this file except in compliance with\n" +
+            "# the License.  You may obtain a copy of the License at\n" +
+            "#\n" +
+            "#     http://www.apache.org/licenses/LICENSE-2.0\n" +
+            "#\n" +
+            "# Unless required by applicable law or agreed to in writing, software\n" +
+            "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
+            "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
+            "# See the License for the specific language governing permissions and\n" +
+            "# limitations under the License.\n" +
+            "\n";
+
+    static final List<Triple<String, String, String>> NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS = List.of(

Review Comment:
   Thanks for collecting all of these. It will be really useful to have everything in place in the future.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()
+            .map(RemoteProcessGroup::stopTransmitting)
+            .forEach(future -> {
+                try {
+                    future.get(5000, TimeUnit.MICROSECONDS);
+                } catch (Exception e) {
+                    LOGGER.warn("Unable to stop remote process group", e);
+                }
+            });
+        rootProcessGroup.stopProcessing();
+    }
+
+    private void backup(Path current, Path backup) throws IOException {

Review Comment:
   There is quite some overlap in logic between this class and MiNiFiConfigurationChangeListener. What do you think about extracting the shared logic to the commons module? I see there is a commons-api; maybe a commons-service would make sense? Or are there dependencies that don't allow such an extraction?



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {

Review Comment:
   What do you think about adding usage information to the readme as well?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,
+            "-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName,
+            "-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension,
+            "-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName,
+            "-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension,
+            "org.apache.nifi.minifi.MiNiFi");
+    }
+
+    private File getFile(String filename, File workingDir) {
+        File file = new File(filename);
+        return file.isAbsolute() ? file : new File(workingDir, filename).getAbsoluteFile();
     }
 
-    private String getJavaCommand(Properties props) {
-        String javaCmd = props.getProperty("java");
-        if (javaCmd == null) {
-            javaCmd = DEFAULT_JAVA_CMD;
-        }
-        if (javaCmd.equals(DEFAULT_JAVA_CMD)) {
-            Optional.ofNullable(System.getenv("JAVA_HOME"))
-                .map(javaHome -> getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
-                    .orElseGet(() -> getJavaCommandBasedOnExtension(javaHome, "").orElse(DEFAULT_JAVA_CMD)));
-        }
-        return javaCmd;
+    private String getJavaCommand(Properties bootstrapProperties) {
+        String javaCommand = bootstrapProperties.getProperty(JAVA_COMMAND_KEY, DEFAULT_JAVA_CMD);
+        return javaCommand.equals(DEFAULT_JAVA_CMD)
+            ? ofNullable(System.getenv(JAVA_HOME_ENVIRONMENT_VARIABLE))
+            .flatMap(javaHome ->
+                getJavaCommandBasedOnExtension(javaHome, WINDOWS_FILE_EXTENSION)
+                    .or(() -> getJavaCommandBasedOnExtension(javaHome, LINUX_FILE_EXTENSION)))
+            .orElse(DEFAULT_JAVA_CMD)
+            : javaCommand;
     }
 
     private Optional<String> getJavaCommandBasedOnExtension(String javaHome, String extension) {
-        String javaCmd = null;
-        File javaFile = new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension);
-        if (javaFile.exists() && javaFile.canExecute()) {
-            javaCmd = javaFile.getAbsolutePath();
-        }
-        return Optional.ofNullable(javaCmd);
+        return Optional.of(new File(javaHome + File.separatorChar + "bin" + File.separatorChar + "java" + extension))

Review Comment:
   Very minor, and I know it was just refactored, but would the "java" literal here be the JAVA_COMMAND_KEY?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = "nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", listenerHandleResult, ex);

Review Comment:
   Shouldn't this be logged at error or warning level instead of info?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties) {
         logger.info("HTTPS Connector added for Host [{}] and Port [{}]", https.getHost(), https.getPort());
     }
 
-    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+    private Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String differentiator) {

Review Comment:
   I saw this across all ChangeIngestors. Would it be worth extracting the shared logic to an abstract class? Maybe there is other common logic that could be extracted as well; I'm not sure, so I will leave it up to you.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider bootstrapFileProvider) {
      * @throws IOException throws IOException if any of the configuration file read fails
      */
     public List<String> getMiNiFiExecCommand(int listenPort, File workingDir) throws IOException {
-        Properties props = bootstrapFileProvider.getBootstrapProperties();
-        File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
-        File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+        File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
+        File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY, DEFAULT_LIB_DIR).trim(), workingDir);
+
         String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
         String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
         String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
         String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
         String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
 
-        List<String> cmd = new ArrayList<>();
-        cmd.add(getJavaCommand(props));
-        cmd.add("-classpath");
-        cmd.add(buildClassPath(props, confDir, libDir));
-        cmd.addAll(getJavaAdditionalArgs(props));
-        cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
-        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
-        cmd.add("-Dapp=MiNiFi");
-        cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
-        cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
-        cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
-        cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
-        cmd.add("org.apache.nifi.minifi.MiNiFi");
-
-        return cmd;
+        return List.of(
+            getJavaCommand(bootstrapProperties),
+            "-classpath",
+            buildClassPath(confDir, libDir),
+            getJavaAdditionalArgs(bootstrapProperties),
+            "-Dnifi.properties.file.path=" + getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+            "-Dnifi.bootstrap.listen.port=" + listenPort,
+            "-Dapp=MiNiFi",
+            "-D" + LOG_DIR + "=" + minifiLogDir,

Review Comment:
   Again, it was just a refactor, but readability could be improved with a pattern for the repeated "-D" + key + "=" + value concatenation. What do you think?



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(

Review Comment:
   Extremely minor, but you could use Stream.of() instead of List.of().stream().



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {

Review Comment:
   Thanks for the thorough test coverage.



##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+    @TempDir
+    private Path tempDir;
+
+    private Path configDirectory;
+    private Path bootstrapPropertiesFile;
+    private Path minifiPropertiesFile;
+
+    private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        configDirectory = tempDir.toAbsolutePath().resolve("conf");
+        Files.createDirectories(configDirectory);
+        bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+        minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+        testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+    }
+
+    @Test
+    public void testGenerateDefaultNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        List<String> expectedMiNiFiProperties = NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+            .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+            .collect(toList());
+        List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+            .stream()
+            .map(entry -> entry.getKey() + "=" + entry.getValue())
+            .collect(toList());
+        assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+    }
+
+    @Test
+    public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws ConfigurationChangeException {
+        // given
+        Properties bootstrapProperties = createBootstrapProperties(List.of(
+                MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE.getKey(),
+                MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD.getKey())
+            .stream()
+            .collect(toMap(Function.identity(), __ -> randomUUID().toString()))
+        );
+
+        // when
+        testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(), bootstrapProperties);
+
+        // then
+        Properties miNiFiProperties = loadMiNiFiProperties();
+        MINIFI_TO_NIFI_PROPERTY_MAPPING.entrySet().stream()
+            .allMatch(entry -> Objects.equals(bootstrapProperties.getProperty(entry.getKey()), miNiFiProperties.getProperty(entry.getValue())));

Review Comment:
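   My guess is that you would need an assertion here as well, right? As written, the boolean returned by `allMatch` is discarded, so this test cannot fail when a property is mapped incorrectly.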



##########
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/files/raspi3/config.test.json.v1:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+    "encodingVersion": {
+        "majorVersion": 2,
+        "minorVersion": 0
+    },
+    "maxTimerDrivenThreadCount": 10,
+    "maxEventDrivenThreadCount": 1,
+    "registries": [],
+    "parameterContexts": [],
+    "parameterProviders": [],
+    "controllerServices": [],
+    "reportingTasks": [],
+    "templates": [],
+    "rootGroup": {
+        "identifier": "c1b4e586-2011-3f81-a11e-8d669f084d1c",
+        "instanceIdentifier": "29db3dbc-0188-1000-7025-4cab8b52d278",
+        "name": "NiFi Flow",

Review Comment:
   I suppose it doesn't really matter, but can this be "MiNiFi Flow"?



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {
+
+    static final String COMMON_SSL_CONTEXT_SERVICE_NAME = "SSL-Context-Service";
+    static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context Service";
+    static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME = "Site-To-Site-Provenance-Reporting";
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlowEnrichService.class);
+
+    private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi";
+    private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE = "org.apache.nifi.ssl.StandardRestrictedSSLContextService";
+    private static final String RESTRICTED_SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.RestrictedSSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_API = "org.apache.nifi.ssl.SSLContextService";
+    private static final String SSL_CONTEXT_SERVICE_NAR = "nifi-ssl-context-service-nar";
+    private static final String STANDARD_SERVICES_API_NAR_ARTIFACT = "nifi-standard-services-api-nar";
+    private static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
+    private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
+    private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
+    private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
+
+    private final ReadableProperties minifiProperties;
+
+    public FlowEnrichService(ReadableProperties minifiProperties) {
+        this.minifiProperties = minifiProperties;
+    }
+
+    public byte[] enrichFlow(byte[] flowCandidate) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Enriching flow with content: \n{}", new String(flowCandidate, UTF_8));
+        }
+
+        VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate);
+
+        Optional<Integer> maxConcurrentThreads = ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey()))
+            .map(Integer::parseInt);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
+        maxConcurrentThreads.ifPresent(versionedDataflow::setMaxEventDrivenThreadCount);
+
+        VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
+        if (rootGroup.getIdentifier() == null) {
+            rootGroup.setIdentifier(UUID.randomUUID().toString());
+        }
+        if (rootGroup.getInstanceIdentifier() == null) {
+            rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+        }
+
+        Optional<VersionedControllerService> commonSslControllerService = createCommonSslControllerService();
+        commonSslControllerService
+            .ifPresent(sslControllerService -> {
+                List<VersionedControllerService> currentControllerServices = ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new);
+                currentControllerServices.add(sslControllerService);
+                versionedDataflow.setControllerServices(currentControllerServices);
+            });
+
+        commonSslControllerService
+            .filter(__ -> parseBoolean(minifiProperties.getProperty(NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey())))
+            .map(VersionedComponent::getInstanceIdentifier)
+            .ifPresent(commonSslControllerServiceInstanceId -> overrideProcessorsSslControllerService(rootGroup, commonSslControllerServiceInstanceId));
+
+        createProvenanceReportingTask(commonSslControllerService)

Review Comment:
   What do you think about "resolving" the Optional here instead of passing it around as an argument, so that only the instance identifier is passed rather than the whole controller service Optional? For example:
   ```
   sslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY)
   ```



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+    private final FlowController flowController;
+    private final FlowService flowService;
+    private final FlowEnrichService flowEnrichService;
+    private final Path flowConfigurationFile;
+    private final Path backupFlowConfigurationFile;
+    private final Path rawFlowConfigurationFile;
+    private final Path backupRawFlowConfigurationFile;
+
+    public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) {
+        this.flowController = flowController;
+        this.flowService = flowService;
+        this.flowEnrichService = flowEnrichService;
+        Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath();
+        this.flowConfigurationFile = flowConfigurationFilePath;
+        this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+        this.rawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        this.backupRawFlowConfigurationFile = flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+    }
+
+    @Override
+    public boolean update(byte[] rawFlow) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
+        }
+        try {
+            byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow);
+            stopRootProcessGroup();
+            backup(flowConfigurationFile, backupFlowConfigurationFile);
+            backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+            persist(enrichedFlowCandidate, flowConfigurationFile, true);
+            reloadFlow();
+            startRootProcessGroup();
+            persist(rawFlow, rawFlowConfigurationFile, false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Configuration update failed. Reverting to previous flow", e);
+            revert(backupFlowConfigurationFile, flowConfigurationFile);
+            revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+            return false;
+        } finally {
+            removeIfExists(backupFlowConfigurationFile);
+            removeIfExists(backupRawFlowConfigurationFile);
+        }
+    }
+
+    private void stopRootProcessGroup() {
+        ProcessGroup rootProcessGroup = flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+        rootProcessGroup.findAllRemoteProcessGroups()
+            .stream()

Review Comment:
   What do you think about doing this in parallel? If I'm not mistaken, `future.get` is blocking, so this will stop the remote process groups one after another, each with a 5-second timeout.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);

Review Comment:
   Very minor, but these could be final, right?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;
+
+    @BeforeEach
+    public void setup() {
+        flowConfigurationFile = Path.of(tempDir.getAbsolutePath(), FLOW_CONFIG_FILE_NAME).toAbsolutePath();
+        backupFlowConfigurationFile = Path.of(flowConfigurationFile + BACKUP_EXTENSION);
+        String flowConfigurationFileBaseName = FilenameUtils.getBaseName(flowConfigurationFile.toString());
+        rawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION);
+        backupRawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION);
+
+        testUpdateConfiguratinStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, flowConfigurationFile.toString());
+
+        writeGzipFile(flowConfigurationFile, ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
+        writePlainTextFile(rawFlowConfigurationFile, ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
+    }
+
+    @Test
+    public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertTrue(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(1)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(1)).startProcessing();
+    }
+
+    @Test
+    public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IOException {
+        // given
+        when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+        when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+        when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+        when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+        doThrow(new IOException()).when(mockFlowService).load(null);
+
+        // when
+        boolean result = testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+        //then
+        assertFalse(result);
+        assertTrue(exists(flowConfigurationFile));
+        assertTrue(exists(rawFlowConfigurationFile));
+        assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
+        assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
+        assertFalse(exists(backupFlowConfigurationFile));
+        assertFalse(exists(backupRawFlowConfigurationFile));
+        verify(mockProcessGroup, times(1)).stopProcessing();
+        verify(mockFlowService, times(1)).load(null);
+        verify(mockFlowController, times(0)).onFlowInitialized(true);
+        verify(mockProcessGroup, times(0)).startProcessing();
+    }
+
+    private void writeGzipFile(Path path, byte[] content) {
+        try (ByteArrayInputStream inputStream = new ByteArrayInputStream(content);
+             OutputStream outputStream = new GZIPOutputStream(newOutputStream(path, WRITE, CREATE, TRUNCATE_EXISTING))) {
+            inputStream.transferTo(outputStream);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private byte[] readGzipFile(Path path) {
+        try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(path));
+             ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            inputStream.transferTo(outputStream);
+            outputStream.flush();
+            return outputStream.toByteArray();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void writePlainTextFile(Path path, byte[] content) {

Review Comment:
   As these are "just" tests, you could add the exception to the method signature, making the method much shorter. This is minor enough that I will leave it to you.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/flow.json.raw:
##########
@@ -0,0 +1,38 @@
+{
+  "encodingVersion": {
+    "majorVersion": 2,
+    "minorVersion": 0
+  },
+  "maxTimerDrivenThreadCount": 1,
+  "maxEventDrivenThreadCount": 1,
+  "registries": [],
+  "parameterContexts": [],
+  "parameterProviders": [],
+  "controllerServices": [],
+  "reportingTasks": [],
+  "templates": [],
+  "rootGroup": {
+    "name": "NiFi Flow",

Review Comment:
   I know I commented on this earlier, and there are a ton of references to this JSON, so maybe it isn't worth rewriting it everywhere, but what do you think about renaming it here to "MiNiFi Flow"?



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {

Review Comment:
   As I see it, this is the heart and soul of the transformation logic. Thanks for keeping it as clean and understandable as possible, even with the given constraints.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ComponentPropertyProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.util.Objects.nonNull;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithId;
+
+public class ComponentPropertyProvider {

Review Comment:
   Thanks for the clear code here. However, given the complexity, could you please add a few sentences of Javadoc to describe the intention?



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+    public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+    private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+    private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8);
+    private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8);
+    private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8);
+    private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8);
+
+    @TempDir
+    private File tempDir;
+
+    @Mock
+    private FlowController mockFlowController;
+    @Mock
+    private FlowService mockFlowService;
+    @Mock
+    private FlowEnrichService mockFlowEnrichService;
+    @Mock
+    private FlowManager mockFlowManager;
+    @Mock
+    private ProcessGroup mockProcessGroup;
+
+    private Path flowConfigurationFile;
+    private Path backupFlowConfigurationFile;
+    private Path rawFlowConfigurationFile;
+    private Path backupRawFlowConfigurationFile;
+
+    private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;

Review Comment:
   There is a missing "o" here: "testUpdateConfiguratinStrategy" should be "testUpdateConfigurationStrategy".



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);

Review Comment:
   Does this mean that the position information is not available in the source? I was thinking about this; I suppose we usually don't expect MiNiFi flows to be visualised, so it doesn't make much difference. I was thinking about adding an incrementing constant to both x and y for each element, which still wouldn't be nice but would show the volume of processors. I will leave it to you; it's possibly not worth the effort.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {
+
+    public static final String TRANSFORM_YML = "transform-yml";
+
+    private static final String COMMAND_DESCRIPTION = "Transform MiNiFi config YAML into NiFi flow JSON format";
+    private static final String PROPERTY_KEY_VALUE_DELIMITER = "=";
+
+    private final PathInputStreamFactory pathInputStreamFactory;
+    private final PathOutputStreamFactory pathOutputStreamFactory;
+
+    public TransformYamlCommandFactory(PathInputStreamFactory pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) {
+        this.pathInputStreamFactory = pathInputStreamFactory;
+        this.pathOutputStreamFactory = pathOutputStreamFactory;
+    }
+
+    public ConfigMain.Command create() {
+        return new ConfigMain.Command(this::transformYamlToJson, COMMAND_DESCRIPTION);
+    }
+
+    private int transformYamlToJson(String[] args) {
+        if (args.length != 5) {
+            printTransformYmlUsage();
+            return ConfigMain.ERR_INVALID_ARGS;
+        }
+
+        String sourceMiNiFiConfigPath = args[1];

Review Comment:
   Just a minor thing: what about calling this sourceMiNiFiConfigYamlPath? It could help with readability.



##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+    private static final String RPG_URLS_DELIMITER = ",";
+    private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+    private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1 GB";
+    private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+    private static final String FLOW_FILE_OUTBOUND_POLICY = "STREAM_WHEN_AVAILABLE";
+    private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+    private static final Position DEFAULT_POSITION = new Position(0, 0);
+
+    private final ConfigSchema configSchema;
+    private final ComponentPropertyProvider componentPropertyProvider;
+
+    public ConfigSchemaToVersionedDataFlowTransformer(ConfigSchema configSchema) {
+        this.configSchema = configSchema;
+        this.componentPropertyProvider = new ComponentPropertyProvider(configSchema);
+    }
+
+    public Map<String, String> extractProperties() {
+        CorePropertiesSchema coreProperties = configSchema.getCoreProperties();
+        FlowFileRepositorySchema flowFileRepositoryProperties = configSchema.getFlowfileRepositoryProperties();
+        ContentRepositorySchema contentRepositoryProperties = configSchema.getContentRepositoryProperties();
+        ProvenanceRepositorySchema provenanceRepositoryProperties = configSchema.getProvenanceRepositorySchema();
+        ComponentStatusRepositorySchema componentStatusRepositoryProperties = configSchema.getComponentStatusRepositoryProperties();
+        SwapSchema swapProperties = configSchema.getFlowfileRepositoryProperties().getSwapProperties();
+
+        return Stream.concat(
+                Stream.of(
+                    entry(NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey(), coreProperties.getMaxConcurrentThreads().toString()),
+                    entry(FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD, coreProperties.getFlowControllerGracefulShutdownPeriod()),
+                    entry(WRITE_DELAY_INTERVAL, coreProperties.getFlowServiceWriteDelayInterval()),
+                    entry(ADMINISTRATIVE_YIELD_DURATION, coreProperties.getAdministrativeYieldDuration()),
+                    entry(BORED_YIELD_DURATION, coreProperties.getBoredYieldDuration()),
+                    entry(VARIABLE_REGISTRY_PROPERTIES, coreProperties.getVariableRegistryProperties()),
+                    entry(FLOWFILE_REPOSITORY_IMPLEMENTATION, flowFileRepositoryProperties.getFlowFileRepository()),
+                    entry(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, flowFileRepositoryProperties.getCheckpointInterval()),
+                    entry(FLOWFILE_REPOSITORY_ALWAYS_SYNC, Boolean.toString(flowFileRepositoryProperties.getAlwaysSync())),
+                    entry(CONTENT_REPOSITORY_IMPLEMENTATION, contentRepositoryProperties.getContentRepository()),
+                    entry(MAX_APPENDABLE_CLAIM_SIZE, contentRepositoryProperties.getContentClaimMaxAppendableSize()),
+                    entry(CONTENT_ARCHIVE_MAX_RETENTION_PERIOD, contentRepositoryProperties.getContentRepoArchiveMaxRetentionPeriod()),
+                    entry(CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE, contentRepositoryProperties.getContentRepoArchiveMaxUsagePercentage()),
+                    entry(CONTENT_ARCHIVE_ENABLED, Boolean.toString(contentRepositoryProperties.getContentRepoArchiveEnabled())),
+                    entry(PROVENANCE_REPO_IMPLEMENTATION_CLASS, provenanceRepositoryProperties.getProvenanceRepository()),
+                    entry(PROVENANCE_ROLLOVER_TIME, provenanceRepositoryProperties.getProvenanceRepoRolloverTimeKey()),
+                    entry(PROVENANCE_INDEX_SHARD_SIZE, provenanceRepositoryProperties.getProvenanceRepoIndexShardSize()),
+                    entry(PROVENANCE_MAX_STORAGE_SIZE, provenanceRepositoryProperties.getProvenanceRepoMaxStorageSize()),
+                    entry(PROVENANCE_MAX_STORAGE_TIME, provenanceRepositoryProperties.getProvenanceRepoMaxStorageTime()),
+                    entry(COMPONENT_STATUS_SNAPSHOT_FREQUENCY, componentStatusRepositoryProperties.getSnapshotFrequency()),
+                    entry(QUEUE_SWAP_THRESHOLD, swapProperties.getThreshold().toString())
+                ),
+                ofNullable(configSchema.getNifiPropertiesOverrides().entrySet()).orElse(Set.of()).stream()

Review Comment:
   The entrySet of a map can't be null, so I think there is no need for the ofNullable wrapper here.



##########
nifi-commons/nifi-kubernetes-client/src/test/java/org/apache/nifi/kubernetes/client/StandardKubernetesClientProviderTest.java:
##########
@@ -31,7 +31,7 @@ void setProvider() {
         provider = new StandardKubernetesClientProvider();
     }
 
-    @Timeout(5)
+    @Timeout(10)

Review Comment:
   It was unstable with 5? :)



##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {

Review Comment:
   As this is an actual service, what do you think about moving it out of commons-api? I see there is no commons-service module; maybe it would be worth creating one. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org