You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/17 00:21:47 UTC

[nifi-minifi-cpp] 01/04: MINIFICPP-2035 NiFi flow json format support

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ce288b920d2ac6dd1a445524b81aaf88baa501a9
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:09:38 2023 +0100

    MINIFICPP-2035 NiFi flow json format support
    
    Closes #1494
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../integration/MiNiFi_integration_test_driver.py  |   3 +
 docker/test/integration/cluster/ContainerStore.py  |   3 +
 .../test/integration/cluster/DockerTestCluster.py  |   3 +
 .../cluster/containers/MinifiContainer.py          |   9 +-
 docker/test/integration/features/s2s.feature       |   7 +-
 docker/test/integration/features/script.feature    |   3 +-
 docker/test/integration/features/sql.feature       |   3 +-
 .../Minifi_flow_json_serializer.py                 | 141 +++++++++++++
 docker/test/integration/steps/steps.py             |   5 +
 extensions/http-curl/protocols/RESTSender.cpp      |   1 +
 .../tests/unit/AdaptiveConfigurationTests.cpp      |  66 ++++++
 .../tests/unit/FlowJsonTests.cpp                   | 163 +++++++++++++++
 .../tests/unit/YamlConfigurationTests.cpp          | 153 +++-----------
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/RemoteProcessorGroupPort.h       |   4 +
 libminifi/include/core/ProcessGroup.h              |   2 -
 libminifi/include/core/ProcessorConfig.h           |   6 +-
 .../AdaptiveConfiguration.h}                       |  31 +--
 libminifi/include/core/flow/CheckRequiredField.h   |   6 +-
 libminifi/include/core/flow/FlowSchema.h           |  88 ++++++++
 libminifi/include/core/flow/Node.h                 |  59 +++++-
 .../include/core/flow/StructuredConfiguration.h    |  28 ++-
 .../include/core/flow/StructuredConnectionParser.h |  10 +-
 libminifi/include/core/json/JsonNode.h             |  19 ++
 .../include/core/state/nodes/SchedulingNodes.h     |   4 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   9 -
 libminifi/include/core/yaml/YamlNode.h             |  16 +-
 libminifi/src/core/ConfigurationFactory.cpp        |  14 +-
 libminifi/src/core/ProcessGroup.cpp                |   6 +-
 libminifi/src/core/flow/AdaptiveConfiguration.cpp  |  67 ++++++
 libminifi/src/core/flow/CheckRequiredField.cpp     |  18 +-
 libminifi/src/core/flow/FlowSchema.cpp             | 144 +++++++++++++
 libminifi/src/core/flow/Node.cpp                   |  10 +-
 .../src/core/flow/StructuredConfiguration.cpp      | 227 +++++++++++----------
 .../src/core/flow/StructuredConnectionParser.cpp   |  29 ++-
 libminifi/src/core/json/JsonConfiguration.cpp      |  89 --------
 libminifi/src/core/yaml/YamlConfiguration.cpp      |  24 +--
 libminifi/test/ConfigurationTestController.h       |  56 +++++
 minifi_main/MiNiFiMain.cpp                         |   2 +-
 39 files changed, 1076 insertions(+), 454 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index e3153dffe..21dec70b4 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -315,3 +315,6 @@ class MiNiFi_integration_test:
 
     def enable_sql_in_minifi(self):
         self.cluster.enable_sql_in_minifi()
+
+    def set_yaml_in_minifi(self):
+        self.cluster.set_yaml_in_minifi()
diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index 867e41076..fb5f9c6b8 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -160,6 +160,9 @@ class ContainerStore:
     def enable_sql_in_minifi(self):
         self.minifi_options.enable_sql = True
 
+    def set_yaml_in_minifi(self):
+        self.minifi_options.config_format = "yaml"
+
     def get_startup_finished_log_entry(self, container_name):
         return self.containers[container_name].get_startup_finished_log_entry()
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 9688c732e..0a07119a5 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -85,6 +85,9 @@ class DockerTestCluster:
     def enable_sql_in_minifi(self):
         self.container_store.enable_sql_in_minifi()
 
+    def set_yaml_in_minifi(self):
+        self.container_store.set_yaml_in_minifi()
+
     def get_app_log(self, container_name):
         log_source = self.container_store.log_source(container_name)
         if log_source == LogSource.FROM_DOCKER_CONTAINER:
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index 93d65ddda..7f70d4fb7 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -21,6 +21,7 @@ import shutil
 import copy
 from .FlowContainer import FlowContainer
 from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from minifi.flow_serialization.Minifi_flow_json_serializer import Minifi_flow_json_serializer
 
 
 class MinifiOptions:
@@ -30,6 +31,7 @@ class MinifiOptions:
         self.enable_provenance = False
         self.enable_prometheus = False
         self.enable_sql = False
+        self.config_format = "json"
 
 
 class MinifiContainer(FlowContainer):
@@ -58,7 +60,12 @@ class MinifiContainer(FlowContainer):
         return "Starting Flow Controller"
 
     def _create_config(self):
-        serializer = Minifi_flow_yaml_serializer()
+        if self.options.config_format == "yaml":
+            serializer = Minifi_flow_yaml_serializer()
+        elif self.options.config_format == "json":
+            serializer = Minifi_flow_json_serializer()
+        else:
+            assert False, "Invalid flow configuration format: {}".format(self.options.config_format)
         test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
         with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file:
diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature
index a836a50c5..a51589a02 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -35,7 +35,8 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
     Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
 
   Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
     And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
 
@@ -47,7 +48,9 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
     Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds
 
   Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    # "drop empty" is only supported with yaml config
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
     And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
     And the connection going to the RemoteProcessGroup has "drop empty" set
diff --git a/docker/test/integration/features/script.feature b/docker/test/integration/features/script.feature
index 4a31b3c42..b8ca2e5f6 100644
--- a/docker/test/integration/features/script.feature
+++ b/docker/test/integration/features/script.feature
@@ -29,7 +29,8 @@ Feature: MiNiFi can execute Lua and Python scripts
     Then the Minifi logs contain the following message: "Sleeping forever" 3 times after 5 seconds
 
   Scenario: ExecuteScript should only allow one Python script running at a time
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And the scheduling period of the GenerateFlowFile processor is set to "500 ms"
     And a ExecuteScript processor with the "Script File" property set to "/tmp/resources/python/sleep_forever.py"
     And the "Script Engine" property of the ExecuteScript processor is set to "python"
diff --git a/docker/test/integration/features/sql.feature b/docker/test/integration/features/sql.feature
index 768ed4d7e..bafab8e95 100644
--- a/docker/test/integration/features/sql.feature
+++ b/docker/test/integration/features/sql.feature
@@ -33,7 +33,8 @@ Feature: Executing SQL operations from MiNiFi-C++
     Then the query "SELECT * FROM test_table WHERE int_col = 42" returns 1 rows in less than 120 seconds on the PostgreSQL server
 
   Scenario: A MiNiFi instance can query to test table with ExecuteSQL processor
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a UpdateAttribute processor with the "sql.args.1.value" property set to "apple"
     And the "sql.args.2.value" property of the UpdateAttribute processor is set to "banana"
     And a ExecuteSQL processor with the "SQL select query" property set to "SELECT * FROM test_table WHERE text_col = ? OR text_col = ? ORDER BY int_col DESC"
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
new file mode 100644
index 000000000..4468a6107
--- /dev/null
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import uuid
+import json
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+from ..core.Funnel import Funnel
+
+
+class Minifi_flow_json_serializer:
+    def serialize(self, start_nodes, controllers):
+        res = {
+            'rootGroup': {
+                'name': 'MiNiFi Flow',
+                'processors': [],
+                'funnels': [],
+                'connections': [],
+                'remoteProcessGroups': [],
+                'controllerServices': []
+            }
+        }
+        visited = []
+
+        for node in start_nodes:
+            self.serialize_node(node, res['rootGroup'], visited)
+
+        for controller in controllers:
+            self.serialize_controller(controller, res['rootGroup'])
+
+        return json.dumps(res)
+
+    def serialize_node(self, connectable, root, visited):
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name = connectable.name
+        else:
+            connectable_name = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            group = connectable.remote_process_group
+            res_group = None
+
+            for res_group_candidate in root['remoteProcessGroups']:
+                assert isinstance(res_group_candidate, dict)
+                if res_group_candidate['identifier'] == str(group.uuid):
+                    res_group = res_group_candidate
+
+            if res_group is None:
+                res_group = {
+                    'name': group.name,
+                    'identifier': str(group.uuid),
+                    'targetUri': group.url,
+                    'communicationsTimeout': '30 sec',
+                    'yieldDuration': '3 sec',
+                    'inputPorts': []
+                }
+
+                root['remoteProcessGroups'].append(res_group)
+
+            res_group['inputPorts'].append({
+                'identifier': str(connectable.uuid),
+                'name': connectable.name
+            })
+
+        if isinstance(connectable, Processor):
+            root['processors'].append({
+                'name': connectable_name,
+                'identifier': str(connectable.uuid),
+                'type': connectable.class_prefix + connectable.clazz,
+                'schedulingStrategy': connectable.schedule['scheduling strategy'],
+                'schedulingPeriod': connectable.schedule['scheduling period'],
+                'penaltyDuration': connectable.schedule['penalization period'],
+                'yieldDuration': connectable.schedule['yield period'],
+                'runDurationMillis': connectable.schedule['run duration nanos'],
+                'properties': connectable.properties,
+                'autoTerminatedRelationships': connectable.auto_terminate,
+                'concurrentlySchedulableTaskCount': connectable.max_concurrent_tasks
+            })
+
+            for svc in connectable.controller_services:
+                if svc in visited:
+                    continue
+
+                visited.append(svc)
+                root['controllerServices'].append({
+                    'name': svc.name,
+                    'identifier': svc.id,
+                    'type': svc.service_class,
+                    'properties': svc.properties
+                })
+
+        if isinstance(connectable, Funnel):
+            root['funnels'].append({
+                'identifier': str(connectable.uuid)
+            })
+
+        for conn_name in connectable.connections:
+            conn_procs = connectable.connections[conn_name]
+
+            if not isinstance(conn_procs, list):
+                conn_procs = [conn_procs]
+
+            for proc in conn_procs:
+                root['connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source': {'id': str(connectable.uuid)},
+                    'destination': {'id': str(proc.uuid)}
+                })
+                if (all(str(connectable.uuid) != x['identifier'] for x in root['funnels'])):
+                    root['connections'][-1]['selectedRelationships'] = [conn_name]
+                if proc not in visited:
+                    self.serialize_node(proc, root, visited)
+
+    def serialize_controller(self, controller, root):
+        if hasattr(controller, 'name'):
+            connectable_name = controller.name
+        else:
+            connectable_name = str(controller.uuid)
+
+        root['controllerServices'].append({
+            'name': connectable_name,
+            'identifier': controller.id,
+            'type': controller.service_class,
+            'properties': controller.properties
+        })
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 686c33ebe..cce5eaf30 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1037,3 +1037,8 @@ def step_impl(context, size: str, duration: str) -> None:
 @then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}')
 def step_impl(context, peak_usage_percent: str, duration: str) -> None:
     context.test.check_memory_usage_compared_to_peak(float(peak_usage_percent) * 0.01, humanfriendly.parse_timespan(duration))
+
+
+@given(u'a MiNiFi CPP server with yaml config')
+def step_impl(context):
+    context.test.set_yaml_in_minifi()
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index b58bf3233..ad3036cbe 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -162,6 +162,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct
   if (payload.getOperation() == Operation::TRANSFER) {
     auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max());
     client.setReadCallback(std::move(read));
+    client.setRequestHeader("Accept", "application/vnd.minifi-c2+json;version=1, text/yml");
   } else {
     // Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible
     // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
diff --git a/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
new file mode 100644
index 000000000..7bf8ddd27
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "ConfigurationTestController.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+TEST_CASE("Adaptive configuration can parse JSON") {
+  ConfigurationTestController controller;
+
+  const char* json_config = R"(
+    {
+      "Flow Controller": {"name": "root"},
+      "Processors": [
+        {
+          "id": "00000000-0000-0000-0000-000000000001",
+          "class": "DummyProcessor",
+          "name": "Proc1"
+        }
+      ],
+      "Connections": []
+    }
+  )";
+
+  core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+  auto root = config.getRootFromPayload(json_config);
+
+  REQUIRE(root->findProcessorByName("Proc1"));
+}
+
+TEST_CASE("Adaptive configuration can parse YAML") {
+  ConfigurationTestController controller;
+
+  const char* yaml_config = R"(
+Flow Controller:
+  name: root
+Processors:
+- id: 00000000-0000-0000-0000-000000000001
+  class: DummyProcessor
+  name: Proc1
+Connections: []
+  )";
+
+  core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+  auto root = config.getRootFromPayload(yaml_config);
+
+  REQUIRE(root->findProcessorByName("Proc1"));
+}
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
new file mode 100644
index 000000000..b7d1a47f8
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <memory>
+#include <chrono>
+#include "core/repository/VolatileContentRepository.h"
+#include "core/ProcessGroup.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "utils/TestUtils.h"
+#include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
+#include "Funnel.h"
+
+using namespace std::literals::chrono_literals;
+
+TEST_CASE("NiFi flow json format is correctly parsed") {
+  ConfigurationTestController test_controller;
+
+  core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+  static const std::string CONFIG_JSON =
+      R"(
+{
+  "rootGroup": {
+    "name": "MiNiFi Flow",
+    "processors": [{
+      "identifier": "00000000-0000-0000-0000-000000000001",
+      "name": "MyGenFF",
+      "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
+      "concurrentlySchedulableTaskCount": 15,
+      "schedulingStrategy": "TIMER_DRIVEN",
+      "schedulingPeriod": "3 sec",
+      "penaltyDuration": "12 sec",
+      "yieldDuration": "4 sec",
+      "runDurationMillis": 12,
+      "autoTerminatedRelationships": ["one", "two"],
+      "properties": {
+        "File Size": "10 B",
+        "Batch Size": 12
+      }
+    }],
+    "funnels": [{
+      "identifier": "00000000-0000-0000-0000-000000000010",
+      "name": "CoolFunnel"
+    }],
+    "connections": [{
+      "identifier": "00000000-0000-0000-0000-000000000002",
+      "name": "GenToFunnel",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000001",
+        "name": "MyGenFF"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000010",
+        "name": "CoolFunnel"
+      },
+      "selectedRelationships": ["a", "b"],
+      "backPressureObjectThreshold": 7,
+      "backPressureDataSizeThreshold": "11 KB",
+      "flowFileExpiration": "13 sec"
+    }, {
+     "identifier": "00000000-0000-0000-0000-000000000008",
+      "name": "FunnelToS2S",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000010",
+        "name": "CoolFunnel"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingInputPort"
+      },
+      "selectedRelationships": ["success"]
+    }],
+    "remoteProcessGroups": [{
+      "name": "NiFi Flow",
+      "targetUri": "https://localhost:8090/nifi",
+      "yieldDuration": "6 sec",
+      "communicationsTimeout": "19 sec",
+      "inputPorts": [{
+        "identifier": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingInputPort",
+        "targetId": "00000000-0000-0000-0000-000000000005",
+        "concurrentlySchedulableTaskCount": 7
+      }]
+    }]
+  }
+})";
+
+  std::unique_ptr<core::ProcessGroup> flow = config.getRootFromPayload(CONFIG_JSON);
+  REQUIRE(flow);
+
+  // verify processor
+  auto* proc = flow->findProcessorByName("MyGenFF");
+  REQUIRE(proc);
+  REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001");
+  REQUIRE(15 == proc->getMaxConcurrentTasks());
+  REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == proc->getSchedulingStrategy());
+  REQUIRE(3s == proc->getSchedulingPeriodNano());
+  REQUIRE(12s == proc->getPenalizationPeriod());
+  REQUIRE(4s == proc->getYieldPeriodMsec());
+  REQUIRE(proc->isAutoTerminated({"one", ""}));
+  REQUIRE(proc->isAutoTerminated({"two", ""}));
+  REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
+  REQUIRE(proc->getProperty("File Size") == "10 B");
+  REQUIRE(proc->getProperty("Batch Size") == "12");
+
+  // verify funnel
+  auto* funnel = dynamic_cast<minifi::Funnel*>(flow->findProcessorByName("CoolFunnel"));
+  REQUIRE(funnel);
+  REQUIRE(funnel->getUUIDStr() == "00000000-0000-0000-0000-000000000010");
+
+  // verify RPG input port
+  auto* port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(flow->findProcessorByName("AmazingInputPort"));
+  REQUIRE(port);
+  REQUIRE(port->getUUIDStr() == "00000000-0000-0000-0000-000000000003");
+  REQUIRE(port->getMaxConcurrentTasks() == 7);
+  REQUIRE(port->getInstances().size() == 1);
+  REQUIRE(port->getInstances().front().host_ == "localhost");
+  REQUIRE(port->getInstances().front().port_ == 8090);
+  REQUIRE(port->getInstances().front().protocol_ == "https://");
+  REQUIRE(port->getProperty("Port UUID") == "00000000-0000-0000-0000-000000000005");
+
+  // verify connection
+  std::map<std::string, minifi::Connection*> connection_map;
+  flow->getConnections(connection_map);
+  REQUIRE(4 == connection_map.size());
+  auto connection1 = connection_map.at("00000000-0000-0000-0000-000000000002");
+  REQUIRE(connection1);
+  REQUIRE("GenToFunnel" == connection1->getName());
+  REQUIRE(connection1->getSource() == proc);
+  REQUIRE(connection1->getDestination() == funnel);
+  REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}}));
+  REQUIRE(connection1->getMaxQueueSize() == 7);
+  REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB);
+  REQUIRE(13s == connection1->getFlowExpirationDuration());
+
+  auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008");
+  REQUIRE(connection2);
+  REQUIRE("FunnelToS2S" == connection2->getName());
+  REQUIRE(connection2->getSource() == funnel);
+  REQUIRE(connection2->getDestination() == port);
+  REQUIRE(connection2->getRelationships() == (std::set<core::Relationship>{{"success", ""}}));
+}
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index b14ee1f83..651480578 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -28,17 +28,14 @@
 #include "Catch.h"
 #include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
 
 using namespace std::literals::chrono_literals;
 
 TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   SECTION("loading YAML without optional component IDs works") {
     static const std::string CONFIG_YAML_WITHOUT_IDS =
@@ -222,13 +219,9 @@ Provenance Reporting:
 }
 
 TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -347,13 +340,9 @@ NiFi Properties Overrides: {}
 }
 
 TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -496,17 +485,9 @@ NiFi Properties Overrides: {}
 }
 
 TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setTrace<core::YamlConfiguration>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -531,17 +512,9 @@ Processors:
 }
 
 TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -567,25 +540,16 @@ Processors:
   } catch (const std::exception &e) {
     caught_exception = true;
     REQUIRE("Unable to parse configuration file for component named 'XYZ' because required property "
-        "'Input Directory' is not set [in 'Processors' section of configuration file]" == std::string(e.what()));
+        "'Input Directory' is not set [in '/Processors/0/Properties' section of configuration file]" == std::string(e.what()));
   }
 
   REQUIRE(caught_exception);
 }
 
 TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  logTestController.setDebug<core::Processor>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -623,17 +587,9 @@ class DummyComponent : public core::ConfigurableComponent {
 };
 
 TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -644,17 +600,9 @@ TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
 }
 
 TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
@@ -673,16 +621,9 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
 }
 
 TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -693,15 +634,9 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
 }
 
 TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
-  TestController test_controller;
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -712,16 +647,9 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
 }
 
 TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -740,15 +668,8 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]")
 }
 
 TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
-  TestController test_controller;
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -767,13 +688,9 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
 }
 
 TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string CONFIG_YAML_WITH_FUNNEL =
     R"(
@@ -858,12 +775,8 @@ Remote Process Groups: []
 }
 
 TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
-  TestController test_controller;
-  std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yaml_config({test_flow_file_repo, content_repo, stream_factory, configuration});
+  ConfigurationTestController test_controller;
+  core::YamlConfiguration yaml_config(test_controller.getContext());
 
   for (char i = '1'; i <= '8'; ++i) {
     DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 354e944e2..dc51d0148 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -60,7 +60,7 @@ if (NOT OPENSSL_OFF)
     set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
 
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 6cf1778ce..3e8cee690 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -160,6 +160,10 @@ class RemoteProcessorGroupPort : public core::Processor {
     }
   }
 
+  std::vector<RPG> getInstances() const {
+    return nifi_instances_;
+  }
+
   void setHTTPProxy(const utils::HTTPProxy &proxy) {
     this->proxy_ = proxy;
   }
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index af43f8e02..89b1aa2cd 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -60,8 +60,6 @@ enum ProcessGroupType {
   REMOTE_PROCESS_GROUP,
 };
 
-#define ONSCHEDULE_RETRY_INTERVAL 30000  // millisecs
-
 class ProcessGroup : public CoreComponent {
   friend struct ::ProcessGroupTestAccessor;
  public:
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index fee9bd7e9..d454a3f69 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -30,11 +30,11 @@ namespace minifi {
 namespace core {
 
 
-#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
-#define DEFAULT_SCHEDULING_PERIOD_STR "1 sec"
+constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
+constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
 constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000};
 constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0};
-#define DEFAULT_MAX_CONCURRENT_TASKS 1
+constexpr int DEFAULT_MAX_CONCURRENT_TASKS{1};
 constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1};
 constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
 
diff --git a/libminifi/include/core/json/JsonConfiguration.h b/libminifi/include/core/flow/AdaptiveConfiguration.h
similarity index 55%
rename from libminifi/include/core/json/JsonConfiguration.h
rename to libminifi/include/core/flow/AdaptiveConfiguration.h
index 35623381d..67a4c5885 100644
--- a/libminifi/include/core/json/JsonConfiguration.h
+++ b/libminifi/include/core/flow/AdaptiveConfiguration.h
@@ -15,37 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
-#include <memory>
-#include <optional>
 #include <string>
-#include <unordered_set>
+#include <memory>
 
-#include "core/FlowConfiguration.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "core/ProcessorConfig.h"
-#include "Exception.h"
-#include "io/StreamFactory.h"
-#include "io/validation.h"
-#include "sitetosite/SiteToSite.h"
-#include "utils/Id.h"
-#include "utils/StringUtils.h"
-#include "utils/file/FileSystem.h"
-#include "core/flow/StructuredConfiguration.h"
+#include "StructuredConfiguration.h"
 
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi::core::flow {
 
-class JsonConfiguration : public flow::StructuredConfiguration {
+class AdaptiveConfiguration : public StructuredConfiguration {
  public:
-  explicit JsonConfiguration(ConfigurationContext ctx);
-
-  ~JsonConfiguration() override = default;
-
-  std::unique_ptr<core::ProcessGroup> getRoot() override;
+  explicit AdaptiveConfiguration(ConfigurationContext ctx);
 
-  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override;
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &payload) override;
 };
 
-}  // namespace org::apache::nifi::minifi::core
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h
index 6c35b0cbc..11dc2b1e1 100644
--- a/libminifi/include/core/flow/CheckRequiredField.h
+++ b/libminifi/include/core/flow/CheckRequiredField.h
@@ -27,7 +27,7 @@
 namespace org::apache::nifi::minifi::core::flow {
 
 bool isFieldPresent(const Node &node, std::string_view field_name);
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = "");
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names);
 
 /**
  * This is a helper function for verifying the existence of a required
@@ -47,8 +47,8 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
  * @throws std::invalid_argument if the required field 'field_name' is
  *                               not present in 'node'
  */
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = "");
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_name, std::string_view error_message = "");
 
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {});
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message = {});
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h
new file mode 100644
index 000000000..0badac303
--- /dev/null
+++ b/libminifi/include/core/flow/FlowSchema.h
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+#include <string>
+
+namespace org::apache::nifi::minifi::core::flow {
+
+struct FlowSchema {
+  using Keys = std::vector<std::string>;
+
+  Keys flow_header;
+  Keys root_group;
+
+  Keys processors;
+  Keys processor_properties;
+  Keys autoterminated_rels;
+  Keys max_concurrent_tasks;
+  Keys penalization_period;
+  Keys proc_yield_period;
+  Keys runduration_nanos;
+  Keys onschedule_retry_interval;
+
+  Keys connections;
+  Keys max_queue_size;
+  Keys max_queue_data_size;
+  Keys swap_threshold;
+  Keys source_id;
+  Keys source_name;
+  Keys destination_id;
+  Keys destination_name;
+  Keys flowfile_expiration;
+  Keys drop_empty;
+  Keys source_relationship;
+  Keys source_relationship_list;
+
+  Keys process_groups;
+  Keys process_group_version;
+  Keys scheduling_strategy;
+  Keys scheduling_period;
+  Keys name;
+  Keys identifier;
+  Keys type;
+  Keys controller_services;
+  Keys controller_service_properties;
+  Keys remote_process_group;
+  Keys provenance_reporting;
+  Keys provenance_reporting_port_uuid;
+  Keys provenance_reporting_batch_size;
+  Keys funnels;
+  Keys input_ports;
+  Keys output_ports;
+  Keys rpg_url;
+  Keys rpg_yield_period;
+  Keys rpg_timeout;
+  Keys rpg_local_network_interface;
+  Keys rpg_transport_protocol;
+  Keys rpg_proxy_host;
+  Keys rpg_proxy_user;
+  Keys rpg_proxy_password;
+  Keys rpg_proxy_port;
+  Keys rpg_input_ports;
+  Keys rpg_output_ports;
+  Keys rpg_port_properties;
+  Keys rpg_port_target_id;
+
+  static FlowSchema getDefault();
+  static FlowSchema getNiFiFlowJson();
+};
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h
index 78735f95d..1bd3ad7e2 100644
--- a/libminifi/include/core/flow/Node.h
+++ b/libminifi/include/core/flow/Node.h
@@ -25,6 +25,8 @@
 #include <memory>
 #include <utility>
 #include "nonstd/expected.hpp"
+#include "utils/StringUtils.h"
+#include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
@@ -37,6 +39,7 @@ class Node {
   };
 
   class Iterator {
+    friend class Node;
    public:
     class Value;
 
@@ -53,6 +56,7 @@ class Node {
 
     Iterator& operator++() {
       impl_->operator++();
+      ++idx_;
       return *this;
     }
 
@@ -75,6 +79,8 @@ class Node {
 
    private:
     std::unique_ptr<IteratorImpl> impl_;
+    std::string path_;
+    int idx_{0};
   };
 
   class NodeImpl {
@@ -88,6 +94,7 @@ class Node {
     virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0;
     virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0;
     virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0;
+    virtual nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const = 0;
 
     virtual std::string getDebugString() const = 0;
 
@@ -98,6 +105,8 @@ class Node {
 
     virtual std::optional<Cursor> getCursor() const = 0;
 
+    virtual Node createEmpty() const = 0;
+
     virtual ~NodeImpl() = default;
   };
 
@@ -113,6 +122,7 @@ class Node {
   nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();}
   nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();}
   nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();}
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const {return impl_->getScalarAsString();}
 
   // return a string representation of the node (need not to be deserializable)
   std::string getDebugString() const {return impl_->getDebugString();}
@@ -121,14 +131,57 @@ class Node {
   size_t empty() const {
     return size() == 0;
   }
-  Iterator begin() const {return impl_->begin();}
-  Iterator end() const {return impl_->end();}
-  Node operator[](std::string_view key) const {return impl_->operator[](key);}
+  Iterator begin() const {
+    Iterator it = impl_->begin();
+    it.path_ = path_;
+    return it;
+  }
+  Iterator end() const {
+    Iterator it = impl_->end();
+    it.path_ = path_;
+    return it;
+  }
+
+  // considers @key to be a member of this node as is (no '/' path splitting); const: only reads impl_ and path_
+  Node getMember(std::string_view key) const {
+    Node result = impl_->operator[](key);
+    result.path_ = utils::StringUtils::join_pack(path_, "/", key);
+    return result;
+  }
+
+  // considers @key to be a '/'-delimited access path; a "." segment keeps the current node, lookup stops once a segment yields a falsy node
+  Node operator[](std::string_view key) const {
+    Node result = *this;
+    for (auto& field : utils::StringUtils::split(std::string{key}, "/")) {
+      if (field == ".") {
+        // pass: self (checked per segment, so "./a" and "a/./b" resolve as well as a bare ".")
+      } else {
+        result = result.getMember(field);
+      }
+      if (!result) {
+        break;
+      }
+    }
+    return result;
+  }
+
+  // considers @keys to be a set of viable access paths, the first viable is returned
+  Node operator[](gsl::span<const std::string> keys) const {
+    for (auto& key : keys) {
+      if (Node result = (*this)[key]) {
+        return result;
+      }
+    }
+    return impl_->createEmpty();
+  }
+
+  std::string getPath() const {return path_;}
 
   std::optional<Cursor> getCursor() const {return impl_->getCursor();}
 
  private:
   std::shared_ptr<NodeImpl> impl_;
+  std::string path_;
 };
 
 class Node::Iterator::Value : public Node, public std::pair<Node, Node> {
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h
index 1b46cda9d..762a2b352 100644
--- a/libminifi/include/core/flow/StructuredConfiguration.h
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <unordered_set>
+#include <vector>
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -33,19 +34,10 @@
 #include "utils/StringUtils.h"
 #include "utils/file/FileSystem.h"
 #include "core/flow/Node.h"
+#include "FlowSchema.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
-static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller";
-static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors";
-static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
-static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting";
-static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels";
-static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports";
-static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports";
-
 class StructuredConfiguration : public FlowConfiguration {
  public:
   StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger);
@@ -60,6 +52,8 @@ class StructuredConfiguration : public FlowConfiguration {
    */
   void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const;
 
+  std::unique_ptr<core::ProcessGroup> getRoot() override;
+
  protected:
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
@@ -70,7 +64,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @return             the root ProcessGroup node of the flow
    *                       configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node);
+  std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node, FlowSchema schema);
 
   std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false);
 
@@ -99,7 +93,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @param parent    the parent ProcessGroup for the port
    * @param direction the TransferDirection of the port
    */
-  void parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
+  void parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
 
   /**
    * Parses the root level node for the flow configuration and
@@ -155,7 +149,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @param properties_node the Node containing the properties
    * @param processor      the Processor to which to add the resulting properties
    */
-  void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section);
+  void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name);
 
   /**
    * Parses the Funnels section of a configuration.
@@ -195,9 +189,9 @@ class StructuredConfiguration : public FlowConfiguration {
    *                   is optional and defaults to 'id'
    * @return         the parsed or generated UUID string
    */
-  std::string getOrGenerateId(const Node& node, const std::string& id_field = "id");
+  std::string getOrGenerateId(const Node& node);
 
-  std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = "");
+  std::string getRequiredIdField(const Node& node, std::string_view error_message = "");
 
   /**
    * This is a helper function for getting an optional value, if it exists.
@@ -213,7 +207,9 @@ class StructuredConfiguration : public FlowConfiguration {
    *                       the optional field is missing. If not provided,
    *                       a default info message will be generated.
    */
-  std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = "");
+  std::string getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message = "");
+
+  FlowSchema schema_;
 
   static std::shared_ptr<utils::IdGenerator> id_generator_;
   std::unordered_set<std::string> uuids_;
diff --git a/libminifi/include/core/flow/StructuredConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h
index 2b1d73743..6d7869ea6 100644
--- a/libminifi/include/core/flow/StructuredConnectionParser.h
+++ b/libminifi/include/core/flow/StructuredConnectionParser.h
@@ -26,18 +26,19 @@
 
 #include "core/flow/Node.h"
 #include "utils/gsl.h"
+#include "core/flow/FlowSchema.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
 class StructuredConnectionParser {
  public:
-  static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" };
-
-  explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+  explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent,
+                                      const std::shared_ptr<logging::Logger>& logger, std::optional<FlowSchema> schema = std::nullopt) :
       connectionNode_(connectionNode),
       name_(name),
       parent_(parent),
-      logger_(logger) {
+      logger_(logger),
+      schema_(schema.value_or(FlowSchema::getDefault())) {
     if (!connectionNode.isMap()) {
       throw std::logic_error("Connection node is not a map");
     }
@@ -60,6 +61,7 @@ class StructuredConnectionParser {
   const std::string& name_;
   gsl::not_null<core::ProcessGroup*> parent_;
   const std::shared_ptr<logging::Logger> logger_;
+  const FlowSchema schema_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h
index d19ca1582..bb11ca251 100644
--- a/libminifi/include/core/json/JsonNode.h
+++ b/libminifi/include/core/json/JsonNode.h
@@ -46,6 +46,10 @@ class JsonNode : public flow::Node::NodeImpl {
     return node_ ? node_->IsNull() : false;
   }
 
+  flow::Node createEmpty() const override {
+    return flow::Node{std::make_shared<JsonNode>(nullptr)};
+  }
+
   nonstd::expected<std::string, std::exception_ptr> getString() const override {
     try {
       if (!node_) {
@@ -93,12 +97,27 @@ class JsonNode : public flow::Node::NodeImpl {
       if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
       if (node_->IsInt64()) return std::to_string(node_->GetInt64());
       if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+      if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());
       throw std::runtime_error("Cannot get string from non-integer json value");
     } catch (...) {
       return nonstd::make_unexpected(std::current_exception());
     }
   }
 
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {
+    try {
+      if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
+      if (node_->IsBool()) return node_->GetBool() ? "true" : "false";
+      if (node_->IsInt64()) return std::to_string(node_->GetInt64());
+      if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+      if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());
+      if (node_->IsDouble()) return std::to_string(node_->GetDouble());
+      throw std::runtime_error("Cannot convert non-scalar json value to string");
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
   std::string getDebugString() const override {
     if (!node_) return "<invalid>";
     if (node_->IsObject()) return "<Map>";
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index 2262cd6e7..a98bb3bd6 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -49,7 +49,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode defaultSchedulingStrategy;
     defaultSchedulingStrategy.name = "defaultSchedulingStrategy";
-    defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY;
+    defaultSchedulingStrategy.value = core::DEFAULT_SCHEDULING_STRATEGY;
 
     schedulingDefaults.children.push_back(defaultSchedulingStrategy);
 
@@ -67,7 +67,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode defaultMaxConcurrentTasks;
     defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks";
-    defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS;
+    defaultMaxConcurrentTasks.value = core::DEFAULT_MAX_CONCURRENT_TASKS;
 
     schedulingDefaults.children.push_back(defaultMaxConcurrentTasks);
 
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 2b4a9d7d4..4d82ea394 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -44,15 +44,6 @@ class YamlConfiguration : public flow::StructuredConfiguration {
 
   ~YamlConfiguration() override = default;
 
-  /**
-   * Returns a shared pointer to a ProcessGroup object containing the
-   * flow configuration.
-   *
-   * @return               the root ProcessGroup node of the flow
-   *                        configuration tree
-   */
-  std::unique_ptr<core::ProcessGroup> getRoot() override;
-
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
    * flow configuration. The yamlConfigStream argument must point to
diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h
index ad8422f6e..193c61d55 100644
--- a/libminifi/include/core/yaml/YamlNode.h
+++ b/libminifi/include/core/yaml/YamlNode.h
@@ -49,6 +49,10 @@ class YamlNode : public flow::Node::NodeImpl {
     return node_.IsNull();
   }
 
+  flow::Node createEmpty() const override {  // returns a Node backed by a YamlNode that wraps an Undefined-typed YAML::Node
+    return flow::Node{std::make_shared<YamlNode>(YAML::Node{YAML::NodeType::Undefined})};
+  }
+
   nonstd::expected<std::string, std::exception_ptr> getString() const override {
     try {
       return node_.as<std::string>();
@@ -81,6 +85,14 @@ class YamlNode : public flow::Node::NodeImpl {
     }
   }
 
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {  // returns the node converted with YAML's as<std::string>()
+    try {
+      return node_.as<std::string>();  // the same conversion getString() of this class performs
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());  // a failed conversion is handed back as the error value
+    }
+  }
+
   std::string getDebugString() const override {
     if (!node_) return "<invalid>";
     if (node_.IsNull()) return "null";
@@ -149,11 +161,11 @@ class YamlIterator : public flow::Node::Iterator::IteratorImpl {
   YAML::const_iterator it_;
 };
 
-flow::Node::Iterator YamlNode::begin() const {
+inline flow::Node::Iterator YamlNode::begin() const {  // defined in YamlNode.h outside the class body; `inline` allows several translation units to include it
   return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())};
 }
 
-flow::Node::Iterator YamlNode::end() const {
+inline flow::Node::Iterator YamlNode::end() const {  // defined in YamlNode.h outside the class body; `inline` allows several translation units to include it
   return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())};
 }
 
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 81e2bb1e0..277184996 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -29,7 +29,7 @@
 #include "io/StreamFactory.h"
 
 #include "core/yaml/YamlConfiguration.h"
-#include "core/json/JsonConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
 
 namespace org::apache::nifi::minifi::core {
 
@@ -38,13 +38,7 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
   if (configuration_class_name) {
     class_name_lc = configuration_class_name.value();
   } else if (ctx.path) {
-    if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) {
-      class_name_lc = "yamlconfiguration";
-    } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) {
-      class_name_lc = "jsonconfiguration";
-    } else {
-      throw std::runtime_error("Could not infer config type from file path");
-    }
+    class_name_lc = "adaptiveconfiguration";
   } else {
     throw std::runtime_error("Neither configuration class nor config file path has been specified");
   }
@@ -57,8 +51,8 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
       return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx));
-    } else if (class_name_lc == "jsonconfiguration") {
-      return std::unique_ptr<core::JsonConfiguration>(instantiate<core::JsonConfiguration>(ctx));
+    } else if (class_name_lc == "adaptiveconfiguration") {
+      return std::unique_ptr<core::flow::AdaptiveConfiguration>(instantiate<core::flow::AdaptiveConfiguration>(ctx));
     } else {
       if (fail_safe) {
         return std::make_unique<core::FlowConfiguration>(ctx);
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7300d2129..509c0542d 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -33,6 +33,8 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
+constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000;  // initial onschedule_retry_msec_ of a group that has no parent to inherit it from
+
 std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
 
 ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid)
@@ -54,7 +56,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils:
   if (parent_process_group_ != nullptr) {
     onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
   } else {
-    onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+    onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
   }
   transmitting_ = false;
   transport_protocol_ = "RAW";
@@ -69,7 +71,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
       parent_process_group_(nullptr),
       logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
   yield_period_msec_ = 0ms;
-  onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+  onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
   transmitting_ = false;
   transport_protocol_ = "RAW";
 
diff --git a/libminifi/src/core/flow/AdaptiveConfiguration.cpp b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
new file mode 100644
index 000000000..cef220bb4
--- /dev/null
+++ b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/AdaptiveConfiguration.h"
+
+#include "rapidjson/document.h"
+#include "core/json/JsonNode.h"
+#include "core/yaml/YamlNode.h"
+#include "yaml-cpp/yaml.h"
+#include "utils/file/FileUtils.h"
+#include "Defaults.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+AdaptiveConfiguration::AdaptiveConfiguration(ConfigurationContext ctx)  // when ctx carries no path, a default flow file path is filled in before the base class is constructed
+    : StructuredConfiguration(([&] {  // immediately-invoked lambda: lets ctx.path be adjusted inside the initializer list
+        if (!ctx.path) {
+          if (utils::file::exists(DEFAULT_NIFI_CONFIG_JSON)) {  // the json default is chosen only when that file exists
+            ctx.path = DEFAULT_NIFI_CONFIG_JSON;
+          } else {
+            ctx.path = DEFAULT_NIFI_CONFIG_YML;  // otherwise the yml default, without checking for its existence
+          }
+        }
+        return std::move(ctx);  // ctx is captured by reference, hence the explicit move
+      })(),
+      logging::LoggerFactory<AdaptiveConfiguration>::getLogger()) {}
+
+std::unique_ptr<core::ProcessGroup> AdaptiveConfiguration::getRootFromPayload(const std::string &payload) {  // tries json first, then yaml; any exception is logged and rethrown
+  try {
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(payload.c_str(), payload.length());
+    if (res) {
+      flow::Node root{std::make_shared<JsonNode>(&doc)};  // JsonNode is handed the address of the local doc
+      if (root[FlowSchema::getDefault().flow_header]) {  // presence of the default schema's header key selects the default layout
+        logger_->log_debug("Processing configuration as default json");
+        return getRootFrom(root, FlowSchema::getDefault());
+      } else {
+        logger_->log_debug("Processing configuration as nifi flow json");
+        return getRootFrom(root, FlowSchema::getNiFiFlowJson());
+      }
+    }
+    logger_->log_debug("Could not parse configuration as json, trying yaml");
+    YAML::Node rootYamlNode = YAML::Load(payload);
+    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+    return getRootFrom(root, FlowSchema::getDefault());  // yaml is always read with the default schema
+  } catch(...) {
+    logger_->log_error("Invalid configuration file");
+    throw;
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp
index 6942a5844..4539d9d78 100644
--- a/libminifi/src/core/flow/CheckRequiredField.cpp
+++ b/libminifi/src/core/flow/CheckRequiredField.cpp
@@ -27,7 +27,7 @@ bool isFieldPresent(const Node &node, std::string_view field_name) {
   return bool{node[field_name]};
 }
 
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) {
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names) {
   const Node name_node = node["name"];
   // Build a helpful error message for the user so they can fix the
   // invalid config file, using the component name if present
@@ -36,32 +36,32 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
       name_node ?
           "Unable to parse configuration file for component named '" + name_node.getString().value() + "' as none of the possible required fields [" + field_list_string + "] is available" :
           "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
-  if (!section.empty()) {
-    err_msg += " [in '" + std::string(section) + "' section of configuration file]";
-  }
+
+  err_msg += " [in '" + node.getPath() + "' section of configuration file]";
+
   if (auto cursor = node.getCursor()) {
     err_msg += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]";
   }
   return err_msg;
 }
 
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) {
-  if (!isFieldPresent(node, field_name)) {
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_names, std::string_view error_message) {  // throws std::invalid_argument unless node has at least one of field_names
+  if (std::none_of(field_names.begin(), field_names.end(), [&] (auto& field) {return isFieldPresent(node, field);})) {
     if (error_message.empty()) {
-      throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section));
+      throw std::invalid_argument(buildErrorMessage(node, field_names));  // no custom text supplied: build one that names the missing fields
     }
-    throw std::invalid_argument(error_message.data());
+    throw std::invalid_argument(std::string(error_message));  // copy by length: string_view::data() is not guaranteed to be NUL-terminated
   }
 }
 
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) {
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message) {  // returns the string value of the first present alternate name, else throws std::invalid_argument
   for (const auto& name : alternate_names) {
     if (isFieldPresent(node, name)) {
       return node[name].getString().value();
     }
   }
   if (error_message.empty()) {
-    throw std::invalid_argument(buildErrorMessage(node, alternate_names, section));
+    throw std::invalid_argument(buildErrorMessage(node, alternate_names));  // no custom text supplied: build one that names the missing fields
   }
-  throw std::invalid_argument(error_message.data());
+  throw std::invalid_argument(std::string(error_message));  // copy by length: string_view::data() is not guaranteed to be NUL-terminated
 }
diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp
new file mode 100644
index 000000000..a5df0dc64
--- /dev/null
+++ b/libminifi/src/core/flow/FlowSchema.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/FlowSchema.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+FlowSchema FlowSchema::getDefault() {  // key names of the default layout; each member lists the accepted spellings of one key
+  return FlowSchema{
+      .flow_header = {"Flow Controller"},
+      .root_group = {"."},  // NOTE(review): presumably "." resolves to the node itself - confirm against Node::operator[]
+
+      .processors = {"Processors"},
+      .processor_properties = {"Properties"},
+      .autoterminated_rels = {"auto-terminated relationships list"},
+      .max_concurrent_tasks = {"max concurrent tasks"},
+      .penalization_period = {"penalization period"},
+      .proc_yield_period = {"yield period"},
+      .runduration_nanos = {"run duration nanos"},
+      .onschedule_retry_interval = {"onschedule retry interval"},
+
+      .connections = {"Connections"},
+      .max_queue_size = {"max work queue size"},
+      .max_queue_data_size = {"max work queue data size"},
+      .swap_threshold = {"swap threshold"},
+      .source_id = {"source id"},
+      .source_name = {"source name"},
+      .destination_id = {"destination id"},
+      .destination_name = {"destination name"},
+      .flowfile_expiration = {"flowfile expiration"},
+      .drop_empty = {"drop empty"},
+      .source_relationship = {"source relationship name"},
+      .source_relationship_list = {"source relationship names"},
+
+      .process_groups = {"Process Groups"},
+      .process_group_version = {"version"},
+      .scheduling_strategy = {"scheduling strategy"},
+      .scheduling_period = {"scheduling period"},
+      .name = {"name"},
+      .identifier = {"id"},
+      .type = {"class", "type"},  // two alternative spellings are listed
+      .controller_services = {"Controller Services"},
+      .controller_service_properties = {"Properties"},
+      .remote_process_group = {"Remote Processing Groups", "Remote Process Groups"},  // two alternative spellings are listed
+      .provenance_reporting = {"Provenance Reporting"},
+      .provenance_reporting_port_uuid = {"port uuid"},
+      .provenance_reporting_batch_size = {"batch size"},
+      .funnels = {"Funnels"},
+      .input_ports = {"Input Ports"},
+      .output_ports = {"Output Ports"},
+
+      .rpg_url = {"url"},
+      .rpg_yield_period = {"yield period"},
+      .rpg_timeout = {"timeout"},
+      .rpg_local_network_interface = {"local network interface"},
+      .rpg_transport_protocol = {"transport protocol"},
+      .rpg_proxy_host = {"proxy host"},
+      .rpg_proxy_user = {"proxy user"},
+      .rpg_proxy_password = {"proxy password"},
+      .rpg_proxy_port = {"proxy port"},
+      .rpg_input_ports = {"Input Ports"},
+      .rpg_output_ports = {"Output Ports"},
+      .rpg_port_properties = {"Properties"},
+      .rpg_port_target_id = {}  // no key is listed for this member in the default layout
+  };
+}
+
+FlowSchema FlowSchema::getNiFiFlowJson() {  // key names of the nifi flow json layout; members initialised with {} list no key for this layout
+  return FlowSchema{
+      .flow_header = {"rootGroup"},
+      .root_group = {"rootGroup"},  // header and root group share one key in this layout
+      .processors = {"processors"},
+      .processor_properties = {"properties"},
+      .autoterminated_rels = {"autoTerminatedRelationships"},
+      .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"},
+      .penalization_period = {"penaltyDuration"},
+      .proc_yield_period = {"yieldDuration"},
+      // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue
+      .runduration_nanos = {"runDurationMillis"},
+      .onschedule_retry_interval = {},
+
+      .connections = {"connections"},
+      .max_queue_size = {"backPressureObjectThreshold"},
+      .max_queue_data_size = {"backPressureDataSizeThreshold"},
+      .swap_threshold = {},
+      .source_id = {"source/id"},  // NOTE(review): the slash presumably denotes a nested lookup (id inside source) - confirm against Node::operator[]
+      .source_name = {"source/name"},
+      .destination_id = {"destination/id"},
+      .destination_name = {"destination/name"},
+      .flowfile_expiration = {"flowFileExpiration"},
+      // contrary to nifi we support dropEmpty in flow json as well
+      .drop_empty = {"dropEmpty"},
+      .source_relationship = {},
+      .source_relationship_list = {"selectedRelationships"},
+
+      .process_groups = {"processGroups"},
+      .process_group_version = {},
+      .scheduling_strategy = {"schedulingStrategy"},
+      .scheduling_period = {"schedulingPeriod"},
+      .name = {"name"},
+      .identifier = {"identifier"},
+      .type = {"type"},
+      .controller_services = {"controllerServices"},
+      .controller_service_properties = {"properties"},
+      .remote_process_group = {"remoteProcessGroups"},
+      .provenance_reporting = {},
+      .provenance_reporting_port_uuid = {},
+      .provenance_reporting_batch_size = {},
+      .funnels = {"funnels"},
+      .input_ports = {"inputPorts"},
+      .output_ports = {"outputPorts"},
+
+      .rpg_url = {"targetUri"},
+      .rpg_yield_period = {"yieldDuration"},
+      .rpg_timeout = {"communicationsTimeout"},
+      .rpg_local_network_interface = {"localNetworkInterface"},
+      .rpg_transport_protocol = {"transportProtocol"},
+      .rpg_proxy_host = {"proxyHost"},
+      .rpg_proxy_user = {"proxyUser"},
+      .rpg_proxy_password = {"proxyPassword"},
+      .rpg_proxy_port = {"proxyPort"},
+      .rpg_input_ports = {"inputPorts"},
+      .rpg_output_ports = {"outputPorts"},
+      .rpg_port_properties = {},
+      .rpg_port_target_id = {"targetId"}
+  };
+}
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/Node.cpp b/libminifi/src/core/flow/Node.cpp
index 7fd7b0df8..1c751ebe0 100644
--- a/libminifi/src/core/flow/Node.cpp
+++ b/libminifi/src/core/flow/Node.cpp
@@ -21,7 +21,15 @@
 namespace org::apache::nifi::minifi::core::flow {
 
 Node::Iterator::Value Node::Iterator::operator*() const {
-  return impl_->operator*();
+  Value value = impl_->operator*();  // take the element from the backend iterator, then attach a path to it
+  if (value) {
+    // sequence element: its path is this iterator's path_ joined with idx_
+    value.path_ = utils::StringUtils::join_pack(path_, "/", std::to_string(idx_));
+  } else if (value.second) {
+    // map entry: the mapped node's path is this iterator's path_ joined with the key's string value
+    value.second.path_ = utils::StringUtils::join_pack(path_, "/", value.first.getString().value());
+  }
+  return value;
 }
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 7e1d41e63..02624a4dc 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -34,13 +34,26 @@ namespace org::apache::nifi::minifi::core::flow {
 
 std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
 
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRoot() {  // reads the file at config_path_ and parses it; throws when no path is set, returns nullptr when the read yields nothing
+  if (!config_path_) {
+    logger_->log_error("Cannot instantiate flow, no config file is set.");
+    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
+  }
+  const auto configuration = filesystem_->read(config_path_.value());
+  if (!configuration) {
+    // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+    return nullptr;
+  }
+  return getRootFromPayload(configuration.value());  // the actual parsing is delegated to getRootFromPayload
+}
+
 StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger)
     : FlowConfiguration(std::move(ctx)),
       logger_(std::move(logger)) {}
 
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) {
-  auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY];
-  auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true);
+  checkRequiredField(root_flow_node, schema_.flow_header);  // throws when the header key of the active schema is absent
+  auto root_group = parseProcessGroup(root_flow_node[schema_.flow_header], root_flow_node[schema_.root_group], true);  // first node is the group header, second node holds the group's contents
   this->name_ = root_group->getName();
   return root_group;
 }
@@ -48,15 +61,15 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGro
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) {
   int version = 0;
 
-  checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-  auto flowName = node["name"].getString().value();
+  checkRequiredField(node, schema_.name);
+  auto flowName = node[schema_.name].getString().value();
 
   utils::Identifier uuid;
   // assignment throws on invalid uuid
   uuid = getOrGenerateId(node);
 
-  if (node["version"]) {
-    version = gsl::narrow<int>(node["version"].getInt64().value());
+  if (node[schema_.process_group_version]) {
+    version = gsl::narrow<int>(node[schema_.process_group_version].getInt64().value());
   }
 
   logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
@@ -67,8 +80,8 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
     group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
   }
 
-  if (node["onschedule retry interval"]) {
-    auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value();
+  if (node[schema_.onschedule_retry_interval]) {
+    auto onScheduleRetryPeriod = node[schema_.onschedule_retry_interval].getString().value();
     logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
 
     auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
@@ -83,20 +96,13 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
 
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) {
   auto group = createProcessGroup(header_node, is_root);
-  Node processorsNode = node[CONFIG_PROCESSORS_KEY];
-  Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY];
-  Node funnelsNode = node[CONFIG_FUNNELS_KEY];
-  Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY];
-  Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY];
-  Node remoteProcessingGroupsNode = [&] {
-    // assignment is not supported on invalid nodes
-    Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY];
-    if (candidate) {
-      return candidate;
-    }
-    return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3];
-  }();
-  Node childProcessGroupNodeSeq = node["Process Groups"];
+  Node processorsNode = node[schema_.processors];
+  Node connectionsNode = node[schema_.connections];
+  Node funnelsNode = node[schema_.funnels];
+  Node inputPortsNode = node[schema_.input_ports];
+  Node outputPortsNode = node[schema_.output_ports];
+  Node remoteProcessingGroupsNode = node[schema_.remote_process_group];
+  Node childProcessGroupNodeSeq = node[schema_.process_groups];
 
   parseProcessorNode(processorsNode, group.get());
   parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get());
@@ -115,10 +121,11 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(c
   return group;
 }
 
-std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node, FlowSchema schema) {
+  schema_ = std::move(schema);
   uuids_.clear();
-  Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY];
-  Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY];
+  Node controllerServiceNode = root_node[schema_.root_group][schema_.controller_services];
+  Node provenanceReportNode = root_node[schema_.provenance_reporting];
 
   parseControllerServices(controllerServiceNode);
   // Create the root process group
@@ -157,14 +164,14 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
   for (const auto& procNode : processors_node) {
     core::ProcessorConfig procCfg;
 
-    checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY);
-    procCfg.name = procNode["name"].getString().value();
+    checkRequiredField(procNode, schema_.name);
+    procCfg.name = procNode[schema_.name].getString().value();
     procCfg.id = getOrGenerateId(procNode);
 
     uuid = procCfg.id;
     logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
-    checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY);
-    procCfg.javaClass = procNode["class"].getString().value();
+    checkRequiredField(procNode, schema_.type);
+    procCfg.javaClass = procNode[schema_.type].getString().value();
     logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
 
     // Determine the processor name only from the Java class
@@ -187,36 +194,35 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
 
     processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
 
-    procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY);
+    procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
     logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
-    procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY);
+    procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);
 
     logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
-    if (auto tasksNode = procNode["max concurrent tasks"]) {
+    if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
       procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
     }
 
-    if (procNode["penalization period"]) {
-      procCfg.penalizationPeriod = procNode["penalization period"].getString().value();
+    if (auto penalizationNode = procNode[schema_.penalization_period]) {
+      procCfg.penalizationPeriod = penalizationNode.getString().value();
       logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
     }
 
-    if (procNode["yield period"]) {
-      procCfg.yieldPeriod = procNode["yield period"].getString().value();
+    if (auto yieldNode = procNode[schema_.proc_yield_period]) {
+      procCfg.yieldPeriod = yieldNode.getString().value();
       logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
     }
 
-    if (auto runNode = procNode["run duration nanos"]) {
+    if (auto runNode = procNode[schema_.runduration_nanos]) {
       procCfg.runDurationNanos = runNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
     }
 
     // handle auto-terminated relationships
-    if (procNode["auto-terminated relationships list"]) {
-      Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+    if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
       std::vector<std::string> rawAutoTerminatedRelationshipValues;
       if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
         for (const auto& autoTerminatedRel : autoTerminatedSequence) {
@@ -227,9 +233,8 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
     }
 
     // handle processor properties
-    if (procNode["Properties"]) {
-      Node propertiesNode = procNode["Properties"];
-      parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY);
+    if (Node propertiesNode = procNode[schema_.processor_properties]) {
+      parsePropertiesNode(propertiesNode, *processor, procCfg.name);
     }
 
     // Take care of scheduling
@@ -304,13 +309,13 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     return;
   }
   for (const auto& currRpgNode : rpg_node_seq) {
-    checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-    auto name = currRpgNode["name"].getString().value();
+    checkRequiredField(currRpgNode, schema_.name);
+    auto name = currRpgNode[schema_.name].getString().value();
     id = getOrGenerateId(currRpgNode);
 
     logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);
 
-    auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+    auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");
 
     logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);
 
@@ -318,8 +323,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     auto group = createRemoteProcessGroup(name, uuid);
     group->setParent(parentGroup);
 
-    if (currRpgNode["yield period"]) {
-      auto yieldPeriod = currRpgNode["yield period"].getString().value();
+    if (currRpgNode[schema_.rpg_yield_period]) {
+      auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);
 
       auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
@@ -329,8 +334,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
       }
     }
 
-    if (currRpgNode["timeout"]) {
-      auto timeout = currRpgNode["timeout"].getString().value();
+    if (currRpgNode[schema_.rpg_timeout]) {
+      auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);
 
       auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
@@ -340,33 +345,33 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
       }
     }
 
-    if (currRpgNode["local network interface"]) {
-      auto interface = currRpgNode["local network interface"].getString().value();
+    if (currRpgNode[schema_.rpg_local_network_interface]) {
+      auto interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface);
       group->setInterface(interface);
     }
 
-    if (currRpgNode["transport protocol"]) {
-      auto transport_protocol = currRpgNode["transport protocol"].getString().value();
+    if (currRpgNode[schema_.rpg_transport_protocol]) {
+      auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);
       if (transport_protocol == "HTTP") {
         group->setTransportProtocol(transport_protocol);
-        if (currRpgNode["proxy host"]) {
-          auto http_proxy_host = currRpgNode["proxy host"].getString().value();
+        if (currRpgNode[schema_.rpg_proxy_host]) {
+          auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
           logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
           group->setHttpProxyHost(http_proxy_host);
-          if (currRpgNode["proxy user"]) {
-            auto http_proxy_username = currRpgNode["proxy user"].getString().value();
+          if (currRpgNode[schema_.rpg_proxy_user]) {
+            auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
             logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
             group->setHttpProxyUserName(http_proxy_username);
           }
-          if (currRpgNode["proxy password"]) {
-            auto http_proxy_password = currRpgNode["proxy password"].getString().value();
+          if (currRpgNode[schema_.rpg_proxy_password]) {
+            auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
             logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password);
             group->setHttpProxyPassWord(http_proxy_password);
           }
-          if (currRpgNode["proxy port"]) {
-            auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value();
+          if (currRpgNode[schema_.rpg_proxy_port]) {
+            auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
             int32_t port;
             if (core::Property::StringToInt(http_proxy_port, port)) {
               logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
@@ -386,19 +391,19 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     group->setTransmitting(true);
     group->setURL(url);
 
-    checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-    auto inputPorts = currRpgNode["Input Ports"];
+    checkRequiredField(currRpgNode, schema_.rpg_input_ports);
+    auto inputPorts = currRpgNode[schema_.rpg_input_ports];
     if (inputPorts && inputPorts.isSequence()) {
       for (const auto& currPort : inputPorts) {
-        parsePort(currPort, group.get(), sitetosite::SEND);
+        parseRPGPort(currPort, group.get(), sitetosite::SEND);
       }  // for node
     }
-    auto outputPorts = currRpgNode["Output Ports"];
+    auto outputPorts = currRpgNode[schema_.rpg_output_ports];
     if (outputPorts && outputPorts.isSequence()) {
       for (const auto& currPort : outputPorts) {
         logger_->log_debug("Got a current port, iterating...");
 
-        parsePort(currPort, group.get(), sitetosite::RECEIVE);
+        parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
       }  // for node
     }
     parentGroup->addProcessGroup(std::move(group));
@@ -420,10 +425,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
 
   auto reportTask = createProvenanceReportTask();
 
-  checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY);
-  auto schedulingStrategyStr = node["scheduling strategy"].getString().value();
-  checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY);
-  auto schedulingPeriodStr = node["scheduling period"].getString().value();
+  checkRequiredField(node, schema_.scheduling_strategy);
+  auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value();
+  checkRequiredField(node, schema_.scheduling_period);
+  auto schedulingPeriodStr = node[schema_.scheduling_period].getString().value();
 
   if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
     logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
@@ -456,10 +461,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
       logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
     }
   }
-  checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY);
-  auto portUUIDStr = node["port uuid"].getString().value();
-  checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY);
-  auto batchSizeStr = node["batch size"].getString().value();
+  checkRequiredField(node, schema_.provenance_reporting_port_uuid);
+  auto portUUIDStr = node[schema_.provenance_reporting_port_uuid].getString().value();
+  checkRequiredField(node, schema_.provenance_reporting_batch_size);
+  auto batchSizeStr = node[schema_.provenance_reporting_batch_size].getString().value();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   port_uuid = portUUIDStr;
@@ -481,9 +486,9 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
     return;
   }
   for (const auto& service_node : controller_services_node) {
-    checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY);
+    checkRequiredField(service_node, schema_.name);
 
-    auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY);
+    auto type = getRequiredField(service_node, schema_.type);
     logger_->log_debug("Using type %s for controller service node", type);
 
     std::string fullType = type;
@@ -493,8 +498,8 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
       type = type.substr(lastOfIdx);
     }
 
-    auto name = service_node["name"].getString().value();
-    auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY);
+    auto name = service_node[schema_.name].getString().value();
+    auto id = getRequiredIdField(service_node);
 
     utils::Identifier uuid;
     uuid = id;
@@ -502,11 +507,11 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
     if (nullptr != controller_service_node) {
       logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
       controller_service_node->initialize();
-      if (Node propertiesNode = service_node["Properties"]) {
+      if (Node propertiesNode = service_node[schema_.controller_service_properties]) {
         // we should propagate properties to the node and to the implementation
-        parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY);
+        parsePropertiesNode(propertiesNode, *controller_service_node, name);
         if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
-          parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY);
+          parsePropertiesNode(propertiesNode, *controllerServiceImpl, name);
         }
       }
     } else {
@@ -538,7 +543,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
 
     // Default name to be same as ID
     // If name is specified in configuration, use the value
-    const auto name = connection_node["name"].getString().value_or(id);
+    const auto name = connection_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect connection UUID format.");
@@ -547,7 +552,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
 
     auto connection = createConnection(name, uuid.value());
     logger_->log_debug("Created connection with UUID %s and name %s", id, name);
-    const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+    const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_);
     connectionParser.configureConnectionSourceRelationships(*connection);
     connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
     connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
@@ -561,7 +566,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
   }
 }
 
-void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
+void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
   utils::Identifier uuid;
 
   if (!parent) {
@@ -570,9 +575,9 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   }
 
   // Check for required fields
-  checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-  auto nameStr = port_node["name"].getString().value();
-  auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY,
+  checkRequiredField(port_node, schema_.name);
+  auto nameStr = port_node[schema_.name].getString().value();
+  auto portId = getRequiredIdField(port_node,
     "The field 'id' is required for "
     "the port named '" + nameStr + "' in the Flow Config. If this port "
     "is an input port for a NiFi Remote Process Group, the port "
@@ -597,8 +602,11 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   // else defaults to RAW
 
   // handle port properties
-  if (Node propertiesNode = port_node["Properties"]) {
-    parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY);
+  if (Node propertiesNode = port_node[schema_.rpg_port_properties]) {
+    parsePropertiesNode(propertiesNode, *port, nameStr);
+  } else {
+    parsePropertyNodeElement(minifi::RemoteProcessorGroupPort::portUUID.getName(), port_node[schema_.rpg_port_target_id], *port);
+    validateComponentProperties(*port, nameStr, port_node.getPath());
   }
 
   // add processor to parent
@@ -606,7 +614,7 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   parent->addProcessor(std::move(port));
   processor.setScheduledState(core::RUNNING);
 
-  if (auto tasksNode = port_node["max concurrent tasks"]) {
+  if (auto tasksNode = port_node[schema_.max_concurrent_tasks]) {
     std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value();
     int32_t maxConcurrentTasks;
     if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
@@ -658,7 +666,7 @@ PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTy
     } else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) {
       coercedValue = property_value_node.getBool().value();
     } else {
-      coercedValue = property_value_node.getString().value();
+      coercedValue = property_value_node.getScalarAsString().value();
     }
     return coercedValue;
   } catch (const std::exception& e) {
@@ -687,7 +695,7 @@ void StructuredConfiguration::parseSingleProperty(const std::string& property_na
     throw;
   }
   if (!property_set) {
-    const auto rawValueString = property_value_node.getString().value();
+    const auto rawValueString = property_value_node.getScalarAsString().value();
     auto proc = dynamic_cast<core::Connectable*>(&processor);
     if (proc) {
       logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
@@ -714,7 +722,7 @@ void StructuredConfiguration::parsePropertyNodeElement(const std::string& proper
   }
 }
 
-void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) {
+void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name) {
   // Treat generically as a node so we can perform inspection on entries to ensure they are populated
   logger_->log_trace("Entered %s", component_name);
   for (const auto& property_node : properties_node) {
@@ -723,7 +731,7 @@ void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, c
     parsePropertyNodeElement(propertyName, propertyValueNode, component);
   }
 
-  validateComponentProperties(component, component_name, section);
+  validateComponentProperties(component, component_name, properties_node.getPath());
 }
 
 void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) {
@@ -739,7 +747,7 @@ void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup*
     std::string id = getOrGenerateId(funnel_node);
 
     // Default name to be same as ID
-    const auto name = funnel_node["name"].getString().value_or(id);
+    const auto name = funnel_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect funnel UUID format.");
@@ -767,7 +775,7 @@ void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGr
     std::string id = getOrGenerateId(port_node);
 
     // Default name to be same as ID
-    const auto name = port_node["name"].getString().value_or(id);
+    const auto name = port_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect port UUID format.");
@@ -861,14 +869,14 @@ void StructuredConfiguration::raiseComponentError(const std::string &component_n
   throw std::invalid_argument(err_msg);
 }
 
-std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) {
-  if (node[id_field]) {
-    if (auto opt_id_str = node[id_field].getString()) {
+std::string StructuredConfiguration::getOrGenerateId(const Node& node) {
+  if (node[schema_.identifier]) {
+    if (auto opt_id_str = node[schema_.identifier].getString()) {
       auto id = opt_id_str.value();
       addNewId(id);
       return id;
     }
-    throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string.");
+    throw std::invalid_argument("getOrGenerateId: idField '" + utils::StringUtils::join(",", schema_.identifier) + "' is expected to contain string.");
   }
 
   auto id = id_generator_->generate().to_string();
@@ -876,27 +884,29 @@ std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std
   return id;
 }
 
-std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) {
-  checkRequiredField(node, "id", section, error_message);
-  auto id = node["id"].getString().value();
+// Reads the identifier stored under the schema's identifier keys and records it via addNewId.
+// checkRequiredField is invoked first with the same keys; error_message is forwarded to it.
+std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view error_message) {
+  checkRequiredField(node, schema_.identifier, error_message);
+  auto id = node[schema_.identifier].getString().value();
   addNewId(id);
   return id;
 }
 
-std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section,
-                                               const std::string& info_message) {
+// Looks up an optional field by its schema keys. When the field is absent and the caller passed no
+// info_message, an informational message naming the field, the component and the node path is built.
+std::string StructuredConfiguration::getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message) {
   std::string infoMessage = info_message;
   auto result = node[field_name];
   if (!result) {
     if (infoMessage.empty()) {
       // Build a helpful info message for the user to inform them that a default is being used
-      infoMessage =
-          node["name"] ?
-              "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" :
-              "Using default value for optional field '" + field_name + "' ";
-      if (!section.empty()) {
-        infoMessage += " [in '" + section + "' section of configuration file]: ";
+      infoMessage = "Using default value for optional field '" + utils::StringUtils::join(",", field_name) + "'";
+      if (auto name = node[schema_.name]) {
+        // the field name's closing quote is already appended above, so start with a space rather than a second quote
+        infoMessage += " in component named '" + name.getString().value() + "'";
       }
+      infoMessage += " [in '" + node.getPath() + "' section of configuration file]: ";
 
       infoMessage += default_value;
     }
diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
index f39a3ac8b..a6521884b 100644
--- a/libminifi/src/core/flow/StructuredConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -49,10 +49,9 @@ void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Conne
 
 void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const {
   // Configure connection source
-  if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) {
-    addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection);
-  } else if (connectionNode_["source relationship names"]) {
-    auto relList = connectionNode_["source relationship names"];
+  if (connectionNode_[schema_.source_relationship] && !connectionNode_[schema_.source_relationship].getString().value().empty()) {
+    addNewRelationshipToConnection(connectionNode_[schema_.source_relationship].getString().value(), connection);
+  } else if (auto relList = connectionNode_[schema_.source_relationship_list]) {
     if (relList.isSequence() && !relList.empty()) {
       for (const auto &rel : relList) {
         addNewRelationshipToConnection(rel.getString().value(), connection);
@@ -68,7 +67,7 @@ void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::
 }
 
 uint64_t StructuredConnectionParser::getWorkQueueSize() const {
-  if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) {
+  if (auto max_work_queue_data_size_node = connectionNode_[schema_.max_queue_size]) {
     std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_size;
     if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
@@ -81,7 +80,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const {
 }
 
 uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
-  const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  const flow::Node max_work_queue_data_size_node = connectionNode_[schema_.max_queue_data_size];
   if (max_work_queue_data_size_node) {
     std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_data_size = 0;
@@ -95,7 +94,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
 }
 
 uint64_t StructuredConnectionParser::getSwapThreshold() const {
-  const flow::Node swap_threshold_node = connectionNode_["swap threshold"];
+  const flow::Node swap_threshold_node = connectionNode_[schema_.swap_threshold];
   if (swap_threshold_node) {
     auto swap_threshold_str = swap_threshold_node.getString().value();
     uint64_t swap_threshold;
@@ -109,7 +108,7 @@ uint64_t StructuredConnectionParser::getSwapThreshold() const {
 }
 
 utils::Identifier StructuredConnectionParser::getSourceUUID() const {
-  const flow::Node source_id_node = connectionNode_["source id"];
+  const flow::Node source_id_node = connectionNode_[schema_.source_id];
   if (source_id_node) {
     const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value());
     if (srcUUID) {
@@ -120,8 +119,8 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
     throw std::invalid_argument("Invalid source id");
   }
   // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
-  checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY);
-  const auto connectionSrcProcName = connectionNode_["source name"].getString().value();
+  checkRequiredField(connectionNode_, schema_.source_name);
+  const auto connectionSrcProcName = connectionNode_[schema_.source_name].getString().value();
   const auto srcUUID = utils::Identifier::parse(connectionSrcProcName);
   if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the source name is a remote port id, so use that as the source id
@@ -141,7 +140,7 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
 }
 
 utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
-  const flow::Node destination_id_node = connectionNode_["destination id"];
+  const flow::Node destination_id_node = connectionNode_[schema_.destination_id];
   if (destination_id_node) {
     const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value());
     if (destUUID) {
@@ -153,8 +152,8 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
   }
   // we use the same logic as above for resolving the source processor
   // for looking up the destination processor in absence of a processor id
-  checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY);
-  auto connectionDestProcName = connectionNode_["destination name"].getString().value();
+  checkRequiredField(connectionNode_, schema_.destination_name);
+  auto connectionDestProcName = connectionNode_[schema_.destination_name].getString().value();
   const auto destUUID = utils::Identifier::parse(connectionDestProcName);
   if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the destination name is a remote port id, so use that as the dest id
@@ -175,7 +174,7 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
 
 std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const {
   using namespace std::literals::chrono_literals;
-  const flow::Node expiration_node = connectionNode_["flowfile expiration"];
+  const flow::Node expiration_node = connectionNode_[schema_.flowfile_expiration];
   if (!expiration_node) {
     logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
     return 0ms;
@@ -196,7 +195,7 @@ std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() co
 }
 
 bool StructuredConnectionParser::getDropEmpty() const {
-  const flow::Node drop_empty_node = connectionNode_["drop empty"];
+  const flow::Node drop_empty_node = connectionNode_[schema_.drop_empty];
   if (drop_empty_node) {
     return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false);
   }
diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp
deleted file mode 100644
index 960f9a631..000000000
--- a/libminifi/src/core/json/JsonConfiguration.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <memory>
-#include <vector>
-#include <set>
-#include <cinttypes>
-#include <variant>
-
-#include "core/json/JsonConfiguration.h"
-#include "core/json/JsonNode.h"
-#include "core/state/Value.h"
-#include "Defaults.h"
-#include "utils/TimeUtil.h"
-
-#include "rapidjson/rapidjson.h"
-#include "rapidjson/document.h"
-
-namespace org::apache::nifi::minifi::core {
-
-namespace {
-
-}  // namespace
-
-
-JsonConfiguration::JsonConfiguration(ConfigurationContext ctx)
-    : StructuredConfiguration(([&] {
-                                if (!ctx.path) {
-                                  ctx.path = DEFAULT_NIFI_CONFIG_JSON;
-                                }
-                                return std::move(ctx);
-                              })(),
-                              logging::LoggerFactory<JsonConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() {
-  if (!config_path_) {
-    logger_->log_error("Cannot instantiate flow, no config file is set.");
-    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
-  }
-  const auto configuration = filesystem_->read(config_path_.value());
-  if (!configuration) {
-    // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
-    return nullptr;
-  }
-  try {
-    rapidjson::Document doc;
-    rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length());
-    if (!res) {
-      throw std::runtime_error("Could not parse json file");
-    }
-    flow::Node root{std::make_shared<JsonNode>(&doc)};
-    return getRootFrom(root);
-  } catch(...) {
-    logger_->log_error("Invalid json configuration file");
-    throw;
-  }
-}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) {
-  try {
-    rapidjson::Document doc;
-    rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length());
-    if (!res) {
-      throw std::runtime_error("Could not parse json file");
-    }
-    flow::Node root{std::make_shared<JsonNode>(&doc)};
-    return getRootFrom(root);
-  } catch (const std::runtime_error& err) {
-    logger_->log_error(err.what());
-    throw;
-  }
-}
-
-}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 89092eee5..f7bde6a66 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -40,31 +40,11 @@ YamlConfiguration::YamlConfiguration(ConfigurationContext ctx)
         })(),
         logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() {
-  if (!config_path_) {
-    logger_->log_error("Cannot instantiate flow, no config file is set.");
-    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
-  }
-  const auto configuration = filesystem_->read(config_path_.value());
-  if (!configuration) {
-  // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
-  return nullptr;
-  }
-  try {
-    YAML::Node rootYamlNode = YAML::Load(configuration.value());
-    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
-  } catch(...) {
-    logger_->log_error("Invalid yaml configuration file");
-    throw;
-  }
-}
-
 std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) {
   try {
     YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
     flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
+    return getRootFrom(root, flow::FlowSchema::getDefault());
   } catch (const YAML::ParserException &pe) {
     logger_->log_error(pe.what());
     throw;
@@ -75,7 +55,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const
   try {
     YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
     flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
+    return getRootFrom(root, flow::FlowSchema::getDefault());
   } catch (const YAML::ParserException &pe) {
     logger_->log_error(pe.what());
     throw;
diff --git a/libminifi/test/ConfigurationTestController.h b/libminifi/test/ConfigurationTestController.h
new file mode 100644
index 000000000..05e33214d
--- /dev/null
+++ b/libminifi/test/ConfigurationTestController.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "TestBase.h"
+#include "core/FlowConfiguration.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+// Test controller owning the repositories, stream factory and configuration that getContext() bundles into a ConfigurationContext.
+class ConfigurationTestController : public TestController {
+ public:
+  ConfigurationTestController()
+      : flow_file_repo_(core::createRepository("flowfilerepository")),
+        configuration_(std::make_shared<minifi::Configure>()),
+        stream_factory_(minifi::io::StreamFactory::getInstance(configuration_)),  // configuration_ is declared, hence initialized, before this member
+        content_repo_(std::make_shared<core::repository::VolatileContentRepository>()) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setTrace<core::YamlConfiguration>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setTrace<core::flow::AdaptiveConfiguration>();
+  }
+
+  // Returns a context whose fields are copies of the shared pointers held by this controller.
+  core::ConfigurationContext getContext() const {
+    return core::ConfigurationContext{
+        .flow_file_repo = flow_file_repo_,
+        .content_repo = content_repo_,
+        .stream_factory = stream_factory_,
+        .configuration = configuration_};
+  }
+
+  std::shared_ptr<core::Repository> flow_file_repo_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::shared_ptr<core::ContentRepository> content_repo_;
+};
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 63ee396b2..a3ed6a483 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -232,7 +232,7 @@ int main(int argc, char **argv) {
     std::string graceful_shutdown_seconds;
     std::string prov_repo_class = "provenancerepository";
     std::string flow_repo_class = "flowfilerepository";
-    std::string nifi_configuration_class_name = "yamlconfiguration";
+    std::string nifi_configuration_class_name = "adaptiveconfiguration";
     std::string content_repo_class = "filesystemrepository";
 
     auto log_properties = std::make_shared<core::logging::LoggerProperties>();