You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/12/19 11:41:14 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1846 - Json configuration parsing

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ced398819 MINIFICPP-1846 - Json configuration parsing
ced398819 is described below

commit ced398819e053c642279e620efb55d79b736fef5
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Mon Dec 19 12:17:50 2022 +0100

    MINIFICPP-1846 - Json configuration parsing
    
    Closes #1391
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 controller/Controller.h                            |   5 +-
 extensions/coap/tests/CoapIntegrationBase.h        |   4 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   4 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |   4 +-
 extensions/http-curl/tests/VerifyInvokeHTTP.h      |   2 +-
 .../tests/unit/YamlConfigurationTests.cpp          |  31 +-
 .../tests/unit/YamlConnectionParserTest.cpp        | 145 ++--
 .../tests/unit/YamlProcessGroupParserTests.cpp     |   2 +-
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/Defaults.h                       |   1 +
 libminifi/include/core/ConfigurationFactory.h      |  16 +-
 libminifi/include/core/FlowConfiguration.h         |  16 +-
 .../core/{yaml => flow}/CheckRequiredField.h       |  39 +-
 libminifi/include/core/flow/Node.h                 | 139 +++
 libminifi/include/core/flow/README.md              |  57 ++
 .../include/core/flow/StructuredConfiguration.h    | 240 ++++++
 .../StructuredConnectionParser.h}                  |  36 +-
 libminifi/include/core/json/JsonConfiguration.h    |  51 ++
 libminifi/include/core/json/JsonNode.h             | 233 +++++
 libminifi/include/core/yaml/YamlConfiguration.h    | 261 +-----
 libminifi/include/core/yaml/YamlNode.h             | 160 ++++
 libminifi/include/utils/ValueCaster.h              |  14 +-
 libminifi/src/core/ConfigurationFactory.cpp        |  48 +-
 libminifi/src/core/FlowConfiguration.cpp           |  24 +-
 libminifi/src/core/flow/CheckRequiredField.cpp     |  69 ++
 .../{include/Defaults.h => src/core/flow/Node.cpp} |  15 +-
 .../StructuredConfiguration.cpp}                   | 596 ++++++-------
 .../StructuredConnectionParser.cpp}                |  85 +-
 libminifi/src/core/json/JsonConfiguration.cpp      |  89 ++
 libminifi/src/core/yaml/CheckRequiredField.cpp     |  80 --
 libminifi/src/core/yaml/YamlConfiguration.cpp      | 948 +--------------------
 libminifi/test/flow-tests/TestControllerWithFlow.h |   2 +-
 libminifi/test/integration/IntegrationBase.h       |   2 +-
 .../test/integration/ProvenanceReportingTest.cpp   |   4 +-
 .../PersistableKeyValueStoreServiceTest.cpp        |   2 +-
 .../UnorderedMapKeyValueStoreServiceTest.cpp       |   3 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   4 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   2 +-
 minifi_main/MiNiFiMain.cpp                         |  10 +-
 39 files changed, 1620 insertions(+), 1825 deletions(-)

diff --git a/controller/Controller.h b/controller/Controller.h
index b30f79748..dd5c4119c 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -279,7 +279,8 @@ std::shared_ptr<org::apache::nifi::minifi::core::controller::ControllerService>
 
   const auto stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(configuration);
 
-  auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
+  auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(
+      org::apache::nifi::minifi::core::ConfigurationContext{prov_repo, flow_repo, content_repo, stream_factory, configuration}, nifi_configuration_class_name);
 
   const auto controller = std::make_unique<org::apache::nifi::minifi::FlowController>(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo);
   controller->load();
@@ -340,7 +341,7 @@ void printManifest(const std::shared_ptr<org::apache::nifi::minifi::Configure> &
   const auto stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(configuration);
 
   auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(
-      prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
+      org::apache::nifi::minifi::core::ConfigurationContext{prov_repo, flow_repo, content_repo, stream_factory, configuration}, nifi_configuration_class_name);
 
   const auto controller = std::make_unique<org::apache::nifi::minifi::FlowController>(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo, "manifest");
   controller->load();
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 43e97d17d..755e6fc7d 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -64,9 +64,9 @@ class CoapIntegrationBase : public IntegrationBase {
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
     std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-    auto yaml_ptr = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+    auto yaml_ptr = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
 
-    core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+    core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
 
     std::shared_ptr<core::ProcessGroup> pg{ yaml_config.getRoot() };
 
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index bc152618c..147bbdf05 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -134,13 +134,13 @@ int main(int argc, char **argv) {
   content_repo->initialize(configuration);
 
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
-    test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+      core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
 
   std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
       test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
       std::make_shared<utils::file::FileSystem>(), []{});
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+  core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
 
   auto root = yaml_config.getRoot();
   const auto proc = root->findProcessorByName("invoke");
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index ff6dd5eb3..59a408552 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -68,7 +68,7 @@ int main(int argc, char **argv) {
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
-      test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+      core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
 
   const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
       content_repo,
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
   disabled = false;
   std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+  core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
 
   auto pg = yaml_config.getRoot();
 
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index 860bfd93a..5b917ddb5 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -91,7 +91,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
     std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-    auto yaml_ptr = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path);
+    auto yaml_ptr = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path});
     flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
     flowController_->load();
 
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index f4fd27f29..e9c9fd4bf 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -39,7 +39,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   SECTION("loading YAML without optional component IDs works") {
     static const std::string CONFIG_YAML_WITHOUT_IDS =
@@ -230,7 +230,7 @@ TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -356,7 +356,7 @@ TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -510,7 +510,7 @@ TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -546,7 +546,7 @@ TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -591,7 +591,7 @@ TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -640,7 +640,7 @@ TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -662,7 +662,7 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
@@ -680,7 +680,6 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
   REQUIRE(config_failed);
 }
 
-#ifdef YAML_CONFIGURATION_USE_REGEX
 TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
   TestController test_controller;
 
@@ -692,7 +691,7 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -712,7 +711,7 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -733,7 +732,7 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]")
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -761,7 +760,7 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -779,8 +778,6 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
   REQUIRE(config_failed);
 }
 
-#endif  // YAML_CONFIGURATION_USE_REGEX
-
 TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
   TestController test_controller;
 
@@ -789,7 +786,7 @@ TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+  core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
 
   static const std::string CONFIG_YAML_WITH_FUNNEL =
     R"(
@@ -880,7 +877,7 @@ TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yaml_config(test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration);
+  core::YamlConfiguration yaml_config({test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration});
 
   for (char i = '1'; i <= '8'; ++i) {
     DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index 0276ddd90..44d346ed4 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
 
-#include "core/yaml/YamlConnectionParser.h"
+#include "core/flow/StructuredConnectionParser.h"
 
 #include "core/yaml/YamlConfiguration.h"
 #include "TailFile.h"
 #include "TestBase.h"
 #include "Catch.h"
 #include "utils/TestUtils.h"
+#include "core/yaml/YamlNode.h"
 
 using namespace std::literals::chrono_literals;
 
 namespace {
 
-using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::flow::StructuredConnectionParser;
 using org::apache::nifi::minifi::core::YamlConfiguration;
 using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+using org::apache::nifi::minifi::core::YamlNode;
+namespace flow = org::apache::nifi::minifi::core::flow;
 
 TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") {
   const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
@@ -53,25 +56,28 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
           "- something_else\n" };
       expectations = { { "success", "" }, { "failure", "" }, { "something_else", "" } };
     }
-    YAML::Node connection_node = YAML::Load(serialized_yaml);
-    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection);
+    YAML::Node yaml_node = YAML::Load(serialized_yaml);
+    flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+    StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationships(*connection);
     const std::set<core::Relationship>& relationships = connection->getRelationships();
     REQUIRE(expectations == relationships);
   }
   SECTION("Queue size limits are read") {
-    YAML::Node connection_node = YAML::Load(std::string {
+    YAML::Node yaml_node = YAML::Load(std::string {
         "max work queue size: 231\n"
         "max work queue data size: 12 MB\n" });
-    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
-    REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+    flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+    StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSize());
+    REQUIRE(12_MiB == yaml_connection_parser.getWorkQueueDataSize());
   }
   SECTION("Queue swap threshold is read") {
-    YAML::Node connection_node = YAML::Load(std::string {
+    YAML::Node yaml_node = YAML::Load(std::string {
         "swap threshold: 231\n" });
-    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-    REQUIRE(231 == yaml_connection_parser.getSwapThresholdFromYaml());
+    flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+    StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getSwapThreshold());
   }
   SECTION("Source and destination names and uuids are read") {
     const utils::Identifier expected_source_id = utils::generateUUID();
@@ -94,115 +100,120 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
           "source name: TailFile_1\n"
           "destination name: TailFile_2\n" };
     }
-    YAML::Node connection_node = YAML::Load(serialized_yaml);
-    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-    REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUIDFromYaml());
-    REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUIDFromYaml());
+    YAML::Node yaml_node = YAML::Load(serialized_yaml);
+    flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+    StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUID());
+    REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUID());
   }
   SECTION("Flow file expiration is read") {
-    YAML::Node connection_node = YAML::Load(std::string {
+    YAML::Node yaml_node = YAML::Load(std::string {
         "flowfile expiration: 2 min\n" });
-    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-    REQUIRE(2min == yaml_connection_parser.getFlowFileExpirationFromYaml());
+    flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+    StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(2min == yaml_connection_parser.getFlowFileExpiration());
   }
   SECTION("Drop empty value is read") {
     SECTION("When config contains true value") {
-      YAML::Node connection_node = YAML::Load(std::string {
+      YAML::Node yaml_node = YAML::Load(std::string {
           "drop empty: true\n" });
-      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-      REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+      flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+      StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(true == yaml_connection_parser.getDropEmpty());
     }
     SECTION("When config contains false value") {
-      YAML::Node connection_node = YAML::Load(std::string {
+      YAML::Node yaml_node = YAML::Load(std::string {
           "drop empty: false\n" });
-      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-      REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+      flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+      StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(false == yaml_connection_parser.getDropEmpty());
     }
   }
   SECTION("Errors are handled properly when configuration lines are missing") {
     const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
     SECTION("With empty configuration") {
-      YAML::Node connection_node = YAML::Load(std::string(""));
-      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-      CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
-      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getSwapThresholdFromYaml());
-      CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
-      CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+      YAML::Node yaml_node = YAML::Load(std::string(""));
+      flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+
+      CHECK_THROWS(StructuredConnectionParser(connection_node, "test_node", parent_ptr, logger));
     }
     SECTION("With a configuration that lists keys but has no assigned values") {
       std::string serialized_yaml;
       SECTION("Single relationship name left empty") {
-        YAML::Node connection_node = YAML::Load(std::string {
+        YAML::Node yaml_node = YAML::Load(std::string {
             "source name: \n"
             "destination name: \n" });
-        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+        StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
         // This seems incorrect, but we do not want to ruin backward compatibility
-        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
+        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
       }
       SECTION("List of relationship names contains empty item") {
-        YAML::Node connection_node = YAML::Load(std::string {
+        YAML::Node yaml_node = YAML::Load(std::string {
             "source relationship names:\n"
             "- \n" });
-        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
+        flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+        StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
       }
       SECTION("Source and destination lookup from via id") {
-        YAML::Node connection_node = YAML::Load(std::string {
+        YAML::Node yaml_node = YAML::Load(std::string {
             "source id: \n"
             "destination id: \n" });
-        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
-        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+        flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+        StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUID());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
       }
       SECTION("Source and destination lookup via name") {
-        YAML::Node connection_node = YAML::Load(std::string {
+        YAML::Node yaml_node = YAML::Load(std::string {
             "source name: \n"
             "destination name: \n" });
-        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
-        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+        flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+        StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUID());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
       }
       SECTION("Queue limits and configuration") {
-        YAML::Node connection_node = YAML::Load(std::string {
+        YAML::Node yaml_node = YAML::Load(std::string {
             "max work queue size: \n"
             "max work queue data size: \n"
             "swap threshold: \n"
             "flowfile expiration: \n"
             "drop empty: \n"});
-        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-        CHECK(0 == yaml_connection_parser.getWorkQueueSizeFromYaml());
-        CHECK(0 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
-        CHECK(0 == yaml_connection_parser.getSwapThresholdFromYaml());
-        CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
-        CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
+        flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+        StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK(0 == yaml_connection_parser.getWorkQueueSize());
+        CHECK(0 == yaml_connection_parser.getWorkQueueDataSize());
+        CHECK(0 == yaml_connection_parser.getSwapThreshold());
+        CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
+        CHECK(0 == yaml_connection_parser.getDropEmpty());
       }
     }
     SECTION("With a configuration that has values of incorrect format") {
-      YAML::Node connection_node = YAML::Load(std::string {
+      YAML::Node yaml_node = YAML::Load(std::string {
           "max work queue size: 2 KB\n"
           "max work queue data size: 10 Incorrect\n"
           "flowfile expiration: 12\n"
           "drop empty: sup\n"});
-      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+      StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
       // This seems incorrect, but we do not want to ruin backward compatibility
-      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
-      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSize());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSize());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpiration());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmpty());
     }
     SECTION("Known incorrect formats that behave strangely") {
-      YAML::Node connection_node = YAML::Load(std::string {
+      YAML::Node yaml_node = YAML::Load(std::string {
           "max work queue data size: 2 Baby Pandas (img, 20 MB) that are cared for by a group of 30 giraffes\n"
           "flowfile expiration: 0\n"
           "drop empty: NULL\n"});
-      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-      CHECK(2 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
-      CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
-      CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
+      flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+      StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK(2 == yaml_connection_parser.getWorkQueueDataSize());
+      CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
+      CHECK(0 == yaml_connection_parser.getDropEmpty());
     }
   }
 }
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
index 0f5879587..4ad98d747 100644
--- a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -22,7 +22,7 @@
 #include "IntegrationTestUtils.h"
 #include "ProcessGroupTestUtils.h"
 
-static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
+static core::YamlConfiguration config({nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>()});
 
 TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser]") {
   auto pattern = Group("root")
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 049e5ef41..f4bbee6f8 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
     set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
 
diff --git a/libminifi/include/Defaults.h b/libminifi/include/Defaults.h
index 0ee8eaa48..3e119e294 100644
--- a/libminifi/include/Defaults.h
+++ b/libminifi/include/Defaults.h
@@ -18,6 +18,7 @@
 #pragma once
 
 const std::filesystem::path DEFAULT_NIFI_CONFIG_YML = std::filesystem::path("conf") / "config.yml";
+const std::filesystem::path DEFAULT_NIFI_CONFIG_JSON = std::filesystem::path("conf") / "config.json";
 const std::filesystem::path DEFAULT_NIFI_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi.properties";
 const std::filesystem::path DEFAULT_LOG_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-log.properties";
 const std::filesystem::path DEFAULT_UID_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-uid.properties";
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
index 1d648f211..c3efee5e1 100644
--- a/libminifi/include/core/ConfigurationFactory.h
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -23,6 +23,7 @@
 #include <optional>
 #include <string>
 #include <type_traits>
+#include <utility>
 
 #include "FlowConfiguration.h"
 
@@ -33,24 +34,15 @@ namespace minifi {
 namespace core {
 
 template<typename T>
-T* instantiate(
-    const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
-    const std::shared_ptr<core::ContentRepository> &content_repo, const std::shared_ptr<io::StreamFactory> &stream_factory,
-    std::shared_ptr<Configure> configuration, const std::optional<std::string>& path,
-    const std::shared_ptr<utils::file::FileSystem>& filesystem) {
-  return new T(repo, flow_file_repo, content_repo, stream_factory, configuration, path, filesystem);
+T* instantiate(ConfigurationContext ctx) {
+  return new T(std::move(ctx));
 }
 
 /**
  * Configuration factory is used to create a new FlowConfiguration
  * object.
  */
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
-    std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
-    std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
-    std::shared_ptr<io::StreamFactory> stream_factory, const std::string& configuration_class_name,
-    const std::optional<std::string>& path = {}, std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(),
-    bool fail_safe = false);
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const ConfigurationContext& ctx, const std::optional<std::string>& configuration_class_name, bool fail_safe = false);
 
 }  // namespace core
 }  // namespace minifi
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 36f111d50..4039d7224 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <filesystem>
 
 #include "core/Core.h"
 #include "Connection.h"
@@ -51,6 +52,16 @@ class static_initializers {
 
 extern static_initializers &get_static_functions();
 
+struct ConfigurationContext {  // bundles the dependencies that are handed to the FlowConfiguration constructor
+  std::shared_ptr<core::Repository> repo;
+  std::shared_ptr<core::Repository> flow_file_repo;
+  std::shared_ptr<core::ContentRepository> content_repo;
+  std::shared_ptr<io::StreamFactory> stream_factory;
+  std::shared_ptr<Configure> configuration;
+  std::optional<std::filesystem::path> path{std::nullopt};  // no path unless the caller supplies one
+  std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};  // defaults to a newly created FileSystem
+};
+
 /**
  * Purpose: Flow configuration defines the mechanism
  * by which we will configure our flow controller
@@ -61,10 +72,7 @@ class FlowConfiguration : public CoreComponent {
    * Constructor that will be used for configuring
    * the flow controller.
    */
-  explicit FlowConfiguration(const std::shared_ptr<core::Repository>& repo, std::shared_ptr<core::Repository> flow_file_repo,
-                             std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
-                             std::shared_ptr<Configure> configuration, const std::optional<std::filesystem::path>& path,
-                             std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>());
+  explicit FlowConfiguration(ConfigurationContext ctx);
 
   ~FlowConfiguration() override;
 
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h
similarity index 54%
rename from libminifi/include/core/yaml/CheckRequiredField.h
rename to libminifi/include/core/flow/CheckRequiredField.h
index 2f0e4b0eb..6c35b0cbc 100644
--- a/libminifi/include/core/yaml/CheckRequiredField.h
+++ b/libminifi/include/core/flow/CheckRequiredField.h
@@ -21,45 +21,34 @@
 #include <memory>
 #include <vector>
 
-#include "core/logging/LoggerFactory.h"
-#include "yaml-cpp/yaml.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/flow/Node.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace yaml {
+namespace org::apache::nifi::minifi::core::flow {
 
-bool isFieldPresent(const YAML::Node &yaml_node, std::string_view field_name);
-std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_field_names, std::string_view yaml_section = "");
+bool isFieldPresent(const Node &node, std::string_view field_name);
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = "");
 
 /**
  * This is a helper function for verifying the existence of a required
- * field in a YAML::Node object. If the field is not present, an error
+ * field in a Node object. If the field is not present, an error
  * message will be logged and an std::invalid_argument exception will be
- * thrown indicating the absence of the required field in the YAML node.
+ * thrown indicating the absence of the required field in the node.
  *
- * @param yaml_node     the YAML node to check
+ * @param node          the node to check
  * @param field_name    the required field key
- * @param yaml_section  [optional] the top level section of the YAML config
- *                       for the yaml_node. This is used for generating a
+ * @param section       [optional] the top level section of the config
+ *                       for the node. This is used for generating a
  *                       useful error message for troubleshooting.
  * @param error_message [optional] the error message string to use if
  *                       the required field is missing. If not provided,
  *                       a default error message will be generated.
  *
  * @throws std::invalid_argument if the required field 'field_name' is
- *                               not present in 'yaml_node'
+ *                               not present in 'node'
  */
-void checkRequiredField(
-    const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section = "", std::string_view error_message = "");
+void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = "");
 
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message = {});
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {});
 
-}  // namespace yaml
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h
new file mode 100644
index 000000000..78735f95d
--- /dev/null
+++ b/libminifi/include/core/flow/Node.h
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string_view>
+#include <tuple>
+#include <optional>
+#include <string>
+#include <memory>
+#include <utility>
+#include "nonstd/expected.hpp"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+class Node {  // format-independent handle to a configuration node; every accessor forwards to the wrapped NodeImpl
+ public:
+  struct Cursor {  // location data returned by getCursor(); every field defaults to 0
+    int line{0};
+    int column{0};
+    int pos{0};
+  };
+
+  class Iterator {  // copyable wrapper that owns a polymorphic IteratorImpl
+   public:
+    class Value;  // defined after Node, because it derives from Node
+
+    class IteratorImpl {  // interface a concrete iterator has to implement
+     public:
+      virtual IteratorImpl& operator++() = 0;
+      virtual bool operator==(const IteratorImpl& other) const = 0;
+      virtual Value operator*() const = 0;
+      bool operator!=(const IteratorImpl& other) const {return !(*this == other);}
+
+      virtual std::unique_ptr<IteratorImpl> clone() const = 0;  // enables copying through the base interface
+      virtual ~IteratorImpl() = default;
+    };
+
+    Iterator& operator++() {
+      impl_->operator++();
+      return *this;
+    }
+
+    explicit Iterator(std::unique_ptr<IteratorImpl> impl) : impl_(std::move(impl)) {}
+    Iterator(const Iterator& other): impl_(other.impl_->clone()) {}  // copies by cloning the implementation
+    Iterator(Iterator&&) = default;
+    Iterator& operator=(const Iterator& other) {
+      if (this == &other) {
+        return *this;
+      }
+      impl_ = other.impl_->clone();
+      return *this;
+    }
+    Iterator& operator=(Iterator&&) = default;
+
+    bool operator==(const Iterator& other) const {return impl_->operator==(*other.impl_);}
+    bool operator!=(const Iterator& other) const {return !(*this == other);}
+
+    Value operator*() const;
+
+   private:
+    std::unique_ptr<IteratorImpl> impl_;
+  };
+
+  class NodeImpl {  // interface implemented by the concrete node types wrapped by Node
+   public:
+    virtual explicit operator bool() const = 0;
+    virtual bool isSequence() const = 0;
+    virtual bool isMap() const = 0;
+    virtual bool isNull() const = 0;
+
+    virtual nonstd::expected<std::string, std::exception_ptr> getString() const = 0;
+    virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0;
+    virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0;
+    virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0;
+
+    virtual std::string getDebugString() const = 0;
+
+    virtual size_t size() const = 0;
+    virtual Iterator begin() const = 0;
+    virtual Iterator end() const = 0;
+    virtual Node operator[](std::string_view key) const = 0;
+
+    virtual std::optional<Cursor> getCursor() const = 0;
+
+    virtual ~NodeImpl() = default;
+  };
+
+  Node() = default;  // leaves impl_ null: such a Node is only safe to test with operator bool
+  explicit Node(std::shared_ptr<NodeImpl> impl): impl_(std::move(impl)) {}
+
+  explicit operator bool() const {return impl_ && impl_->operator bool();}  // false instead of a null dereference when default-constructed
+  bool isSequence() const {return impl_->isSequence();}
+  bool isMap() const {return impl_->isMap();}
+  bool isNull() const {return impl_->isNull();}
+
+  nonstd::expected<std::string, std::exception_ptr> getString() const {return impl_->getString();}
+  nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();}
+  nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();}
+  nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();}
+
+  // returns a string representation of the node for diagnostics (it need not be deserializable)
+  std::string getDebugString() const {return impl_->getDebugString();}
+
+  size_t size() const {return impl_->size();}
+  bool empty() const {  // a predicate, hence bool rather than size_t
+    return size() == 0;
+  }
+  Iterator begin() const {return impl_->begin();}
+  Iterator end() const {return impl_->end();}
+  Node operator[](std::string_view key) const {return impl_->operator[](key);}
+
+  std::optional<Cursor> getCursor() const {return impl_->getCursor();}
+
+ private:
+  std::shared_ptr<NodeImpl> impl_;
+};
+
+class Node::Iterator::Value : public Node, public std::pair<Node, Node> {  // what IteratorImpl::operator* yields: a Node that also carries a pair of Nodes
+ public:
+  Value(Node node, Node key, Node value): Node{std::move(node)}, std::pair<Node, Node>{std::move(key), std::move(value)} {}  // key/value fill the pair base, in that order
+};
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/README.md b/libminifi/include/core/flow/README.md
new file mode 100644
index 000000000..c62c71495
--- /dev/null
+++ b/libminifi/include/core/flow/README.md
@@ -0,0 +1,57 @@
+## Differences between JSON and YAML implementation
+
+### YAML
+
+The possible types of a `YAML::Node` are:
+* Undefined
+* Null
+* Map
+* Sequence
+* Scalar
+
+#### Undefined
+
+The result of querying any member of `Null`, querying non-existing members of `Map`,
+or non-existing indices of `Sequence`.
+
+Note that for `Map`s string conversion applies: `map[0]` could be valid, given a key `"0"`,
+while for `Sequence`s string index parsing does NOT happen: `seq["0"]`
+will return `Undefined` even if the sequence is non-empty.
+
+Querying or otherwise accessing an `Undefined` (other than `operator bool` or `IsDefined`) usually throws.
+
+#### Null
+
+The value of parsing an empty document, the value of a `Map` item with empty value,
+the value of an omitted `Sequence` item.
+
+```
+key1:      # this is a Null
+key2: ''   # this is a Scalar, the empty string
+arr:
+  - one
+  -        # Null as well
+  - three
+```
+
+#### Scalar
+
+A string value; all conversions to numbers happen on the fly.
+
+### Conversions
+
+#### 1. `::as<std::string>`
+
+* `Null` --> `"null"`
+* `Scalar` --> the string value
+* others --> throws
+
+#### 2. `::as<YAML::Node>`
+
+It was used in multiple places; it seems to throw on `Undefined` and return itself for all
+other types.
+
+### JSON 
+
+In contrast to these, JSON has real bools, numbers and strings; there is no string-to-int
+conversion happening.
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h
new file mode 100644
index 000000000..1b46cda9d
--- /dev/null
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -0,0 +1,240 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_set>
+
+#include "core/FlowConfiguration.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessorConfig.h"
+#include "Exception.h"
+#include "io/StreamFactory.h"
+#include "io/validation.h"
+#include "sitetosite/SiteToSite.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileSystem.h"
+#include "core/flow/Node.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller";
+static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors";
+static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services";
+static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
+static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
+static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting";
+static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels";
+static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports";
+static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports";
+
+class StructuredConfiguration : public FlowConfiguration {  // FlowConfiguration whose parsing helpers operate on flow::Node trees
+ public:
+  StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger);
+
+  /**
+   * Iterates all component property validation rules and checks that configured state
+   * is valid. If state is determined to be invalid, conf parsing ends and an error is raised.
+   *
+   * @param component
+   * @param component_name
+   * @param section
+   */
+  void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const;
+
+ protected:
+  /**
+   * Returns a unique pointer to a ProcessGroup object containing the
+   * flow configuration.
+   *
+   * @param root_node the Node object containing the root
+   *                       node of the parsed document
+   * @return             the root ProcessGroup node of the flow
+   *                       configuration tree
+   */
+  std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node);
+
+  std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false);
+
+  std::unique_ptr<core::ProcessGroup> parseProcessGroup(const Node& header_node, const Node& node, bool is_root = false);
+  /**
+   * Parses processors from its corresponding config node and adds
+   * them to a parent ProcessGroup. The processors_node argument must point
+   * to a Node containing the processors configuration. Processor
+   * objects will be created and added to the parent ProcessGroup specified
+   * by the parent argument.
+   *
+   * @param processors_node  the Node containing the processor configuration
+   * @param parent              the parent ProcessGroup to which the created
+   *                            Processor should be added
+   */
+  void parseProcessorNode(const Node& processors_node, core::ProcessGroup* parent);
+
+  /**
+   * Parses a port from its corresponding config node and adds
+   * it to a parent ProcessGroup. The port_node argument must point
+   * to a Node containing the port configuration. A RemoteProcessorGroupPort
+   * object will be created and added to the parent ProcessGroup specified
+   * by the parent argument.
+   *
+   * @param port_node  the Node containing the port configuration
+   * @param parent    the parent ProcessGroup for the port
+   * @param direction the TransferDirection of the port
+   */
+  void parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
+
+  /**
+   * Parses the root level node for the flow configuration and
+   * returns a ProcessGroup containing the tree of flow configuration
+   * objects.
+   *
+   * @param root_flow_node
+   * @return
+   */
+  std::unique_ptr<core::ProcessGroup> parseRootProcessGroup(const Node& root_flow_node);
+
+  void parseProcessorProperty(const Node& doc, const Node& node, std::shared_ptr<core::Processor> processor);
+
+  void parseControllerServices(const Node& controller_services_node);
+
+  /**
+   * Parses the Connections section of a configuration.
+   * The resulting Connections are added to the parent ProcessGroup.
+   *
+   * @param connection_node_seq   the Node containing the Connections section
+   *                              of the configuration
+   * @param parent                the root node of flow configuration to which
+   *                              to add the connections that are parsed
+   */
+  void parseConnection(const Node& connection_node_seq, core::ProcessGroup* parent);
+
+  /**
+   * Parses the Remote Process Group section of a configuration.
+   * The resulting Process Group is added to the parent ProcessGroup.
+   *
+   * @param rpg_node_seq  the Node containing the Remote Process Group
+   *                      section of the configuration
+   * @param parent        the root node of flow configuration to which
+   *                      to add the process groups that are parsed
+   */
+  void parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parent);
+
+  /**
+   * Parses the Provenance Reporting section of a configuration.
+   * The resulting Provenance Reporting processor is added to the
+   * parent ProcessGroup.
+   *
+   * @param report_node  the Node containing the provenance
+   *                      reporting configuration
+   * @param parent_group the root node of flow configuration to which
+   *                      to add the provenance reporting config
+   */
+  void parseProvenanceReporting(const Node& report_node, core::ProcessGroup* parent_group);
+
+  /**
+   * A helper function to parse the Properties Node for a component.
+   *
+   * @param properties_node the Node containing the properties
+   * @param component      the ConfigurableComponent to which to add the resulting properties
+   */
+  void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section);
+
+  /**
+   * Parses the Funnels section of a configuration.
+   * The resulting Funnels are added to the parent ProcessGroup.
+   *
+   * @param node   the Node containing the Funnels section
+   *                 of the configuration
+   * @param parent the root node of flow configuration to which
+   *                 to add the funnels that are parsed
+   */
+  void parseFunnels(const Node& node, core::ProcessGroup* parent);
+
+  /**
+   * Parses the Input/Output Ports section of a configuration.
+   * The resulting ports are added to the parent ProcessGroup.
+   *
+   * @param node   the Node containing the Input/Output Ports section
+   *                 of the configuration
+   * @param parent the root node of flow configuration to which
+   *                 to add the ports that are parsed
+   */
+  void parsePorts(const flow::Node& node, core::ProcessGroup* parent, PortType port_type);
+
+  /**
+   * A helper function for parsing or generating optional id fields.
+   *
+   * In parsing flow configurations for config schema v1, the
+   * 'id' field of most component types that contains a UUID is optional.
+   * This function will check for the existence of the specified
+   * idField in the specified node. If present, the field will be parsed
+   * as a UUID and the UUID string will be returned. If not present, a
+   * random UUID string will be generated and returned.
+   *
+   * @param node     the Node that will be checked for the
+   *                   presence of an idField
+   * @param id_field  the string of the name of the idField to check for. This
+   *                   is optional and defaults to 'id'
+   * @return         the parsed or generated UUID string
+   */
+  std::string getOrGenerateId(const Node& node, const std::string& id_field = "id");
+
+  std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = "");
+
+  /**
+   * This is a helper function for getting an optional value, if it exists.
+   * If it does not exist, returns the provided default value.
+   *
+   * @param node         the flow node to check
+   * @param field_name    the optional field key
+   * @param default_value the default value to use if field is not set
+   * @param section  [optional] the top level section of the config
+   *                       for the node. This is used for generating a
+   *                       useful info message for troubleshooting.
+   * @param info_message  [optional] the info message string to use if
+   *                       the optional field is missing. If not provided,
+   *                       a default info message will be generated.
+   */
+  std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = "");
+
+  static std::shared_ptr<utils::IdGenerator> id_generator_;
+  std::unordered_set<std::string> uuids_;
+  std::shared_ptr<logging::Logger> logger_;
+
+ private:
+  PropertyValue getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& property_from_processor, const Node& property_value_node);
+  void parsePropertyValueSequence(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& component);
+  void parseSingleProperty(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor);
+  void parsePropertyNodeElement(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor);
+  void addNewId(const std::string& uuid);
+
+  /**
+   * Raises a human-readable configuration error for the given configuration component/section.
+   *
+   * @param component_name
+   * @param section
+   * @param reason
+   */
+  void raiseComponentError(const std::string &component_name, const std::string &section, const std::string &reason) const;
+};
+
+}  // namespace org::apache::nifi::minifi::core::flow
+
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h
similarity index 61%
rename from libminifi/include/core/yaml/YamlConnectionParser.h
rename to libminifi/include/core/flow/StructuredConnectionParser.h
index e1c544d77..2b1d73743 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/flow/StructuredConnectionParser.h
@@ -24,38 +24,42 @@
 #include "core/ProcessGroup.h"
 #include "core/logging/LoggerFactory.h"
 
-#include "yaml-cpp/yaml.h"
+#include "core/flow/Node.h"
 #include "utils/gsl.h"
 
-namespace org::apache::nifi::minifi::core::yaml {
+namespace org::apache::nifi::minifi::core::flow {
 
-class YamlConnectionParser {
+class StructuredConnectionParser {  // declares read-only accessors for one connection's settings, backed by a flow::Node
  public:
-  static constexpr const char* CONFIG_YAML_CONNECTIONS_KEY{ "Connections" };
+  static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" };
 
-  explicit YamlConnectionParser(const YAML::Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+  explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
       connectionNode_(connectionNode),
       name_(name),
       parent_(parent),
-      logger_(logger) {}
+      logger_(logger) {
+    if (!connectionNode.isMap()) {  // construction is rejected unless the connection node is a map
+      throw std::logic_error("Connection node is not a map");
+    }
+  }
 
-  void configureConnectionSourceRelationshipsFromYaml(minifi::Connection& connection) const;
-  [[nodiscard]] uint64_t getWorkQueueSizeFromYaml() const;
-  [[nodiscard]] uint64_t getWorkQueueDataSizeFromYaml() const;
-  [[nodiscard]] utils::Identifier getSourceUUIDFromYaml() const;
-  [[nodiscard]] uint64_t getSwapThresholdFromYaml() const;
-  [[nodiscard]] utils::Identifier getDestinationUUIDFromYaml() const;
-  [[nodiscard]] std::chrono::milliseconds getFlowFileExpirationFromYaml() const;
-  [[nodiscard]] bool getDropEmptyFromYaml() const;
+  void configureConnectionSourceRelationships(minifi::Connection& connection) const;
+  [[nodiscard]] uint64_t getWorkQueueSize() const;
+  [[nodiscard]] uint64_t getWorkQueueDataSize() const;
+  [[nodiscard]] uint64_t getSwapThreshold() const;
+  [[nodiscard]] utils::Identifier getSourceUUID() const;
+  [[nodiscard]] utils::Identifier getDestinationUUID() const;
+  [[nodiscard]] std::chrono::milliseconds getFlowFileExpiration() const;
+  [[nodiscard]] bool getDropEmpty() const;
 
  private:
   void addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const;
   void addFunnelRelationshipToConnection(minifi::Connection& connection) const;
 
-  const YAML::Node& connectionNode_;
+  const Node& connectionNode_;  // like name_ below, held by reference: the constructor argument must outlive this parser
   const std::string& name_;
   gsl::not_null<core::ProcessGroup*> parent_;
   const std::shared_ptr<logging::Logger> logger_;
 };
 
-}  // namespace org::apache::nifi::minifi::core::yaml
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/json/JsonConfiguration.h b/libminifi/include/core/json/JsonConfiguration.h
new file mode 100644
index 000000000..35623381d
--- /dev/null
+++ b/libminifi/include/core/json/JsonConfiguration.h
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_set>
+
+#include "core/FlowConfiguration.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessorConfig.h"
+#include "Exception.h"
+#include "io/StreamFactory.h"
+#include "io/validation.h"
+#include "sitetosite/SiteToSite.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileSystem.h"
+#include "core/flow/StructuredConfiguration.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class JsonConfiguration : public flow::StructuredConfiguration {  // JSON flavour of the structured flow configuration; member definitions are not in this header
+ public:
+  explicit JsonConfiguration(ConfigurationContext ctx);
+
+  ~JsonConfiguration() override = default;
+
+  std::unique_ptr<core::ProcessGroup> getRoot() override;  // NOTE(review): presumably loads the flow from the configured file - confirm in the .cpp
+
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override;  // json_config: presumably the raw JSON text - TODO confirm
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h
new file mode 100644
index 000000000..d19ca1582
--- /dev/null
+++ b/libminifi/include/core/json/JsonNode.h
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "core/flow/Node.h"
+#include "rapidjson/document.h"
+#include "utils/gsl.h"
+#include "utils/ValueCaster.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class JsonNode : public flow::Node::NodeImpl {  // flow::Node backend over a non-owning rapidjson::Value pointer; nullptr marks an invalid node
+ public:
+  explicit JsonNode(const rapidjson::Value* node): node_(node) {}
+
+  explicit operator bool() const override {  // true for every node that wraps an actual json value, including json null
+    return node_ != nullptr;
+  }
+  bool isSequence() const override {  // the three type predicates all answer false for an invalid node
+    return node_ ? node_->IsArray() : false;
+  }
+  bool isMap() const override {
+    return node_ ? node_->IsObject() : false;
+  }
+  bool isNull() const override {
+    return node_ ? node_->IsNull() : false;
+  }
+
+  nonstd::expected<std::string, std::exception_ptr> getString() const override {  // errors are returned as exception_ptr, never thrown
+    try {
+      if (!node_) {
+        throw std::runtime_error("Cannot get string of invalid json value");
+      }
+      if (!node_->IsString()) {
+        throw std::runtime_error("Cannot get string of non-string json value");
+      }
+      return std::string{node_->GetString(), node_->GetStringLength()};  // explicit length keeps embedded NUL characters
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<int64_t, std::exception_ptr> getInt64() const override {  // fails for any value rapidjson does not report as int64
+    try {
+      if (!node_) {
+        throw std::runtime_error("Cannot get int64 of invalid json value");
+      }
+      if (!node_->IsInt64()) {
+        throw std::runtime_error("Cannot get int64 of non-int64 json value");
+      }
+      return node_->GetInt64();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<bool, std::exception_ptr> getBool() const override {  // only json true/false qualify, no conversion from other types
+    try {
+      if (!node_) {
+        throw std::runtime_error("Cannot get bool of invalid json value");
+      }
+      if (!node_->IsBool()) {
+        throw std::runtime_error("Cannot get bool of non-bool json value");
+      }
+      return node_->GetBool();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const override {  // decimal text of a signed or unsigned 64-bit integer
+    try {
+      if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
+      if (node_->IsInt64()) return std::to_string(node_->GetInt64());
+      if (node_->IsUint64()) return std::to_string(node_->GetUint64());  // reached only when the value is not reported as int64
+      throw std::runtime_error("Cannot get string from non-integer json value");
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  std::string getDebugString() const override {  // short human-readable rendering of the node; never throws on type mismatch
+    if (!node_) return "<invalid>";
+    if (node_->IsObject()) return "<Map>";
+    if (node_->IsArray()) return "<Array>";
+    if (node_->IsNull()) return "null";
+    // Integer kinds are tested directly: probing via getIntegerAsString() throws and catches internally for every bool/double/string.
+    if (node_->IsInt64()) return std::to_string(node_->GetInt64());
+    if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+    if (node_->IsTrue()) return "true";
+    if (node_->IsFalse()) return "false";
+    if (node_->IsDouble()) return std::to_string(node_->GetDouble());
+    if (node_->IsString()) return '"' + std::string(node_->GetString(), node_->GetStringLength()) + '"';
+    return "<unknown>";
+  }
+
+  size_t size() const override {  // element count of an array; throws std::runtime_error for invalid and non-array nodes (objects included)
+    if (!node_) {
+      throw std::runtime_error("Cannot get size of invalid json value");
+    }
+    if (!node_->IsArray()) {
+      throw std::runtime_error("Cannot get size of non-array json value");
+    }
+    return node_->Size();
+  }
+  flow::Node::Iterator begin() const override;
+  flow::Node::Iterator end() const override;
+
+  flow::Node operator[](std::string_view key) const override {  // member lookup: invalid node when absent, throws for scalars
+    if (!node_ || node_->IsArray() || node_->IsNull()) {
+      return flow::Node{std::make_shared<JsonNode>(nullptr)};
+    }
+    if (!node_->IsObject()) {
+      throw std::runtime_error("Cannot get member of scalar json value");
+    }
+    auto it = node_->FindMember(rapidjson::Value(rapidjson::StringRef(key.data(), key.length())));  // StringRef: the key is referenced, not copied
+    if (it == node_->MemberEnd()) {
+      return flow::Node{std::make_shared<JsonNode>(nullptr)};
+    }
+    return flow::Node{std::make_shared<JsonNode>(&it->value)};
+  }
+
+  std::optional<flow::Node::Cursor> getCursor() const override {  // this implementation reports no source position
+    return std::nullopt;
+  }
+
+ private:
+  const rapidjson::Value* node_;
+};
+
+class JsonValueIterator : public flow::Node::Iterator::IteratorImpl {  // iterator over the elements of a json array (see JsonNode::begin)
+ public:
+  explicit JsonValueIterator(rapidjson::Value::ConstValueIterator it): it_(std::move(it)) {}
+
+  IteratorImpl& operator++() override {
+    ++it_;
+    return *this;
+  }
+  bool operator==(const IteratorImpl& other) const override {
+    const auto* const rhs = dynamic_cast<const JsonValueIterator*>(&other);
+    gsl_Expects(rhs);  // comparing against another iterator kind is a precondition violation
+    return it_ == rhs->it_;
+  }
+  flow::Node::Iterator::Value operator*() const override {
+    const auto invalid_node = [] { return flow::Node{std::make_shared<JsonNode>(nullptr)}; };
+    flow::Node element{std::make_shared<JsonNode>(&*it_)};
+    // array elements have no key/value halves, so those two slots are filled with invalid nodes
+    return {std::move(element), invalid_node(), invalid_node()};
+  }
+
+  std::unique_ptr<IteratorImpl> clone() const override {
+    return std::make_unique<JsonValueIterator>(it_);
+  }
+
+ private:
+  rapidjson::Value::ConstValueIterator it_;
+};
+
+class JsonMemberIterator : public flow::Node::Iterator::IteratorImpl {  // iterator over the name/value members of a json object (see JsonNode::begin)
+ public:
+  explicit JsonMemberIterator(rapidjson::Value::ConstMemberIterator it): it_(std::move(it)) {}
+
+  IteratorImpl& operator++() override {
+    ++it_;
+    return *this;
+  }
+  bool operator==(const IteratorImpl& other) const override {
+    const auto* ptr = dynamic_cast<const JsonMemberIterator*>(&other);
+    gsl_Expects(ptr);  // comparing against another iterator kind is a precondition violation
+    return it_ == ptr->it_;
+  }
+  flow::Node::Iterator::Value operator*() const override {
+    auto node = flow::Node{std::make_shared<JsonNode>(nullptr)};  // an object member has no standalone element node
+    auto first = flow::Node{std::make_shared<JsonNode>(&it_->name)};
+    auto second = flow::Node{std::make_shared<JsonNode>(&it_->value)};
+    return flow::Node::Iterator::Value(std::move(node), std::move(first), std::move(second));  // move the locals, as JsonValueIterator does
+  }
+
+  std::unique_ptr<IteratorImpl> clone() const override {
+    return std::make_unique<JsonMemberIterator>(it_);
+  }
+
+ private:
+  rapidjson::Value::ConstMemberIterator it_;
+};
+
+inline flow::Node::Iterator JsonNode::begin() const {
+  if (!node_) {
+    throw std::runtime_error("Cannot get begin of invalid json value");
+  }
+  if (node_->IsArray()) {
+    return flow::Node::Iterator{std::make_unique<JsonValueIterator>(node_->Begin())};
+  } else if (node_->IsObject()) {
+    return flow::Node::Iterator{std::make_unique<JsonMemberIterator>(node_->MemberBegin())};
+  } else {
+    throw std::runtime_error("Json node is not iterable, neither array nor object");
+  }
+}
+
+inline flow::Node::Iterator JsonNode::end() const {
+  if (!node_) {
+    throw std::runtime_error("Cannot get end of invalid json value");
+  }
+  if (node_->IsArray()) {
+    return flow::Node::Iterator{std::make_unique<JsonValueIterator>(node_->End())};
+  } else if (node_->IsObject()) {
+    return flow::Node::Iterator{std::make_unique<JsonMemberIterator>(node_->MemberEnd())};
+  } else {
+    throw std::runtime_error("Json node is not iterable, neither array nor object");
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index a8b7ab5c1..996608dcf 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #pragma once
 
 #include <memory>
@@ -32,35 +33,15 @@
 #include "utils/Id.h"
 #include "utils/StringUtils.h"
 #include "utils/file/FileSystem.h"
-#include "yaml-cpp/yaml.h"
+#include "core/flow/StructuredConfiguration.h"
 
 class YamlConfigurationTestAccessor;
 
 namespace org::apache::nifi::minifi::core {
 
-static constexpr char const* CONFIG_YAML_FLOW_CONTROLLER_KEY = "Flow Controller";
-static constexpr char const* CONFIG_YAML_PROCESSORS_KEY = "Processors";
-static constexpr char const* CONFIG_YAML_CONTROLLER_SERVICES_KEY = "Controller Services";
-static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
-static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
-static constexpr char const* CONFIG_YAML_PROVENANCE_REPORT_KEY = "Provenance Reporting";
-static constexpr char const* CONFIG_YAML_FUNNELS_KEY = "Funnels";
-static constexpr char const* CONFIG_YAML_INPUT_PORTS_KEY = "Input Ports";
-static constexpr char const* CONFIG_YAML_OUTPUT_PORTS_KEY = "Output Ports";
-
-#define YAML_CONFIGURATION_USE_REGEX
-
-// Disable regex in EL for incompatible compilers
-#if __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 9)
-#undef YAML_CONFIGURATION_USE_REGEX
-#endif
-
-class YamlConfiguration : public FlowConfiguration {
+class YamlConfiguration : public flow::StructuredConfiguration {
  public:
-  explicit YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
-                             const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
-                             const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path = {},
-                             const std::shared_ptr<utils::file::FileSystem>& filesystem = std::make_shared<utils::file::FileSystem>());
+  explicit YamlConfiguration(ConfigurationContext ctx);
 
   ~YamlConfiguration() override = default;
 
@@ -71,23 +52,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return               the root ProcessGroup node of the flow
    *                        configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRoot() override {
-    if (!config_path_) {
-      logger_->log_error("Cannot instantiate flow, no config file is set.");
-      throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
-    }
-    const auto configuration = filesystem_->read(config_path_.value());
-    if (!configuration) {
-      return nullptr;
-    }
-    try {
-      YAML::Node rootYamlNode = YAML::Load(configuration.value());
-      return getYamlRoot(rootYamlNode);
-    } catch(...) {
-      logger_->log_error("Invalid yaml configuration file");
-      throw;
-    }
-  }
+  std::unique_ptr<core::ProcessGroup> getRoot() override;
 
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
@@ -100,16 +65,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return                 the root ProcessGroup node of the flow
    *                           configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream) {
-    try {
-      YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
-      return getYamlRoot(rootYamlNode);
-    } catch (const YAML::ParserException &pe) {
-      logger_->log_error(pe.what());
-      std::rethrow_exception(std::current_exception());
-    }
-    return nullptr;
-  }
+  std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream);
 
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
@@ -122,210 +78,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return                 the root ProcessGroup node of the flow
    *                           configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override {
-    try {
-      YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
-      return getYamlRoot(rootYamlNode);
-    } catch (const YAML::ParserException &pe) {
-      logger_->log_error(pe.what());
-      std::rethrow_exception(std::current_exception());
-    }
-    return nullptr;
-  }
-
-  /**
-   * Iterates all component property validation rules and checks that configured state
-   * is valid. If state is determined to be invalid, conf parsing ends and an error is raised.
-   *
-   * @param component
-   * @param component_name
-   * @param yaml_section
-   */
-  void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const;
-
- protected:
-  /**
-   * Returns a shared pointer to a ProcessGroup object containing the
-   * flow configuration. The rootYamlNode argument must point to
-   * an YAML::Node object containing the root node of the parsed YAML
-   * for the flow configuration.
-   *
-   * @param rootYamlNode a pointer to a YAML::Node object containing the root
-   *                       node of the parsed YAML document
-   * @return             the root ProcessGroup node of the flow
-   *                       configuration tree
-   */
-  std::unique_ptr<core::ProcessGroup> getYamlRoot(const YAML::Node& rootYamlNode);
-
-  std::unique_ptr<core::ProcessGroup> createProcessGroup(const YAML::Node& yamlNode, bool is_root = false);
-
-  std::unique_ptr<core::ProcessGroup> parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root = false);
-  /**
-   * Parses a processor from its corresponding YAML config node and adds
-   * it to a parent ProcessGroup. The processorNode argument must point
-   * to a YAML::Node containing the processor configuration. A Processor
-   * object will be created a added to the parent ProcessGroup specified
-   * by the parent argument.
-   *
-   * @param processorsNode the YAML::Node containing the processor configuration
-   * @param parent         the parent ProcessGroup to which the the created
-   *                       Processor should be added
-   */
-  void parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parent);
-
-  /**
-   * Parses a port from its corressponding YAML config node and adds
-   * it to a parent ProcessGroup. The portNode argument must point
-   * to a YAML::Node containing the port configuration. A RemoteProcessorGroupPort
-   * object will be created a added to the parent ProcessGroup specified
-   * by the parent argument.
-   *
-   * @param portNode  the YAML::Node containing the port configuration
-   * @param parent    the parent ProcessGroup for the port
-   * @param direction the TransferDirection of the port
-   */
-  void parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
-
-  /**
-   * Parses the root level YAML node for the flow configuration and
-   * returns a ProcessGroup containing the tree of flow configuration
-   * objects.
-   *
-   * @param rootFlowNode
-   * @return
-   */
-  std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(const YAML::Node& rootFlowNode);
-
-  // Process Property YAML
-  void parseProcessorPropertyYaml(const YAML::Node& doc, const YAML::Node& node, std::shared_ptr<core::Processor> processor);
-  /**
-   * Parse controller services
-   * @param controllerServicesNode controller services YAML node.
-   * @param parent parent process group.
-   */
-  void parseControllerServices(const YAML::Node& controllerServicesNode);
-
-  /**
-   * Parses the Connections section of a configuration YAML.
-   * The resulting Connections are added to the parent ProcessGroup.
-   *
-   * @param node   the YAML::Node containing the Connections section
-   *                 of the configuration YAML
-   * @param parent the root node of flow configuration to which
-   *                 to add the connections that are parsed
-   */
-  void parseConnectionYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
-  /**
-   * Parses the Remote Process Group section of a configuration YAML.
-   * The resulting Process Group is added to the parent ProcessGroup.
-   *
-   * @param node   the YAML::Node containing the Remote Process Group
-   *                 section of the configuration YAML
-   * @param parent the root node of flow configuration to which
-   *                 to add the process groups that are parsed
-   */
-  void parseRemoteProcessGroupYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
-  /**
-   * Parses the Provenance Reporting section of a configuration YAML.
-   * The resulting Provenance Reporting processor is added to the
-   * parent ProcessGroup.
-   *
-   * @param reportNode  the YAML::Node containing the provenance
-   *                      reporting configuration
-   * @param parentGroup the root node of flow configuration to which
-   *                      to add the provenance reporting config
-   */
-  void parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup);
-
-  /**
-   * A helper function to parse the Properties Node YAML for a processor.
-   *
-   * @param propertiesNode the YAML::Node containing the properties
-   * @param processor      the Processor to which to add the resulting properties
-   */
-  void parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name, const std::string& yaml_section);
-
-  /**
-   * Parses the Funnels section of a configuration YAML.
-   * The resulting Funnels are added to the parent ProcessGroup.
-   *
-   * @param node   the YAML::Node containing the Funnels section
-   *                 of the configuration YAML
-   * @param parent the root node of flow configuration to which
-   *                 to add the funnels that are parsed
-   */
-  void parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
-  /**
-   * Parses the Input/Output Ports section of a configuration YAML.
-   * The resulting ports are added to the parent ProcessGroup.
-   *
-   * @param node   the YAML::Node containing the Input/Output Ports section
-   *                 of the configuration YAML
-   * @param parent the root node of flow configuration to which
-   *                 to add the funnels that are parsed
-   */
-  void parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type);
-
-  /**
-   * A helper function for parsing or generating optional id fields.
-   *
-   * In parsing YAML flow configurations for config schema v1, the
-   * 'id' field of most component types that contains a UUID is optional.
-   * This function will check for the existence of the specified
-   * idField in the specified yamlNode. If present, the field will be parsed
-   * as a UUID and the UUID string will be returned. If not present, a
-   * random UUID string will be generated and returned.
-   *
-   * @param yamlNode a pointer to the YAML::Node that will be checked for the
-   *                   presence of an idField
-   * @param idField  the string of the name of the idField to check for. This
-   *                   is optional and defaults to 'id'
-   * @return         the parsed or generated UUID string
-   */
-  std::string getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField = "id");
-  std::string getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section = "", std::string_view error_message = "");
-
-  /**
-   * This is a helper function for getting an optional value, if it exists.
-   * If it does not exist, returns the provided default value.
-   *
-   * @param yamlNode     the YAML node to check
-   * @param fieldName    the optional field key
-   * @param defaultValue the default value to use if field is not set
-   * @param yamlSection  [optional] the top level section of the YAML config
-   *                       for the yamlNode. This is used fpr generating a
-   *                       useful info message for troubleshooting.
-   * @param infoMessage  [optional] the info message string to use if
-   *                       the optional field is missing. If not provided,
-   *                       a default info message will be generated.
-   */
-  YAML::Node getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection = "", const std::string& infoMessage = "");
-
- protected:
-  std::shared_ptr<io::StreamFactory> stream_factory_;
-
- private:
-  PropertyValue getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode);
-  void parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component);
-  void parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
-  void parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
-  void addNewId(const std::string& uuid);
-
-  std::shared_ptr<logging::Logger> logger_;
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
-  std::unordered_set<std::string> uuids_;
-
-  /**
-   * Raises a human-readable configuration error for the given configuration component/section.
-   *
-   * @param component_name
-   * @param yaml_section
-   * @param reason
-   */
-  void raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const;
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h
new file mode 100644
index 000000000..ad8422f6e
--- /dev/null
+++ b/libminifi/include/core/yaml/YamlNode.h
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "yaml-cpp/yaml.h"
+#include "core/flow/Node.h"
+#include "utils/gsl.h"
+
+
+namespace org::apache::nifi::minifi::core {
+
+class YamlNode : public flow::Node::NodeImpl {  // flow::Node backend that wraps a yaml-cpp YAML::Node stored by value
+ public:
+  explicit YamlNode(YAML::Node node) : node_(std::move(node)) {}
+
+  explicit operator bool() const override {  // validity is delegated to the wrapped YAML::Node
+    return node_.operator bool();
+  }
+
+  bool isSequence() const override {
+    return node_.IsSequence();
+  }
+
+  bool isMap() const override {
+    return node_.IsMap();
+  }
+
+  bool isNull() const override {
+    return node_.IsNull();
+  }
+
+  nonstd::expected<std::string, std::exception_ptr> getString() const override {  // conversion errors are returned as exception_ptr, never thrown
+    try {
+      return node_.as<std::string>();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<bool, std::exception_ptr> getBool() const override {  // same error-capturing pattern as getString()
+    try {
+      return node_.as<bool>();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<int64_t, std::exception_ptr> getInt64() const override {  // same error-capturing pattern as getString()
+    try {
+      return node_.as<int64_t>();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const override {  // same conversion as getString(): nothing here checks that the text is an integer
+    try {
+      return node_.as<std::string>();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());
+    }
+  }
+
+  std::string getDebugString() const override {  // short human-readable rendering; scalars are shown quoted
+    if (!node_) return "<invalid>";
+    if (node_.IsNull()) return "null";
+    if (node_.IsSequence()) return "<Array>";
+    if (node_.IsMap()) return "<Map>";
+    if (node_.IsScalar()) return '"' + node_.Scalar() + '"';
+    return "<unknown>";
+  }
+
+  size_t size() const override {  // forwards to YAML::Node::size() without any type check
+    return node_.size();
+  }
+
+  flow::Node::Iterator begin() const override;
+
+  flow::Node::Iterator end() const override;
+
+  flow::Node operator[](std::string_view key) const override {  // the key is copied into a std::string for the yaml-cpp lookup
+    return flow::Node{std::make_shared<YamlNode>(node_[std::string{key}])};
+  }
+
+  std::optional<flow::Node::Cursor> getCursor() const override {  // source position of the node, when yaml-cpp recorded one
+    YAML::Mark mark = node_.Mark();
+    if (mark.is_null()) {  // no mark available: report no position
+      return std::nullopt;
+    }
+    return flow::Node::Cursor{
+      .line = mark.line,
+      .column = mark.column,
+      .pos = mark.pos
+    };
+  }
+
+ private:
+  YAML::Node node_;
+};
+
+class YamlIterator : public flow::Node::Iterator::IteratorImpl {  // adapts YAML::const_iterator to the flow::Node iterator interface
+ public:
+  explicit YamlIterator(YAML::const_iterator it) : it_(std::move(it)) {}
+
+  IteratorImpl &operator++() override {
+    ++it_;
+    return *this;
+  }
+
+  bool operator==(const IteratorImpl &other) const override {
+    const auto *ptr = dynamic_cast<const YamlIterator *>(&other);
+    gsl_Expects(ptr);  // comparing against another iterator kind is a precondition violation
+    return it_ == ptr->it_;
+  }
+
+  flow::Node::Iterator::Value operator*() const override {
+    auto val = *it_;  // the dereferenced value exposes the element itself plus .first/.second for map entries
+    auto node = flow::Node{std::make_shared<YamlNode>(val)};
+    auto first = flow::Node{std::make_shared<YamlNode>(val.first)};
+    auto second = flow::Node{std::make_shared<YamlNode>(val.second)};
+    return flow::Node::Iterator::Value(std::move(node), std::move(first), std::move(second));  // move the locals instead of copying them
+  }
+
+  std::unique_ptr<IteratorImpl> clone() const override {
+    return std::make_unique<YamlIterator>(it_);
+  }
+
+ private:
+  YAML::const_iterator it_;
+};
+
+inline flow::Node::Iterator YamlNode::begin() const {  // inline: defined in a header, so it must be safe to include from several translation units
+  return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())};
+}
+
+inline flow::Node::Iterator YamlNode::end() const {  // inline: defined in a header, so it must be safe to include from several translation units
+  return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())};
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/utils/ValueCaster.h b/libminifi/include/utils/ValueCaster.h
index ad9dd5f0c..bc31b85c2 100644
--- a/libminifi/include/utils/ValueCaster.h
+++ b/libminifi/include/utils/ValueCaster.h
@@ -18,12 +18,7 @@
 
 #pragma once
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-namespace internal {
+namespace org::apache::nifi::minifi::utils::internal {
 
 template<class T, class U>
 bool cast_if_in_range(T in, U& out) {
@@ -37,9 +32,4 @@ bool cast_if_in_range(T in, U& out) {
   return true;
 }
 
-} /* namespace internal */
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::utils::internal
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index ea4c40346..81e2bb1e0 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -29,48 +29,50 @@
 #include "io/StreamFactory.h"
 
 #include "core/yaml/YamlConfiguration.h"
+#include "core/json/JsonConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
+
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const ConfigurationContext& ctx, const std::optional<std::string>& configuration_class_name, bool fail_safe) {
+  std::string class_name_lc;
+  if (configuration_class_name) {
+    class_name_lc = configuration_class_name.value();
+  } else if (ctx.path) {
+    if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) {
+      class_name_lc = "yamlconfiguration";
+    } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) {
+      class_name_lc = "jsonconfiguration";
+    } else {
+      throw std::runtime_error("Could not infer config type from file path");
+    }
+  } else {
+    throw std::runtime_error("Neither configuration class nor config file path has been specified");
+  }
 
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
-    std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
-    std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
-    std::shared_ptr<io::StreamFactory> stream_factory, const std::string& configuration_class_name,
-    const std::optional<std::string>& path, std::shared_ptr<utils::file::FileSystem> filesystem,
-    bool fail_safe) {
-  std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
     if (class_name_lc == "flowconfiguration") {
       // load the base configuration.
-      return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+      return std::make_unique<core::FlowConfiguration>(ctx);
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
-      return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(
-          repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem));
+      return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx));
+    } else if (class_name_lc == "jsonconfiguration") {
+      return std::unique_ptr<core::JsonConfiguration>(instantiate<core::JsonConfiguration>(ctx));
     } else {
       if (fail_safe) {
-        return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+        return std::make_unique<core::FlowConfiguration>(ctx);
       } else {
         throw std::runtime_error("Support for the provided configuration class could not be found");
       }
     }
   } catch (const std::runtime_error &) {
     if (fail_safe) {
-      return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+      return std::make_unique<core::FlowConfiguration>(ctx);
     }
   }
 
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 6c773e2f2..c7981b86f 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -28,17 +28,13 @@
 
 namespace org::apache::nifi::minifi::core {
 
-FlowConfiguration::FlowConfiguration(
-    const std::shared_ptr<core::Repository>& /*repo*/, std::shared_ptr<core::Repository> flow_file_repo,
-    std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
-    std::shared_ptr<Configure> configuration, const std::optional<std::filesystem::path>& path,
-    std::shared_ptr<utils::file::FileSystem> filesystem)
+FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
     : CoreComponent(core::getClassName<FlowConfiguration>()),
-      flow_file_repo_(std::move(flow_file_repo)),
-      content_repo_(std::move(content_repo)),
-      stream_factory_(std::move(stream_factory)),
-      configuration_(std::move(configuration)),
-      filesystem_(std::move(filesystem)),
+      flow_file_repo_(std::move(ctx.flow_file_repo)),
+      content_repo_(std::move(ctx.content_repo)),
+      stream_factory_(std::move(ctx.stream_factory)),
+      configuration_(std::move(ctx.configuration)),
+      filesystem_(std::move(ctx.filesystem)),
       logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
   controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
   service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
@@ -49,13 +45,13 @@ FlowConfiguration::FlowConfiguration(
   configuration_->get(Configure::nifi_c2_flow_url, flowUrl);
   flow_version_ = std::make_shared<state::response::FlowVersion>(flowUrl, bucket_id, flowId);
 
-  if (!path) {
+  if (!ctx.path) {
     logger_->log_error("Configuration path is not specified.");
   } else {
-    config_path_ = utils::file::canonicalize(*path);
+    config_path_ = utils::file::canonicalize(*ctx.path);
     if (!config_path_) {
-      logger_->log_error("Couldn't find config file \"%s\".", path->string());
-      config_path_ = path;
+      logger_->log_error("Couldn't find config file \"%s\".", ctx.path->string());
+      config_path_ = ctx.path;
     }
     checksum_calculator_.setFileLocation(*config_path_);
   }
diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp
new file mode 100644
index 000000000..6942a5844
--- /dev/null
+++ b/libminifi/src/core/flow/CheckRequiredField.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+
+#include "core/flow/CheckRequiredField.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+bool isFieldPresent(const Node &node, std::string_view field_name) {
+  return bool{node[field_name]};
+}
+
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) {
+  const Node name_node = node["name"];
+  // Build a helpful error message for the user so they can fix the
+  // invalid config file, using the component name if present
+  auto field_list_string = utils::StringUtils::join(", ", alternate_field_names);
+  std::string err_msg =
+      name_node ?
+          "Unable to parse configuration file for component named '" + name_node.getString().value() + "' as none of the possible required fields [" + field_list_string + "] is available" :
+          "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
+  if (!section.empty()) {
+    err_msg += " [in '" + std::string(section) + "' section of configuration file]";
+  }
+  if (auto cursor = node.getCursor()) {
+    err_msg += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]";
+  }
+  return err_msg;
+}
+
+void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) {
+  if (!isFieldPresent(node, field_name)) {  // nothing to do when the field exists; otherwise always throws std::invalid_argument
+    if (error_message.empty()) {  // no caller-supplied message: build one from the field name, section and node cursor
+      throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section));
+    }
+    throw std::invalid_argument(std::string{error_message});  // copy by size: string_view::data() is not guaranteed to be null-terminated
+  }
+}
+
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) {
+  for (const auto& name : alternate_names) {  // alternates are tried in order; the first one present wins
+    if (isFieldPresent(node, name)) {
+      return node[name].getString().value();
+    }
+  }
+  if (error_message.empty()) {  // no caller-supplied message: build one listing every alternate name
+    throw std::invalid_argument(buildErrorMessage(node, alternate_names, section));
+  }
+  throw std::invalid_argument(std::string{error_message});  // copy by size: string_view::data() is not guaranteed to be null-terminated
+}
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/Defaults.h b/libminifi/src/core/flow/Node.cpp
similarity index 59%
copy from libminifi/include/Defaults.h
copy to libminifi/src/core/flow/Node.cpp
index 0ee8eaa48..7fd7b0df8 100644
--- a/libminifi/include/Defaults.h
+++ b/libminifi/src/core/flow/Node.cpp
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,10 +16,12 @@
  * limitations under the License.
  */
 
-#pragma once
+#include "core/flow/Node.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+Node::Iterator::Value Node::Iterator::operator*() const {
+  return impl_->operator*();
+}
 
-const std::filesystem::path DEFAULT_NIFI_CONFIG_YML = std::filesystem::path("conf") / "config.yml";
-const std::filesystem::path DEFAULT_NIFI_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi.properties";
-const std::filesystem::path DEFAULT_LOG_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-log.properties";
-const std::filesystem::path DEFAULT_UID_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-uid.properties";
-const std::filesystem::path DEFAULT_BOOTSTRAP_FILE = std::filesystem::path("conf") / "bootstrap.conf";
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
similarity index 51%
copy from libminifi/src/core/yaml/YamlConfiguration.cpp
copy to libminifi/src/core/flow/StructuredConfiguration.cpp
index f69f73de4..7e1d41e63 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -21,50 +21,42 @@
 #include <set>
 #include <cinttypes>
 
-#include "core/yaml/YamlConfiguration.h"
-#include "core/yaml/CheckRequiredField.h"
-#include "core/yaml/YamlConnectionParser.h"
+#include "core/flow/StructuredConfiguration.h"
+#include "core/flow/CheckRequiredField.h"
+#include "core/flow/StructuredConnectionParser.h"
 #include "core/state/Value.h"
 #include "Defaults.h"
 #include "utils/TimeUtil.h"
+#include "utils/RegexUtils.h"
 #include "Funnel.h"
 
-#ifdef YAML_CONFIGURATION_USE_REGEX
-#include "utils/RegexUtils.h"
-#endif  // YAML_CONFIGURATION_USE_REGEX
-
-namespace org::apache::nifi::minifi::core {
-
-std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
-                                     const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
-                                     const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path,
-                                     const std::shared_ptr<utils::file::FileSystem>& filesystem)
-    : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration,
-                        path.value_or(DEFAULT_NIFI_CONFIG_YML), filesystem),
-      stream_factory_(stream_factory),
-      logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
-  auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
-  auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
-  this->name_ = rootGroup->getName();
-  return rootGroup;
+namespace org::apache::nifi::minifi::core::flow {
+
+std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger)
+    : FlowConfiguration(std::move(ctx)),
+      logger_(std::move(logger)) {}
+
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) {
+  auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY];
+  auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true);
+  this->name_ = root_group->getName();
+  return root_group;
 }
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) {
   int version = 0;
 
-  yaml::checkRequiredField(yamlNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-  auto flowName = yamlNode["name"].as<std::string>();
+  checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+  auto flowName = node["name"].getString().value();
 
   utils::Identifier uuid;
   // assignment throws on invalid uuid
-  uuid = getOrGenerateId(yamlNode);
+  uuid = getOrGenerateId(node);
 
-  if (yamlNode["version"]) {
-    version = yamlNode["version"].as<int>();
+  if (node["version"]) {
+    version = gsl::narrow<int>(node["version"].getInt64().value());
   }
 
   logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
@@ -75,8 +67,8 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const
     group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
   }
 
-  if (yamlNode["onschedule retry interval"]) {
-    auto onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
+  if (node["onschedule retry interval"]) {
+    auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value();
     logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
 
     auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
@@ -89,51 +81,49 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const
   return group;
 }
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
-  auto group = createProcessGroup(headerNode, is_root);
-  YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
-  YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
-  YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
-  YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
-  YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
-  YAML::Node remoteProcessingGroupsNode = [&] {
-    // assignment is not supported on invalid Yaml nodes
-    YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) {
+  auto group = createProcessGroup(header_node, is_root);
+  Node processorsNode = node[CONFIG_PROCESSORS_KEY];
+  Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY];
+  Node funnelsNode = node[CONFIG_FUNNELS_KEY];
+  Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY];
+  Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY];
+  Node remoteProcessingGroupsNode = [&] {
+    // assignment is not supported on invalid nodes
+    Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY];
     if (candidate) {
       return candidate;
     }
-    return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+    return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3];
   }();
-  YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
+  Node childProcessGroupNodeSeq = node["Process Groups"];
 
-  parseProcessorNodeYaml(processorsNode, group.get());
-  parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
-  parseFunnelsYaml(funnelsNode, group.get());
+  parseProcessorNode(processorsNode, group.get());
+  parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get());
+  parseFunnels(funnelsNode, group.get());
   parsePorts(inputPortsNode, group.get(), PortType::INPUT);
   parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
 
-  if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
-    for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
-      auto childProcessGroupNode = it->as<YAML::Node>();
-      group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
+  if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.isSequence()) {
+    for (const auto& childProcessGroupNode : childProcessGroupNodeSeq) {
+      group->addProcessGroup(parseProcessGroup(childProcessGroupNode, childProcessGroupNode));
     }
   }
-
   // parse connections last to give feedback if the source and/or destination processors
   // is not in the same process group or input/output port connections are not allowed
-  parseConnectionYaml(connectionsNode, group.get());
+  parseConnection(connectionsNode, group.get());
   return group;
 }
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) {
   uuids_.clear();
-  YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
-  YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+  Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY];
+  Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY];
 
   parseControllerServices(controllerServiceNode);
   // Create the root process group
-  std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
-  parseProvenanceReportingYaml(provenanceReportNode, root.get());
+  std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroup(root_node);
+  parseProvenanceReporting(provenanceReportNode, root.get());
 
   // set the controller services into the root group.
   for (const auto& controller_service : controller_services_->getAllControllerServices()) {
@@ -146,36 +136,35 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::N
   return root;
 }
 
-void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) {
   int64_t runDurationNanos = -1;
   utils::Identifier uuid;
   std::unique_ptr<core::Processor> processor;
 
   if (!parentGroup) {
-    logger_->log_error("parseProcessNodeYaml: no parent group exists");
+    logger_->log_error("parseProcessNode: no parent group exists");
     return;
   }
 
-  if (!processorsNode) {
+  if (!processors_node) {
     throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
   }
-  if (!processorsNode.IsSequence()) {
+  if (!processors_node.isSequence()) {
     throw std::invalid_argument(
         "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
   }
   // Evaluate sequence of processors
-  for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+  for (const auto& procNode : processors_node) {
     core::ProcessorConfig procCfg;
-    const auto procNode = iter->as<YAML::Node>();
 
-    yaml::checkRequiredField(procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.name = procNode["name"].as<std::string>();
+    checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY);
+    procCfg.name = procNode["name"].getString().value();
     procCfg.id = getOrGenerateId(procNode);
 
     uuid = procCfg.id;
     logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
-    yaml::checkRequiredField(procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.javaClass = procNode["class"].as<std::string>();
+    checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY);
+    procCfg.javaClass = procNode["class"].getString().value();
     logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
 
     // Determine the processor name only from the Java class
@@ -198,45 +187,40 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
 
     processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
 
-    auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
-    CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.schedulingStrategy = strategyNode.as<std::string>();
+    procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY);
     logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
-    auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
-    CONFIG_YAML_PROCESSORS_KEY);
+    procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY);
 
-    procCfg.schedulingPeriod = periodNode.as<std::string>();
     logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
-    if (procNode["max concurrent tasks"]) {
-      procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+    if (auto tasksNode = procNode["max concurrent tasks"]) {
+      procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
     }
 
     if (procNode["penalization period"]) {
-      procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+      procCfg.penalizationPeriod = procNode["penalization period"].getString().value();
       logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
     }
 
     if (procNode["yield period"]) {
-      procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+      procCfg.yieldPeriod = procNode["yield period"].getString().value();
       logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
     }
 
-    if (procNode["run duration nanos"]) {
-      procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
+    if (auto runNode = procNode["run duration nanos"]) {
+      procCfg.runDurationNanos = runNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
     }
 
     // handle auto-terminated relationships
     if (procNode["auto-terminated relationships list"]) {
-      YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+      Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
       std::vector<std::string> rawAutoTerminatedRelationshipValues;
-      if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
-        for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
-          auto autoTerminatedRel = relIter->as<std::string>();
-          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+      if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
+        for (const auto& autoTerminatedRel : autoTerminatedSequence) {
+          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value());
         }
       }
       procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
@@ -244,8 +228,8 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
 
     // handle processor properties
     if (procNode["Properties"]) {
-      YAML::Node propertiesNode = procNode["Properties"];
-      parsePropertiesNodeYaml(propertiesNode, *processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+      Node propertiesNode = procNode["Properties"];
+      parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY);
     }
 
     // Take care of scheduling
@@ -307,89 +291,85 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
   }
 }
 
-void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) {
   utils::Identifier uuid;
   std::string id;
 
   if (!parentGroup) {
-    logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+    logger_->log_error("parseRemoteProcessGroup: no parent group exists");
     return;
   }
 
-  if (!rpgNode || !rpgNode.IsSequence()) {
+  if (!rpg_node_seq || !rpg_node_seq.isSequence()) {
     return;
   }
-  for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
-    auto currRpgNode = iter->as<YAML::Node>();
-
-    yaml::checkRequiredField(currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-    auto name = currRpgNode["name"].as<std::string>();
+  for (const auto& currRpgNode : rpg_node_seq) {
+    checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+    auto name = currRpgNode["name"].getString().value();
     id = getOrGenerateId(currRpgNode);
 
-    logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+    logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);
 
-    auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
-    CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+    auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY);
 
-    auto url = urlNode.as<std::string>();
-    logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
+    logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);
 
     uuid = id;
-    auto group = this->createRemoteProcessGroup(name, uuid);
+    auto group = createRemoteProcessGroup(name, uuid);
     group->setParent(parentGroup);
 
     if (currRpgNode["yield period"]) {
-      auto yieldPeriod = currRpgNode["yield period"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+      auto yieldPeriod = currRpgNode["yield period"].getString().value();
+      logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);
 
       auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
       if (yield_period_value.has_value() && group) {
-        logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
+        logger_->log_debug("parseRemoteProcessGroup: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
         group->setYieldPeriodMsec(*yield_period_value);
       }
     }
 
     if (currRpgNode["timeout"]) {
-      auto timeout = currRpgNode["timeout"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+      auto timeout = currRpgNode["timeout"].getString().value();
+      logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);
 
       auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
       if (timeout_value.has_value() && group) {
-        logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
+        logger_->log_debug("parseRemoteProcessGroup: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
         group->setTimeout(timeout_value->count());
       }
     }
 
     if (currRpgNode["local network interface"]) {
-      auto interface = currRpgNode["local network interface"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
+      auto interface = currRpgNode["local network interface"].getString().value();
+      logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface);
       group->setInterface(interface);
     }
 
     if (currRpgNode["transport protocol"]) {
-      auto transport_protocol = currRpgNode["transport protocol"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
+      auto transport_protocol = currRpgNode["transport protocol"].getString().value();
+      logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);
       if (transport_protocol == "HTTP") {
         group->setTransportProtocol(transport_protocol);
         if (currRpgNode["proxy host"]) {
-          auto http_proxy_host = currRpgNode["proxy host"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
+          auto http_proxy_host = currRpgNode["proxy host"].getString().value();
+          logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
           group->setHttpProxyHost(http_proxy_host);
           if (currRpgNode["proxy user"]) {
-            auto http_proxy_username = currRpgNode["proxy user"].as<std::string>();
-            logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
+            auto http_proxy_username = currRpgNode["proxy user"].getString().value();
+            logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
             group->setHttpProxyUserName(http_proxy_username);
           }
           if (currRpgNode["proxy password"]) {
-            auto http_proxy_password = currRpgNode["proxy password"].as<std::string>();
-            logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
+            auto http_proxy_password = currRpgNode["proxy password"].getString().value();
+            logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password);
             group->setHttpProxyPassWord(http_proxy_password);
           }
           if (currRpgNode["proxy port"]) {
-            auto http_proxy_port = currRpgNode["proxy port"].as<std::string>();
+            auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value();
             int32_t port;
             if (core::Property::StringToInt(http_proxy_port, port)) {
-              logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
+              logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
               group->setHttpProxyPort(port);
             }
           }
@@ -406,50 +386,44 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
     group->setTransmitting(true);
     group->setURL(url);
 
-    yaml::checkRequiredField(currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-    auto inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
-    if (inputPorts && inputPorts.IsSequence()) {
-      for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-        auto currPort = portIter->as<YAML::Node>();
-
-        this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
+    checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+    auto inputPorts = currRpgNode["Input Ports"];
+    if (inputPorts && inputPorts.isSequence()) {
+      for (const auto& currPort : inputPorts) {
+        parsePort(currPort, group.get(), sitetosite::SEND);
       }  // for node
     }
-    auto outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
-    if (outputPorts && outputPorts.IsSequence()) {
-      for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
+    auto outputPorts = currRpgNode["Output Ports"];
+    if (outputPorts && outputPorts.isSequence()) {
+      for (const auto& currPort : outputPorts) {
         logger_->log_debug("Got a current port, iterating...");
 
-        auto currPort = portIter->as<YAML::Node>();
-
-        this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
+        parsePort(currPort, group.get(), sitetosite::RECEIVE);
       }  // for node
     }
     parentGroup->addProcessGroup(std::move(group));
   }
 }
 
-void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::ProcessGroup* parent_group) {
   utils::Identifier port_uuid;
 
-  if (!parentGroup) {
-    logger_->log_error("parseProvenanceReportingYaml: no parent group exists");
+  if (!parent_group) {
+    logger_->log_error("parseProvenanceReporting: no parent group exists");
     return;
   }
 
-  if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
+  if (!node || node.isNull()) {
     logger_->log_debug("no provenance reporting task specified");
     return;
   }
 
   auto reportTask = createProvenanceReportTask();
 
-  const auto node = reportNode.as<YAML::Node>();
-
-  yaml::checkRequiredField(node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
-  yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
+  checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY);
+  auto schedulingStrategyStr = node["scheduling strategy"].getString().value();
+  checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY);
+  auto schedulingPeriodStr = node["scheduling period"].getString().value();
 
   if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
     logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
@@ -465,9 +439,9 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
 
   int64_t lvalue;
   if (node["host"] && node["port"]) {
-    auto hostStr = node["host"].as<std::string>();
+    auto hostStr = node["host"].getString().value();
 
-    auto portStr = node["port"].as<std::string>();
+    std::string portStr = node["port"].getIntegerAsString().value();
     if (core::Property::StringToInt(portStr, lvalue) && !hostStr.empty()) {
       logger_->log_debug("ProvenanceReportingTask port %" PRId64, lvalue);
       std::string url = hostStr + ":" + portStr;
@@ -476,16 +450,16 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
   }
 
   if (node["url"]) {
-    auto urlStr = node["url"].as<std::string>();
+    auto urlStr = node["url"].getString().value();
     if (!urlStr.empty()) {
       reportTask->setURL(urlStr);
       logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
     }
   }
-  yaml::checkRequiredField(node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto portUUIDStr = node["port uuid"].as<std::string>();
-  yaml::checkRequiredField(node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto batchSizeStr = node["batch size"].as<std::string>();
+  checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY);
+  auto portUUIDStr = node["port uuid"].getString().value();
+  checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY);
+  auto batchSizeStr = node["batch size"].getString().value();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   port_uuid = portUUIDStr;
@@ -499,72 +473,72 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
 
   // add processor to parent
   reportTask->setScheduledState(core::RUNNING);
-  parentGroup->addProcessor(std::move(reportTask));
+  parent_group->addProcessor(std::move(reportTask));
 }
 
-void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
-  if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
+void StructuredConfiguration::parseControllerServices(const Node& controller_services_node) {
+  if (!controller_services_node || !controller_services_node.isSequence()) {
     return;
   }
-  for (const auto& iter : controllerServicesNode) {
-    const auto controllerServiceNode = iter.as<YAML::Node>();
-    try {
-      yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+  for (const auto& service_node : controller_services_node) {
+    checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY);
 
-      auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-      logger_->log_debug("Using type %s for controller service node", type);
+    auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY);
+    logger_->log_debug("Using type %s for controller service node", type);
 
-      std::string fullType = type;
-      auto lastOfIdx = type.find_last_of('.');
-      if (lastOfIdx != std::string::npos) {
-        lastOfIdx++;  // if a value is found, increment to move beyond the .
-        type = type.substr(lastOfIdx);
-      }
+    std::string fullType = type;
+    auto lastOfIdx = type.find_last_of('.');
+    if (lastOfIdx != std::string::npos) {
+      lastOfIdx++;  // if a value is found, increment to move beyond the .
+      type = type.substr(lastOfIdx);
+    }
 
-      auto name = controllerServiceNode["name"].as<std::string>();
-      auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+    auto name = service_node["name"].getString().value();
+    auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY);
 
-      utils::Identifier uuid;
-      uuid = id;
-      std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
-      if (nullptr != controller_service_node) {
-        logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
-        controller_service_node->initialize();
-        YAML::Node propertiesNode = controllerServiceNode["Properties"];
+    utils::Identifier uuid;
+    uuid = id;
+    std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
+    if (nullptr != controller_service_node) {
+      logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
+      controller_service_node->initialize();
+      if (Node propertiesNode = service_node["Properties"]) {
         // we should propagate properties to the node and to the implementation
-        parsePropertiesNodeYaml(propertiesNode, *controller_service_node, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+        parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY);
         if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
-          parsePropertiesNodeYaml(propertiesNode, *controllerServiceImpl, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+          parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY);
         }
-      } else {
-        logger_->log_debug("Could not locate %s", type);
       }
-      controller_services_->put(id, controller_service_node);
-      controller_services_->put(name, controller_service_node);
-    } catch (YAML::InvalidNode &) {
-      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
+    } else {
+      logger_->log_debug("Could not locate %s", type);
     }
+    controller_services_->put(id, controller_service_node);
+    controller_services_->put(name, controller_service_node);
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
+void StructuredConfiguration::parseConnection(const Node& connection_node_seq, core::ProcessGroup* parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
   }
-  if (!connectionsNode || !connectionsNode.IsSequence()) {
+  if (!connection_node_seq || !connection_node_seq.isSequence()) {
     return;
   }
 
-  for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
-    const auto connectionNode = iter->as<YAML::Node>();
-
+  for (const auto& connection_node : connection_node_seq) {
+    // for backwards compatibility we ignore invalid connection_nodes instead of throwing
+    // previously the ConnectionParser created an unreachable connection in this case
+    if (!connection_node || !connection_node.isMap()) {
+      logger_->log_error("Invalid connection node, ignoring");
+      continue;
+    }
     // Configure basic connection
-    const std::string id = getOrGenerateId(connectionNode);
+    const std::string id = getOrGenerateId(connection_node);
 
     // Default name to be same as ID
     // If name is specified in configuration, use the value
-    const auto name = connectionNode["name"].as<std::string>(id);
+    const auto name = connection_node["name"].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect connection UUID format.");
@@ -573,21 +547,21 @@ void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, c
 
     auto connection = createConnection(name, uuid.value());
     logger_->log_debug("Created connection with UUID %s and name %s", id, name);
-    const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
-    connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection);
-    connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
-    connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
-    connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml());
-    connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
-    connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
-    connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
-    connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
+    const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+    connectionParser.configureConnectionSourceRelationships(*connection);
+    connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
+    connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
+    connection->setSwapThreshold(connectionParser.getSwapThreshold());
+    connection->setSourceUUID(connectionParser.getSourceUUID());
+    connection->setDestinationUUID(connectionParser.getDestinationUUID());
+    connection->setFlowExpirationDuration(connectionParser.getFlowFileExpiration());
+    connection->setDropEmptyFlowFiles(connectionParser.getDropEmpty());
 
     parent->addConnection(std::move(connection));
   }
 }
 
-void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
+void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
   utils::Identifier uuid;
 
   if (!parent) {
@@ -595,14 +569,12 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
     return;
   }
 
-  const auto inputPortsObj = portNode.as<YAML::Node>();
-
   // Check for required fields
-  yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-  auto nameStr = inputPortsObj["name"].as<std::string>();
-  auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
+  checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+  auto nameStr = port_node["name"].getString().value();
+  auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY,
     "The field 'id' is required for "
-    "the port named '" + nameStr + "' in the YAML Config. If this port "
+    "the port named '" + nameStr + "' in the Flow Config. If this port "
     "is an input port for a NiFi Remote Process Group, the port "
     "id should match the corresponding id specified in the NiFi configuration. "
     "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
@@ -625,17 +597,17 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
   // else defaults to RAW
 
   // handle port properties
-  const auto nodeVal = portNode.as<YAML::Node>();
-  YAML::Node propertiesNode = nodeVal["Properties"];
-  parsePropertiesNodeYaml(propertiesNode, *port, nameStr, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  if (Node propertiesNode = port_node["Properties"]) {
+    parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY);
+  }
 
   // add processor to parent
   auto& processor = *port;
   parent->addProcessor(std::move(port));
   processor.setScheduledState(core::RUNNING);
 
-  if (inputPortsObj["max concurrent tasks"]) {
-    auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+  if (auto tasksNode = port_node["max concurrent tasks"]) {
+    std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value();
     int32_t maxConcurrentTasks;
     if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
       processor.setMaxConcurrentTasks(maxConcurrentTasks);
@@ -645,22 +617,21 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
   }
 }
 
-void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component) {
-  for (const auto& iter : propertyValueNode) {
-    if (iter.IsDefined()) {
-      const auto nodeVal = iter.as<YAML::Node>();
-      YAML::Node propertiesNode = nodeVal["value"];
+void StructuredConfiguration::parsePropertyValueSequence(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& component) {
+  for (const auto& nodeVal : property_value_node) {
+    if (nodeVal) {
+      Node propertiesNode = nodeVal["value"];
       // must insert the sequence in differently.
-      const auto rawValueString = propertiesNode.as<std::string>();
-      logger_->log_debug("Found %s=%s", propertyName, rawValueString);
-      if (!component.updateProperty(propertyName, rawValueString)) {
+      const auto rawValueString = propertiesNode.getString().value();
+      logger_->log_debug("Found %s=%s", property_name, rawValueString);
+      if (!component.updateProperty(property_name, rawValueString)) {
         auto proc = dynamic_cast<core::Connectable*>(&component);
         if (proc) {
-          logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
-          if (!component.setDynamicProperty(propertyName, rawValueString)) {
-            logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
+          logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
+          if (!component.setDynamicProperty(property_name, rawValueString)) {
+            logger_->log_warn("Unable to set the dynamic property %s with value %s", property_name, rawValueString);
           } else {
-            logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
+            logger_->log_warn("Dynamic property %s with value %s set", property_name, rawValueString);
           }
         }
       }
@@ -668,118 +639,114 @@ void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyNa
   }
 }
 
-PropertyValue YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode) {
+PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& property_from_processor, const Node& property_value_node) {
+  using state::response::Value;
   PropertyValue defaultValue;
-  defaultValue = propertyFromProcessor.getDefaultValue();
+  defaultValue = property_from_processor.getDefaultValue();
   const std::type_index defaultType = defaultValue.getTypeInfo();
   try {
     PropertyValue coercedValue = defaultValue;
-    if (defaultType == typeid(int64_t)) {
-      coercedValue = propertyValueNode.as<int64_t>();
-    } else if (defaultType == typeid(uint64_t)) {
-      uint64_t integer_value;
-      if (YAML::convert<uint64_t>::decode(propertyValueNode, integer_value)) {
-        coercedValue = integer_value;
-      } else {
-        coercedValue = propertyValueNode.as<std::string>();
-      }
-    } else if (defaultType == typeid(int)) {
-      coercedValue = propertyValueNode.as<int>();
-    } else if (defaultType == typeid(bool)) {
-      coercedValue = propertyValueNode.as<bool>();
+    auto int64_val = property_value_node.getInt64();
+    if (defaultType == Value::INT64_TYPE && int64_val) {
+      coercedValue = gsl::narrow<int64_t>(int64_val.value());
+    } else if (defaultType == Value::UINT64_TYPE && int64_val) {
+      coercedValue = gsl::narrow<uint64_t>(int64_val.value());
+    } else if (defaultType == Value::UINT32_TYPE && int64_val) {
+      coercedValue = gsl::narrow<uint32_t>(int64_val.value());
+    } else if (defaultType == Value::INT_TYPE && int64_val) {
+      coercedValue = gsl::narrow<int>(int64_val.value());
+    } else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) {
+      coercedValue = property_value_node.getBool().value();
     } else {
-      coercedValue = propertyValueNode.as<std::string>();
+      coercedValue = property_value_node.getString().value();
     }
     return coercedValue;
   } catch (const std::exception& e) {
     logger_->log_error("Fetching property failed with an exception of %s", e.what());
-    logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+    logger_->log_error("Invalid conversion for field %s. Value %s", property_from_processor.getName(), property_value_node.getDebugString());
   } catch (...) {
-    logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+    logger_->log_error("Invalid conversion for field %s. Value %s", property_from_processor.getName(), property_value_node.getDebugString());
   }
   return defaultValue;
 }
 
-void YamlConfiguration::parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
-  core::Property myProp(propertyName, "", "");
-  processor.getProperty(propertyName, myProp);
-  const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode);
+void StructuredConfiguration::parseSingleProperty(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor) {
+  core::Property myProp(property_name, "", "");
+  processor.getProperty(property_name, myProp);
+  const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, property_value_node);
   bool property_set = false;
   try {
     property_set = processor.setProperty(myProp, coercedValue);
   } catch(const utils::internal::InvalidValueException&) {
     auto component = dynamic_cast<core::CoreComponent*>(&processor);
     if (component == nullptr) {
-      logger_->log_error("processor was not a CoreComponent for property '%s'", propertyName);
+      logger_->log_error("processor was not a CoreComponent for property '%s'", property_name);
     } else {
-      logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName());
+      logger_->log_error("Invalid value was set for property '%s' creating component '%s'", property_name, component->getName());
     }
     throw;
   }
-  const auto rawValueString = propertyValueNode.as<std::string>();
   if (!property_set) {
+    const auto rawValueString = property_value_node.getString().value();
     auto proc = dynamic_cast<core::Connectable*>(&processor);
     if (proc) {
-      logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
-      if (!processor.setDynamicProperty(propertyName, rawValueString)) {
-        logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
+      logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
+      if (!processor.setDynamicProperty(property_name, rawValueString)) {
+        logger_->log_warn("Unable to set the dynamic property %s with value %s", property_name, rawValueString);
       } else {
-        logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
+        logger_->log_warn("Dynamic property %s with value %s set", property_name, rawValueString);
       }
     }
   } else {
-    logger_->log_debug("Property %s with value %s set", propertyName, rawValueString);
+    logger_->log_debug("Property %s with value %s set", property_name, coercedValue.to_string());
   }
 }
 
-void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
-  logger_->log_trace("Encountered %s", propertyName);
-  if (propertyValueNode.IsNull() || !propertyValueNode.IsDefined()) {
+void StructuredConfiguration::parsePropertyNodeElement(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor) {
+  logger_->log_trace("Encountered %s", property_name);
+  if (!property_value_node || property_value_node.isNull()) {
     return;
   }
-  if (propertyValueNode.IsSequence()) {
-    parsePropertyValueSequence(propertyName, propertyValueNode, processor);
+  if (property_value_node.isSequence()) {
+    parsePropertyValueSequence(property_name, property_value_node, processor);
   } else {
-    parseSingleProperty(propertyName, propertyValueNode, processor);
+    parseSingleProperty(property_name, property_value_node, processor);
   }
 }
 
-void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name,
-    const std::string& yaml_section) {
-  // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
+void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) {
+  // Treat generically as a node so we can perform inspection on entries to ensure they are populated
   logger_->log_trace("Entered %s", component_name);
-  for (const auto& propertyElem : propertiesNode) {
-    const auto propertyName = propertyElem.first.as<std::string>();
-    const YAML::Node propertyValueNode = propertyElem.second;
+  for (const auto& property_node : properties_node) {
+    const auto propertyName = property_node.first.getString().value();
+    const Node propertyValueNode = property_node.second;
     parsePropertyNodeElement(propertyName, propertyValueNode, component);
   }
 
-  validateComponentProperties(component, component_name, yaml_section);
+  validateComponentProperties(component, component_name, section);
 }
 
-void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent) {
+void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) {
   if (!parent) {
-    logger_->log_error("parseFunnelsYaml: no parent group was provided");
+    logger_->log_error("parseFunnels: no parent group was provided");
     return;
   }
-  if (!node || !node.IsSequence()) {
+  if (!node || !node.isSequence()) {
     return;
   }
 
-  for (const auto& element : node) {
-    const auto funnel_node = element.as<YAML::Node>();
-
+  for (const auto& funnel_node : node) {
     std::string id = getOrGenerateId(funnel_node);
 
     // Default name to be same as ID
-    const auto name = funnel_node["name"].as<std::string>(id);
+    const auto name = funnel_node["name"].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect funnel UUID format.");
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<minifi::Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
@@ -787,22 +754,20 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
   }
 }
 
-void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGroup* parent, PortType port_type) {
   if (!parent) {
     logger_->log_error("parsePorts: no parent group was provided");
     return;
   }
-  if (!node || !node.IsSequence()) {
+  if (!node || !node.isSequence()) {
     return;
   }
 
-  for (const auto& element : node) {
-    const auto port_node = element.as<YAML::Node>();
-
+  for (const auto& port_node : node) {
     std::string id = getOrGenerateId(port_node);
 
     // Default name to be same as ID
-    const auto name = port_node["name"].as<std::string>(id);
+    const auto name = port_node["name"].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect port UUID format.");
@@ -817,7 +782,8 @@ void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* p
   }
 }
 
-void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
+
+void StructuredConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const {
   const auto &component_properties = component.getProperties();
 
   // Validate required properties
@@ -825,10 +791,10 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
     if (prop_pair.second.getRequired()) {
       if (prop_pair.second.getValue().to_string().empty()) {
         std::string reason = utils::StringUtils::join_pack("required property '", prop_pair.second.getName(), "' is not set");
-        raiseComponentError(component_name, yaml_section, reason);
+        raiseComponentError(component_name, section, reason);
       } else if (!prop_pair.second.getValue().validate(prop_pair.first).valid()) {
         std::string reason = utils::StringUtils::join_pack("the value '", prop_pair.first, "' is not valid for property '", prop_pair.second.getName(), "'");
-        raiseComponentError(component_name, yaml_section, reason);
+        raiseComponentError(component_name, section, reason);
       }
     }
   }
@@ -845,12 +811,11 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
       if (component_properties.at(dep_prop_key).getValue().to_string().empty()) {
         std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
             "' depends on property '", dep_prop_key, "' which is not set");
-        raiseComponentError(component_name, yaml_section, reason);
+        raiseComponentError(component_name, section, reason);
       }
     }
   }
 
-#ifdef YAML_CONFIGURATION_USE_REGEX
   // Validate mutually-exclusive properties
   for (const auto &prop_pair : component_properties) {
     const auto &excl_props = prop_pair.second.getExclusiveOfProperties();
@@ -864,7 +829,7 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
       if (utils::regexMatch(component_properties.at(excl_pair.first).getValue().to_string(), excl_expr)) {
         std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
             "' must not be set when the value of property '", excl_pair.first, "' matches '", excl_pair.second, "'");
-        raiseComponentError(component_name, yaml_section, reason);
+        raiseComponentError(component_name, section, reason);
       }
     }
   }
@@ -877,22 +842,18 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
       utils::Regex prop_regex(prop_regex_str);
       if (!utils::regexMatch(prop_pair.second.getValue().to_string(), prop_regex)) {
         std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), "' does not match validation pattern '", prop_regex_str, "'");
-        raiseComponentError(component_name, yaml_section, reason);
+        raiseComponentError(component_name, section, reason);
       }
     }
   }
-#else
-  logging::LOG_INFO(logger_) << "Validation of mutally-exclusive properties is disabled in this build.";
-  logging::LOG_INFO(logger_) << "Regex validation of properties is not available in this build.";
-#endif  // YAML_CONFIGURATION_USE_REGEX
 }
 
-void YamlConfiguration::raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const {
+void StructuredConfiguration::raiseComponentError(const std::string &component_name, const std::string &section, const std::string &reason) const {
   std::string err_msg = "Unable to parse configuration file for component named '";
   err_msg.append(component_name);
   err_msg.append("' because " + reason);
-  if (!yaml_section.empty()) {
-    err_msg.append(" [in '" + yaml_section + "' section of configuration file]");
+  if (!section.empty()) {
+    err_msg.append(" [in '" + section + "' section of configuration file]");
   }
 
   logging::LOG_ERROR(logger_) << err_msg;
@@ -900,60 +861,57 @@ void YamlConfiguration::raiseComponentError(const std::string &component_name, c
   throw std::invalid_argument(err_msg);
 }
 
-std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
-  std::string id;
-  auto node = yamlNode.as<YAML::Node>();
-
-  if (node[idField]) {
-    if (YAML::NodeType::Scalar == node[idField].Type()) {
-      id = node[idField].as<std::string>();
+std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) {  // Returns the id stored under id_field, or a freshly generated one when the field is absent.
+  if (node[id_field]) {  // field present: it has to be readable as a string, otherwise we throw below
+    if (auto opt_id_str = node[id_field].getString()) {  // enters only when getString() produced a value
+      auto id = opt_id_str.value();
       addNewId(id);
       return id;
     }
-    throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar.");
+    throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string.");  // reached when the field exists but getString() produced no value
   }
 
-  id = id_generator_->generate().to_string();
+  auto id = id_generator_->generate().to_string();  // generated ids are returned without being passed to addNewId
   logger_->log_debug("Generating random ID: id => [%s]", id);
   return id;
 }
 
-std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) {
-  yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message);
-  auto id = yaml_node["id"].as<std::string>();
+std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) {  // Reads the mandatory id field, records it via addNewId and returns it.
+  checkRequiredField(node, "id", section, error_message);  // NOTE(review): presumably throws when the id field is missing - confirm in core/flow/CheckRequiredField.h
+  auto id = node["id"].getString().value();  // NOTE(review): value() presumably throws when the field is present but not a string - confirm
   addNewId(id);
   return id;
 }
 
-YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
-                                               const std::string& providedInfoMessage) {
-  std::string infoMessage = providedInfoMessage;
-  auto result = yamlNode.as<YAML::Node>()[fieldName];
+std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section,
+                                               const std::string& info_message) {  // Returns the string value of field_name, or default_value (after logging an info message) when the field is missing.
+  std::string infoMessage = info_message;  // local copy: replaced by a generated message below when the caller passed an empty one
+  auto result = node[field_name];
   if (!result) {
     if (infoMessage.empty()) {
       // Build a helpful info message for the user to inform them that a default is being used
       infoMessage =
-          yamlNode.as<YAML::Node>()["name"] ?
-              "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
-              "Using default value for optional field '" + fieldName + "' ";
-      if (!yamlSection.empty()) {
-        infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
+          node["name"] ?  // mention the component name when the node carries a name field
+              "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" :
+              "Using default value for optional field '" + field_name + "' ";
+      if (!section.empty()) {  // the section suffix is optional
+        infoMessage += " [in '" + section + "' section of configuration file]: ";
       }
 
-      infoMessage += defaultValue.as<std::string>();
+      infoMessage += default_value;  // appended only to the generated message, never to a caller supplied one
     }
     logging::LOG_INFO(logger_) << infoMessage;
-    result = defaultValue;
+    return default_value;  // missing field: the default is returned as is
   }
 
-  return result;
+  return result.getString().value();  // NOTE(review): value() presumably throws when the field is present but not a string - confirm
 }
 
-void YamlConfiguration::addNewId(const std::string& uuid) {
+void StructuredConfiguration::addNewId(const std::string& uuid) {  // Records uuid in uuids_ and throws a GENERAL_EXCEPTION when the same value was already recorded.
   const auto [_, success] = uuids_.insert(uuid);
   if (!success) {
     throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration");
   }
 }
 
-}  // namespace org::apache::nifi::minifi::core
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
similarity index 73%
rename from libminifi/src/core/yaml/YamlConnectionParser.cpp
rename to libminifi/src/core/flow/StructuredConnectionParser.cpp
index 9435be361..f39a3ac8b 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -16,22 +16,22 @@
  * limitations under the License.
  */
 
-#include "core/yaml/YamlConnectionParser.h"
-#include "core/yaml/CheckRequiredField.h"
+#include "core/flow/StructuredConnectionParser.h"
+#include "core/flow/CheckRequiredField.h"
 #include "Funnel.h"
 
-namespace org::apache::nifi::minifi::core::yaml {
+namespace org::apache::nifi::minifi::core::flow {
 
-void YamlConnectionParser::addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const {
+void StructuredConnectionParser::addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const {  // Builds a core::Relationship from relationship_name (second constructor argument left empty), logs it and adds it to connection.
   core::Relationship relationship(relationship_name, "");
   logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
   connection.addRelationship(relationship);
 }
 
-void YamlConnectionParser::addFunnelRelationshipToConnection(minifi::Connection& connection) const {
+void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Connection& connection) const {
   utils::Identifier srcUUID;
   try {
-    srcUUID = getSourceUUIDFromYaml();
+    srcUUID = getSourceUUID();
   } catch(const std::exception&) {
     return;
   }
@@ -47,18 +47,18 @@ void YamlConnectionParser::addFunnelRelationshipToConnection(minifi::Connection&
   }
 }
 
-void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(minifi::Connection& connection) const {
+void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const {
   // Configure connection source
-  if (connectionNode_.as<YAML::Node>()["source relationship name"] && !connectionNode_["source relationship name"].as<std::string>().empty()) {
-    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>(), connection);
-  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+  if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection);
+  } else if (connectionNode_["source relationship names"]) {
     auto relList = connectionNode_["source relationship names"];
-    if (relList.IsSequence() && relList.begin() != relList.end()) {
+    if (relList.isSequence() && !relList.empty()) {
       for (const auto &rel : relList) {
-        addNewRelationshipToConnection(rel.as<std::string>(), connection);
+        addNewRelationshipToConnection(rel.getString().value(), connection);
       }
-    } else if (!relList.IsSequence() && !relList.as<std::string>().empty()) {
-      addNewRelationshipToConnection(relList.as<std::string>(), connection);
+    } else if (!relList.isSequence() && !relList.getString().value().empty()) {
+      addNewRelationshipToConnection(relList.getString().value(), connection);
     } else {
       addFunnelRelationshipToConnection(connection);
     }
@@ -67,10 +67,9 @@ void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(minifi
   }
 }
 
-uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
-  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
-  if (max_work_queue_data_size_node) {
-    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+uint64_t StructuredConnectionParser::getWorkQueueSize() const {
+  if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) {
+    std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_size;
     if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
       logger_->log_debug("Setting %" PRIu64 " as the max queue size.", max_work_queue_size);
@@ -81,10 +80,10 @@ uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
   return 0;
 }
 
-uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
-  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
+  const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
   if (max_work_queue_data_size_node) {
-    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_data_size = 0;
     if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
       logger_->log_debug("Setting %" PRIu64 "as the max as the max queue data size.", max_work_queue_data_size);
@@ -95,10 +94,10 @@ uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
   return 0;
 }
 
-uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const {
-  const YAML::Node swap_threshold_node = connectionNode_["swap threshold"];
+uint64_t StructuredConnectionParser::getSwapThreshold() const {
+  const flow::Node swap_threshold_node = connectionNode_["swap threshold"];
   if (swap_threshold_node) {
-    auto swap_threshold_str = swap_threshold_node.as<std::string>();
+    auto swap_threshold_str = swap_threshold_node.getString().value();
     uint64_t swap_threshold;
     if (core::Property::StringToInt(swap_threshold_str, swap_threshold)) {
       logger_->log_debug("Setting %" PRIu64 " as the swap threshold.", swap_threshold);
@@ -109,20 +108,20 @@ uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const {
   return 0;
 }
 
-utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
-  const YAML::Node source_id_node = connectionNode_["source id"];
+utils::Identifier StructuredConnectionParser::getSourceUUID() const {
+  const flow::Node source_id_node = connectionNode_["source id"];
   if (source_id_node) {
-    const auto srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value());
     if (srcUUID) {
       logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
       return srcUUID.value();
     }
-    logger_->log_error("Invalid source id value: %s.", source_id_node.as<std::string>());
+    logger_->log_error("Invalid source id value: %s.", source_id_node.getString().value());
     throw std::invalid_argument("Invalid source id");
   }
   // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
-  checkRequiredField(connectionNode_, "source name", CONFIG_YAML_CONNECTIONS_KEY);
-  const auto connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY);
+  const auto connectionSrcProcName = connectionNode_["source name"].getString().value();
   const auto srcUUID = utils::Identifier::parse(connectionSrcProcName);
   if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the source name is a remote port id, so use that as the source id
@@ -141,21 +140,21 @@ utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
   throw std::invalid_argument(error_msg);
 }
 
-utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
-  const YAML::Node destination_id_node = connectionNode_["destination id"];
+utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
+  const flow::Node destination_id_node = connectionNode_["destination id"];
   if (destination_id_node) {
-    const auto destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value());
     if (destUUID) {
       logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
       return destUUID.value();
     }
-    logger_->log_error("Invalid destination id value: %s.", destination_id_node.as<std::string>());
+    logger_->log_error("Invalid destination id value: %s.", destination_id_node.getString().value());
     throw std::invalid_argument("Invalid destination id");
   }
   // we use the same logic as above for resolving the source processor
   // for looking up the destination processor in absence of a processor id
-  checkRequiredField(connectionNode_, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
-  auto connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY);
+  auto connectionDestProcName = connectionNode_["destination name"].getString().value();
   const auto destUUID = utils::Identifier::parse(connectionDestProcName);
   if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the destination name is a remote port id, so use that as the dest id
@@ -174,14 +173,14 @@ utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
   throw std::invalid_argument(error_msg);
 }
 
-std::chrono::milliseconds YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const {
   using namespace std::literals::chrono_literals;
-  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  const flow::Node expiration_node = connectionNode_["flowfile expiration"];
   if (!expiration_node) {
     logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
     return 0ms;
   }
-  auto expiration_duration = utils::timeutils::StringToDuration<std::chrono::milliseconds>(expiration_node.as<std::string>());
+  auto expiration_duration = utils::timeutils::StringToDuration<std::chrono::milliseconds>(expiration_node.getString().value());
   if (!expiration_duration.has_value()) {
     // We should throw here, but we do not.
     // The reason is that our parser only accepts time formats that consists of a number and
@@ -196,12 +195,12 @@ std::chrono::milliseconds YamlConnectionParser::getFlowFileExpirationFromYaml()
   return *expiration_duration;
 }
 
-bool YamlConnectionParser::getDropEmptyFromYaml() const {
-  const YAML::Node drop_empty_node = connectionNode_["drop empty"];
+bool StructuredConnectionParser::getDropEmpty() const {  // Reads the drop empty field of the connection node; false when the field is absent or toBool yields no value.
+  const flow::Node drop_empty_node = connectionNode_["drop empty"];
   if (drop_empty_node) {
-    return utils::StringUtils::toBool(drop_empty_node.as<std::string>()).value_or(false);
+    return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false);  // NOTE(review): getString().value() presumably throws for a non-string node (for example a JSON boolean literal) - confirm
   }
   return false;
 }
 
-}  // namespace org::apache::nifi::minifi::core::yaml
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp
new file mode 100644
index 000000000..960f9a631
--- /dev/null
+++ b/libminifi/src/core/json/JsonConfiguration.cpp
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <vector>
+#include <set>
+#include <cinttypes>
+#include <variant>
+
+#include "core/json/JsonConfiguration.h"
+#include "core/json/JsonNode.h"
+#include "core/state/Value.h"
+#include "Defaults.h"
+#include "utils/TimeUtil.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+
+namespace org::apache::nifi::minifi::core {
+
+namespace {
+
+}  // namespace
+
+
+JsonConfiguration::JsonConfiguration(ConfigurationContext ctx)  // Forwards ctx and a logger for this class to StructuredConfiguration.
+    : StructuredConfiguration(([&] {  // immediately invoked lambda: adjusts ctx before the base class receives it
+                                if (!ctx.path) {
+                                  ctx.path = DEFAULT_NIFI_CONFIG_JSON;  // no path supplied: fall back to DEFAULT_NIFI_CONFIG_JSON
+                                }
+                                return std::move(ctx);  // ctx is captured by reference, so the explicit move is what allows moving instead of copying
+                              })(),
+                              logging::LoggerFactory<JsonConfiguration>::getLogger()) {}
+
+std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() {  // Reads the file at config_path_, parses it as JSON and builds the root process group; nullptr when filesystem_->read yields nothing.
+  if (!config_path_) {  // a config path is mandatory for this entry point
+    logger_->log_error("Cannot instantiate flow, no config file is set.");
+    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
+  }
+  const auto configuration = filesystem_->read(config_path_.value());
+  if (!configuration) {
+    // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+    return nullptr;
+  }
+  try {
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length());
+    if (!res) {  // a falsy ParseResult is turned into an exception so it takes the common logging path below
+      throw std::runtime_error("Could not parse json file");
+    }
+    flow::Node root{std::make_shared<JsonNode>(&doc)};  // JsonNode receives doc by address (presumably non-owning), so doc has to stay alive while getRootFrom runs
+    return getRootFrom(root);
+  } catch(...) {  // any failure, parse error or exception from getRootFrom, is logged generically and rethrown unchanged
+    logger_->log_error("Invalid json configuration file");
+    throw;
+  }
+}
+
+std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) {  // Parses json_config as JSON and builds the root process group; every failure is logged and rethrown.
+  try {
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length());
+    if (!res) {  // a falsy ParseResult is turned into an exception so it takes the logging path below
+      throw std::runtime_error("Could not parse json file");
+    }
+    flow::Node root{std::make_shared<JsonNode>(&doc)};  // JsonNode receives doc by address (presumably non-owning), so doc has to stay alive while getRootFrom runs
+    return getRootFrom(root);
+  } catch (const std::exception& err) {  // std::exception instead of std::runtime_error: the parsing helpers also throw std::invalid_argument, which is a logic_error
+    logger_->log_error("%s", err.what());  // the message is passed as an argument, never as the format string itself
+    throw;
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/yaml/CheckRequiredField.cpp b/libminifi/src/core/yaml/CheckRequiredField.cpp
deleted file mode 100644
index bb5b7abf8..000000000
--- a/libminifi/src/core/yaml/CheckRequiredField.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdexcept>
-
-#include "core/yaml/CheckRequiredField.h"
-#include "utils/StringUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace yaml {
-
-bool isFieldPresent(const YAML::Node &yaml_node, std::string_view field_name) {
-  return bool{yaml_node.as<YAML::Node>()[field_name.data()]};
-}
-
-std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_field_names, std::string_view yaml_section) {
-  const YAML::Node name_node = yaml_node.as<YAML::Node>()["name"];
-  // Build a helpful error message for the user so they can fix the
-  // invalid YAML config file, using the component name if present
-  auto field_list_string = utils::StringUtils::join(", ", alternate_field_names);
-  std::string err_msg =
-      name_node ?
-          "Unable to parse configuration file for component named '" + name_node.as<std::string>() + "' as none of the possible required fields [" + field_list_string + "] is available" :
-          "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
-  if (!yaml_section.empty()) {
-    err_msg += " [in '" + std::string(yaml_section) + "' section of configuration file]";
-  }
-  const YAML::Mark mark = yaml_node.Mark();
-  if (!mark.is_null()) {
-    err_msg += " [line:column, pos at " + std::to_string(mark.line) + ":" + std::to_string(mark.column) + ", " + std::to_string(mark.pos) + "]";
-  }
-  return err_msg;
-}
-
-void checkRequiredField(const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section, std::string_view error_message) {
-  if (!isFieldPresent(yaml_node, field_name)) {
-    if (error_message.empty()) {
-      throw std::invalid_argument(buildErrorMessage(yaml_node, std::vector<std::string>{std::string(field_name)}, yaml_section));
-    }
-    throw std::invalid_argument(error_message.data());
-  }
-}
-
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message) {
-  for (const auto& name : alternate_names) {
-    if (yaml::isFieldPresent(yaml_node, name)) {
-      return yaml_node[name].as<std::string>();
-    }
-  }
-  if (error_message.empty()) {
-    throw std::invalid_argument(buildErrorMessage(yaml_node, alternate_names, yaml_section));
-  }
-  throw std::invalid_argument(error_message.data());
-}
-
-}  // namespace yaml
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index f69f73de4..89092eee5 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -22,937 +22,63 @@
 #include <cinttypes>
 
 #include "core/yaml/YamlConfiguration.h"
-#include "core/yaml/CheckRequiredField.h"
-#include "core/yaml/YamlConnectionParser.h"
 #include "core/state/Value.h"
 #include "Defaults.h"
 #include "utils/TimeUtil.h"
-#include "Funnel.h"
-
-#ifdef YAML_CONFIGURATION_USE_REGEX
+#include "yaml-cpp/yaml.h"
+#include "core/yaml/YamlNode.h"
 #include "utils/RegexUtils.h"
-#endif  // YAML_CONFIGURATION_USE_REGEX
 
 namespace org::apache::nifi::minifi::core {
 
-std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
-                                     const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
-                                     const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path,
-                                     const std::shared_ptr<utils::file::FileSystem>& filesystem)
-    : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration,
-                        path.value_or(DEFAULT_NIFI_CONFIG_YML), filesystem),
-      stream_factory_(stream_factory),
-      logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
-  auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
-  auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
-  this->name_ = rootGroup->getName();
-  return rootGroup;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
-  int version = 0;
-
-  yaml::checkRequiredField(yamlNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-  auto flowName = yamlNode["name"].as<std::string>();
-
-  utils::Identifier uuid;
-  // assignment throws on invalid uuid
-  uuid = getOrGenerateId(yamlNode);
-
-  if (yamlNode["version"]) {
-    version = yamlNode["version"].as<int>();
-  }
-
-  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
-  std::unique_ptr<core::ProcessGroup> group;
-  if (is_root) {
-    group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
-  } else {
-    group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
-  }
-
-  if (yamlNode["onschedule retry interval"]) {
-    auto onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
-    logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
-
-    auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
-    if (on_schedule_retry_period_value.has_value() && group) {
-      logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count());
-      group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count());
-    }
-  }
-
-  return group;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
-  auto group = createProcessGroup(headerNode, is_root);
-  YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
-  YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
-  YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
-  YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
-  YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
-  YAML::Node remoteProcessingGroupsNode = [&] {
-    // assignment is not supported on invalid Yaml nodes
-    YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-    if (candidate) {
-      return candidate;
-    }
-    return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
-  }();
-  YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
-
-  parseProcessorNodeYaml(processorsNode, group.get());
-  parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
-  parseFunnelsYaml(funnelsNode, group.get());
-  parsePorts(inputPortsNode, group.get(), PortType::INPUT);
-  parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
-
-  if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
-    for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
-      auto childProcessGroupNode = it->as<YAML::Node>();
-      group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
-    }
-  }
-
-  // parse connections last to give feedback if the source and/or destination processors
-  // is not in the same process group or input/output port connections are not allowed
-  parseConnectionYaml(connectionsNode, group.get());
-  return group;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
-  uuids_.clear();
-  YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
-  YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
-
-  parseControllerServices(controllerServiceNode);
-  // Create the root process group
-  std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
-  parseProvenanceReportingYaml(provenanceReportNode, root.get());
-
-  // set the controller services into the root group.
-  for (const auto& controller_service : controller_services_->getAllControllerServices()) {
-    root->addControllerService(controller_service->getName(), controller_service);
-    root->addControllerService(controller_service->getUUIDStr(), controller_service);
-  }
-
-  root->verify();
-
-  return root;
-}
-
-void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
-  int64_t runDurationNanos = -1;
-  utils::Identifier uuid;
-  std::unique_ptr<core::Processor> processor;
-
-  if (!parentGroup) {
-    logger_->log_error("parseProcessNodeYaml: no parent group exists");
-    return;
-  }
-
-  if (!processorsNode) {
-    throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
-  }
-  if (!processorsNode.IsSequence()) {
-    throw std::invalid_argument(
-        "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
-  }
-  // Evaluate sequence of processors
-  for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
-    core::ProcessorConfig procCfg;
-    const auto procNode = iter->as<YAML::Node>();
-
-    yaml::checkRequiredField(procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.name = procNode["name"].as<std::string>();
-    procCfg.id = getOrGenerateId(procNode);
-
-    uuid = procCfg.id;
-    logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
-    yaml::checkRequiredField(procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.javaClass = procNode["class"].as<std::string>();
-    logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
-
-    // Determine the processor name only from the Java class
-    auto lastOfIdx = procCfg.javaClass.find_last_of('.');
-    if (lastOfIdx != std::string::npos) {
-      lastOfIdx++;  // if a value is found, increment to move beyond the .
-      std::string processorName = procCfg.javaClass.substr(lastOfIdx);
-      processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
-    } else {
-      // Allow unqualified class names for core processors
-      processor = this->createProcessor(procCfg.javaClass, uuid);
-    }
-
-    if (!processor) {
-      logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
-      throw std::invalid_argument("Could not create processor " + procCfg.name);
-    }
-
-    processor->setName(procCfg.name);
-
-    processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
-    auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
-    CONFIG_YAML_PROCESSORS_KEY);
-    procCfg.schedulingStrategy = strategyNode.as<std::string>();
-    logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
-
-    auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
-    CONFIG_YAML_PROCESSORS_KEY);
-
-    procCfg.schedulingPeriod = periodNode.as<std::string>();
-    logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
-
-    if (procNode["max concurrent tasks"]) {
-      procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
-      logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
-    }
-
-    if (procNode["penalization period"]) {
-      procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
-      logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
-    }
-
-    if (procNode["yield period"]) {
-      procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
-      logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
-    }
-
-    if (procNode["run duration nanos"]) {
-      procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
-      logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
-    }
-
-    // handle auto-terminated relationships
-    if (procNode["auto-terminated relationships list"]) {
-      YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
-      std::vector<std::string> rawAutoTerminatedRelationshipValues;
-      if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
-        for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
-          auto autoTerminatedRel = relIter->as<std::string>();
-          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
-        }
-      }
-      procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
-    }
-
-    // handle processor properties
-    if (procNode["Properties"]) {
-      YAML::Node propertiesNode = procNode["Properties"];
-      parsePropertiesNodeYaml(propertiesNode, *processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
-    }
-
-    // Take care of scheduling
-
-    if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-      if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
-        logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count());
-        processor->setSchedulingPeriodNano(*scheduling_period);
-      }
-    } else {
-      processor->setCronPeriod(procCfg.schedulingPeriod);
-    }
-
-    if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
-      logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count());
-      processor->setPenalizationPeriod(penalization_period.value());
-    }
-
-    if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
-      logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count());
-      processor->setYieldPeriodMsec(yield_period.value());
-    }
-
-    // Default to running
-    processor->setScheduledState(core::RUNNING);
-
-    if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
-      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-    } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-      processor->setSchedulingStrategy(core::EVENT_DRIVEN);
-      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-    } else {
-      processor->setSchedulingStrategy(core::CRON_DRIVEN);
-      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-    }
-
-    int32_t maxConcurrentTasks;
-    if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
-      logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-      processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
-    }
-
-    if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
-      logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
-      processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos));
-    }
-
-    std::vector<core::Relationship> autoTerminatedRelationships;
-    for (auto &&relString : procCfg.autoTerminatedRelationships) {
-      core::Relationship relationship(relString, "");
-      logger_->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString);
-      autoTerminatedRelationships.push_back(relationship);
-    }
-
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    parentGroup->addProcessor(std::move(processor));
-  }
-}
-
-void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
-  utils::Identifier uuid;
-  std::string id;
-
-  if (!parentGroup) {
-    logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
-    return;
-  }
-
-  if (!rpgNode || !rpgNode.IsSequence()) {
-    return;
-  }
-  for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
-    auto currRpgNode = iter->as<YAML::Node>();
-
-    yaml::checkRequiredField(currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-    auto name = currRpgNode["name"].as<std::string>();
-    id = getOrGenerateId(currRpgNode);
-
-    logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
-
-    auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
-    CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-
-    auto url = urlNode.as<std::string>();
-    logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
-
-    uuid = id;
-    auto group = this->createRemoteProcessGroup(name, uuid);
-    group->setParent(parentGroup);
-
-    if (currRpgNode["yield period"]) {
-      auto yieldPeriod = currRpgNode["yield period"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
-
-      auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
-      if (yield_period_value.has_value() && group) {
-        logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
-        group->setYieldPeriodMsec(*yield_period_value);
-      }
-    }
-
-    if (currRpgNode["timeout"]) {
-      auto timeout = currRpgNode["timeout"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
-
-      auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
-      if (timeout_value.has_value() && group) {
-        logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
-        group->setTimeout(timeout_value->count());
-      }
-    }
-
-    if (currRpgNode["local network interface"]) {
-      auto interface = currRpgNode["local network interface"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
-      group->setInterface(interface);
-    }
-
-    if (currRpgNode["transport protocol"]) {
-      auto transport_protocol = currRpgNode["transport protocol"].as<std::string>();
-      logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
-      if (transport_protocol == "HTTP") {
-        group->setTransportProtocol(transport_protocol);
-        if (currRpgNode["proxy host"]) {
-          auto http_proxy_host = currRpgNode["proxy host"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
-          group->setHttpProxyHost(http_proxy_host);
-          if (currRpgNode["proxy user"]) {
-            auto http_proxy_username = currRpgNode["proxy user"].as<std::string>();
-            logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
-            group->setHttpProxyUserName(http_proxy_username);
-          }
-          if (currRpgNode["proxy password"]) {
-            auto http_proxy_password = currRpgNode["proxy password"].as<std::string>();
-            logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
-            group->setHttpProxyPassWord(http_proxy_password);
+YamlConfiguration::YamlConfiguration(ConfigurationContext ctx)
+    : StructuredConfiguration(([&] {
+          if (!ctx.path) {
+            ctx.path = DEFAULT_NIFI_CONFIG_YML;
           }
-          if (currRpgNode["proxy port"]) {
-            auto http_proxy_port = currRpgNode["proxy port"].as<std::string>();
-            int32_t port;
-            if (core::Property::StringToInt(http_proxy_port, port)) {
-              logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
-              group->setHttpProxyPort(port);
-            }
-          }
-        }
-      } else if (transport_protocol == "RAW") {
-        group->setTransportProtocol(transport_protocol);
-      } else {
-        std::stringstream stream;
-        stream << "Invalid transport protocol " << transport_protocol;
-        throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
-      }
-    }
-
-    group->setTransmitting(true);
-    group->setURL(url);
-
-    yaml::checkRequiredField(currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-    auto inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
-    if (inputPorts && inputPorts.IsSequence()) {
-      for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-        auto currPort = portIter->as<YAML::Node>();
-
-        this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
-      }  // for node
-    }
-    auto outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
-    if (outputPorts && outputPorts.IsSequence()) {
-      for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
-        logger_->log_debug("Got a current port, iterating...");
-
-        auto currPort = portIter->as<YAML::Node>();
-
-        this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
-      }  // for node
-    }
-    parentGroup->addProcessGroup(std::move(group));
-  }
-}
-
-void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
-  utils::Identifier port_uuid;
-
-  if (!parentGroup) {
-    logger_->log_error("parseProvenanceReportingYaml: no parent group exists");
-    return;
-  }
-
-  if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
-    logger_->log_debug("no provenance reporting task specified");
-    return;
-  }
-
-  auto reportTask = createProvenanceReportTask();
-
-  const auto node = reportNode.as<YAML::Node>();
-
-  yaml::checkRequiredField(node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
-  yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
-
-  if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
-    logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
-    reportTask->setSchedulingPeriodNano(*scheduling_period);
-  }
-
-  if (schedulingStrategyStr == "TIMER_DRIVEN") {
-    reportTask->setSchedulingStrategy(core::TIMER_DRIVEN);
-    logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
-  } else {
-    throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr);
-  }
-
-  int64_t lvalue;
-  if (node["host"] && node["port"]) {
-    auto hostStr = node["host"].as<std::string>();
-
-    auto portStr = node["port"].as<std::string>();
-    if (core::Property::StringToInt(portStr, lvalue) && !hostStr.empty()) {
-      logger_->log_debug("ProvenanceReportingTask port %" PRId64, lvalue);
-      std::string url = hostStr + ":" + portStr;
-      reportTask->setURL(url);
-    }
-  }
-
-  if (node["url"]) {
-    auto urlStr = node["url"].as<std::string>();
-    if (!urlStr.empty()) {
-      reportTask->setURL(urlStr);
-      logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
-    }
-  }
-  yaml::checkRequiredField(node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto portUUIDStr = node["port uuid"].as<std::string>();
-  yaml::checkRequiredField(node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
-  auto batchSizeStr = node["batch size"].as<std::string>();
-
-  logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
-  port_uuid = portUUIDStr;
-  reportTask->setPortUUID(port_uuid);
-
-  if (core::Property::StringToInt(batchSizeStr, lvalue)) {
-    reportTask->setBatchSize(gsl::narrow<int>(lvalue));
-  }
-
-  reportTask->initialize();
-
-  // add processor to parent
-  reportTask->setScheduledState(core::RUNNING);
-  parentGroup->addProcessor(std::move(reportTask));
-}
-
-void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
-  if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
-    return;
-  }
-  for (const auto& iter : controllerServicesNode) {
-    const auto controllerServiceNode = iter.as<YAML::Node>();
-    try {
-      yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-
-      auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-      logger_->log_debug("Using type %s for controller service node", type);
-
-      std::string fullType = type;
-      auto lastOfIdx = type.find_last_of('.');
-      if (lastOfIdx != std::string::npos) {
-        lastOfIdx++;  // if a value is found, increment to move beyond the .
-        type = type.substr(lastOfIdx);
-      }
-
-      auto name = controllerServiceNode["name"].as<std::string>();
-      auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-
-      utils::Identifier uuid;
-      uuid = id;
-      std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
-      if (nullptr != controller_service_node) {
-        logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
-        controller_service_node->initialize();
-        YAML::Node propertiesNode = controllerServiceNode["Properties"];
-        // we should propagate properties to the node and to the implementation
-        parsePropertiesNodeYaml(propertiesNode, *controller_service_node, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-        if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
-          parsePropertiesNodeYaml(propertiesNode, *controllerServiceImpl, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-        }
-      } else {
-        logger_->log_debug("Could not locate %s", type);
-      }
-      controller_services_->put(id, controller_service_node);
-      controller_services_->put(name, controller_service_node);
-    } catch (YAML::InvalidNode &) {
-      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
-    }
-  }
-}
-
-void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
-  if (!parent) {
-    logger_->log_error("parseProcessNode: no parent group was provided");
-    return;
-  }
-  if (!connectionsNode || !connectionsNode.IsSequence()) {
-    return;
-  }
-
-  for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
-    const auto connectionNode = iter->as<YAML::Node>();
-
-    // Configure basic connection
-    const std::string id = getOrGenerateId(connectionNode);
-
-    // Default name to be same as ID
-    // If name is specified in configuration, use the value
-    const auto name = connectionNode["name"].as<std::string>(id);
-
-    const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
-      logger_->log_debug("Incorrect connection UUID format.");
-      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect connection UUID format.");
-    });
-
-    auto connection = createConnection(name, uuid.value());
-    logger_->log_debug("Created connection with UUID %s and name %s", id, name);
-    const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
-    connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection);
-    connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
-    connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
-    connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml());
-    connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
-    connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
-    connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
-    connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
-
-    parent->addConnection(std::move(connection));
-  }
-}
+          return std::move(ctx);
+        })(),
+        logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
 
-void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
-  utils::Identifier uuid;
-
-  if (!parent) {
-    logger_->log_error("parseProcessNode: no parent group existed");
-    return;
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() {
+  if (!config_path_) {
+    logger_->log_error("Cannot instantiate flow, no config file is set.");
+    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
   }
-
-  const auto inputPortsObj = portNode.as<YAML::Node>();
-
-  // Check for required fields
-  yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-  auto nameStr = inputPortsObj["name"].as<std::string>();
-  auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
-    "The field 'id' is required for "
-    "the port named '" + nameStr + "' in the YAML Config. If this port "
-    "is an input port for a NiFi Remote Process Group, the port "
-    "id should match the corresponding id specified in the NiFi configuration. "
-    "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
-  uuid = portId;
-
-  auto port = std::make_unique<minifi::RemoteProcessorGroupPort>(
-          stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid);
-  port->setDirection(direction);
-  port->setTimeout(parent->getTimeout());
-  port->setTransmitting(true);
-  port->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-  port->initialize();
-  if (!parent->getInterface().empty())
-    port->setInterface(parent->getInterface());
-  if (parent->getTransportProtocol() == "HTTP") {
-    port->enableHTTP();
-    if (!parent->getHttpProxyHost().empty())
-      port->setHTTPProxy(parent->getHTTPProxy());
+  const auto configuration = filesystem_->read(config_path_.value());
+  if (!configuration) {
+  // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+  return nullptr;
   }
-  // else defaults to RAW
-
-  // handle port properties
-  const auto nodeVal = portNode.as<YAML::Node>();
-  YAML::Node propertiesNode = nodeVal["Properties"];
-  parsePropertiesNodeYaml(propertiesNode, *port, nameStr, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-
-  // add processor to parent
-  auto& processor = *port;
-  parent->addProcessor(std::move(port));
-  processor.setScheduledState(core::RUNNING);
-
-  if (inputPortsObj["max concurrent tasks"]) {
-    auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
-    int32_t maxConcurrentTasks;
-    if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-      processor.setMaxConcurrentTasks(maxConcurrentTasks);
-    }
-    logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-    processor.setMaxConcurrentTasks(maxConcurrentTasks);
-  }
-}
-
-void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component) {
-  for (const auto& iter : propertyValueNode) {
-    if (iter.IsDefined()) {
-      const auto nodeVal = iter.as<YAML::Node>();
-      YAML::Node propertiesNode = nodeVal["value"];
-      // must insert the sequence in differently.
-      const auto rawValueString = propertiesNode.as<std::string>();
-      logger_->log_debug("Found %s=%s", propertyName, rawValueString);
-      if (!component.updateProperty(propertyName, rawValueString)) {
-        auto proc = dynamic_cast<core::Connectable*>(&component);
-        if (proc) {
-          logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
-          if (!component.setDynamicProperty(propertyName, rawValueString)) {
-            logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
-          } else {
-            logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
-          }
-        }
-      }
-    }
-  }
-}
-
-PropertyValue YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode) {
-  PropertyValue defaultValue;
-  defaultValue = propertyFromProcessor.getDefaultValue();
-  const std::type_index defaultType = defaultValue.getTypeInfo();
   try {
-    PropertyValue coercedValue = defaultValue;
-    if (defaultType == typeid(int64_t)) {
-      coercedValue = propertyValueNode.as<int64_t>();
-    } else if (defaultType == typeid(uint64_t)) {
-      uint64_t integer_value;
-      if (YAML::convert<uint64_t>::decode(propertyValueNode, integer_value)) {
-        coercedValue = integer_value;
-      } else {
-        coercedValue = propertyValueNode.as<std::string>();
-      }
-    } else if (defaultType == typeid(int)) {
-      coercedValue = propertyValueNode.as<int>();
-    } else if (defaultType == typeid(bool)) {
-      coercedValue = propertyValueNode.as<bool>();
-    } else {
-      coercedValue = propertyValueNode.as<std::string>();
-    }
-    return coercedValue;
-  } catch (const std::exception& e) {
-    logger_->log_error("Fetching property failed with an exception of %s", e.what());
-    logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
-  } catch (...) {
-    logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+    YAML::Node rootYamlNode = YAML::Load(configuration.value());
+    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+    return getRootFrom(root);
+  } catch(...) {
+    logger_->log_error("Invalid yaml configuration file");
+    throw;
   }
-  return defaultValue;
 }
 
-void YamlConfiguration::parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
-  core::Property myProp(propertyName, "", "");
-  processor.getProperty(propertyName, myProp);
-  const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode);
-  bool property_set = false;
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) {
   try {
-    property_set = processor.setProperty(myProp, coercedValue);
-  } catch(const utils::internal::InvalidValueException&) {
-    auto component = dynamic_cast<core::CoreComponent*>(&processor);
-    if (component == nullptr) {
-      logger_->log_error("processor was not a CoreComponent for property '%s'", propertyName);
-    } else {
-      logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName());
-    }
+    YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
+    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+    return getRootFrom(root);
+  } catch (const YAML::ParserException &pe) {
+    logger_->log_error(pe.what());
     throw;
   }
-  const auto rawValueString = propertyValueNode.as<std::string>();
-  if (!property_set) {
-    auto proc = dynamic_cast<core::Connectable*>(&processor);
-    if (proc) {
-      logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
-      if (!processor.setDynamicProperty(propertyName, rawValueString)) {
-        logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
-      } else {
-        logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
-      }
-    }
-  } else {
-    logger_->log_debug("Property %s with value %s set", propertyName, rawValueString);
-  }
-}
-
-void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
-  logger_->log_trace("Encountered %s", propertyName);
-  if (propertyValueNode.IsNull() || !propertyValueNode.IsDefined()) {
-    return;
-  }
-  if (propertyValueNode.IsSequence()) {
-    parsePropertyValueSequence(propertyName, propertyValueNode, processor);
-  } else {
-    parseSingleProperty(propertyName, propertyValueNode, processor);
-  }
-}
-
-void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name,
-    const std::string& yaml_section) {
-  // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
-  logger_->log_trace("Entered %s", component_name);
-  for (const auto& propertyElem : propertiesNode) {
-    const auto propertyName = propertyElem.first.as<std::string>();
-    const YAML::Node propertyValueNode = propertyElem.second;
-    parsePropertyNodeElement(propertyName, propertyValueNode, component);
-  }
-
-  validateComponentProperties(component, component_name, yaml_section);
-}
-
-void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent) {
-  if (!parent) {
-    logger_->log_error("parseFunnelsYaml: no parent group was provided");
-    return;
-  }
-  if (!node || !node.IsSequence()) {
-    return;
-  }
-
-  for (const auto& element : node) {
-    const auto funnel_node = element.as<YAML::Node>();
-
-    std::string id = getOrGenerateId(funnel_node);
-
-    // Default name to be same as ID
-    const auto name = funnel_node["name"].as<std::string>(id);
-
-    const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
-      logger_->log_debug("Incorrect funnel UUID format.");
-      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
-    });
-
-    auto funnel = std::make_unique<Funnel>(name, uuid.value());
-    logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
-    funnel->setScheduledState(core::RUNNING);
-    funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
-    parent->addProcessor(std::move(funnel));
-  }
-}
-
-void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
-  if (!parent) {
-    logger_->log_error("parsePorts: no parent group was provided");
-    return;
-  }
-  if (!node || !node.IsSequence()) {
-    return;
-  }
-
-  for (const auto& element : node) {
-    const auto port_node = element.as<YAML::Node>();
-
-    std::string id = getOrGenerateId(port_node);
-
-    // Default name to be same as ID
-    const auto name = port_node["name"].as<std::string>(id);
-
-    const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
-      logger_->log_debug("Incorrect port UUID format.");
-      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format.");
-    });
-
-    auto port = std::make_unique<Port>(name, uuid.value(), port_type);
-    logger_->log_debug("Created port UUID %s and name %s", id, name);
-    port->setScheduledState(core::RUNNING);
-    port->setSchedulingStrategy(core::EVENT_DRIVEN);
-    parent->addPort(std::move(port));
-  }
-}
-
-void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
-  const auto &component_properties = component.getProperties();
-
-  // Validate required properties
-  for (const auto &prop_pair : component_properties) {
-    if (prop_pair.second.getRequired()) {
-      if (prop_pair.second.getValue().to_string().empty()) {
-        std::string reason = utils::StringUtils::join_pack("required property '", prop_pair.second.getName(), "' is not set");
-        raiseComponentError(component_name, yaml_section, reason);
-      } else if (!prop_pair.second.getValue().validate(prop_pair.first).valid()) {
-        std::string reason = utils::StringUtils::join_pack("the value '", prop_pair.first, "' is not valid for property '", prop_pair.second.getName(), "'");
-        raiseComponentError(component_name, yaml_section, reason);
-      }
-    }
-  }
-
-  // Validate dependent properties
-  for (const auto &prop_pair : component_properties) {
-    const auto &dep_props = prop_pair.second.getDependentProperties();
-
-    if (prop_pair.second.getValue().to_string().empty()) {
-      continue;
-    }
-
-    for (const auto &dep_prop_key : dep_props) {
-      if (component_properties.at(dep_prop_key).getValue().to_string().empty()) {
-        std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
-            "' depends on property '", dep_prop_key, "' which is not set");
-        raiseComponentError(component_name, yaml_section, reason);
-      }
-    }
-  }
-
-#ifdef YAML_CONFIGURATION_USE_REGEX
-  // Validate mutually-exclusive properties
-  for (const auto &prop_pair : component_properties) {
-    const auto &excl_props = prop_pair.second.getExclusiveOfProperties();
-
-    if (prop_pair.second.getValue().empty()) {
-      continue;
-    }
-
-    for (const auto &excl_pair : excl_props) {
-      utils::Regex excl_expr(excl_pair.second);
-      if (utils::regexMatch(component_properties.at(excl_pair.first).getValue().to_string(), excl_expr)) {
-        std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
-            "' must not be set when the value of property '", excl_pair.first, "' matches '", excl_pair.second, "'");
-        raiseComponentError(component_name, yaml_section, reason);
-      }
-    }
-  }
-
-  // Validate regex properties
-  for (const auto &prop_pair : component_properties) {
-    const auto &prop_regex_str = prop_pair.second.getValidRegex();
-
-    if (!prop_regex_str.empty()) {
-      utils::Regex prop_regex(prop_regex_str);
-      if (!utils::regexMatch(prop_pair.second.getValue().to_string(), prop_regex)) {
-        std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), "' does not match validation pattern '", prop_regex_str, "'");
-        raiseComponentError(component_name, yaml_section, reason);
-      }
-    }
-  }
-#else
-  logging::LOG_INFO(logger_) << "Validation of mutally-exclusive properties is disabled in this build.";
-  logging::LOG_INFO(logger_) << "Regex validation of properties is not available in this build.";
-#endif  // YAML_CONFIGURATION_USE_REGEX
-}
-
-void YamlConfiguration::raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const {
-  std::string err_msg = "Unable to parse configuration file for component named '";
-  err_msg.append(component_name);
-  err_msg.append("' because " + reason);
-  if (!yaml_section.empty()) {
-    err_msg.append(" [in '" + yaml_section + "' section of configuration file]");
-  }
-
-  logging::LOG_ERROR(logger_) << err_msg;
-
-  throw std::invalid_argument(err_msg);
-}
-
-std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
-  std::string id;
-  auto node = yamlNode.as<YAML::Node>();
-
-  if (node[idField]) {
-    if (YAML::NodeType::Scalar == node[idField].Type()) {
-      id = node[idField].as<std::string>();
-      addNewId(id);
-      return id;
-    }
-    throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar.");
-  }
-
-  id = id_generator_->generate().to_string();
-  logger_->log_debug("Generating random ID: id => [%s]", id);
-  return id;
 }
 
-std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) {
-  yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message);
-  auto id = yaml_node["id"].as<std::string>();
-  addNewId(id);
-  return id;
-}
-
-YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
-                                               const std::string& providedInfoMessage) {
-  std::string infoMessage = providedInfoMessage;
-  auto result = yamlNode.as<YAML::Node>()[fieldName];
-  if (!result) {
-    if (infoMessage.empty()) {
-      // Build a helpful info message for the user to inform them that a default is being used
-      infoMessage =
-          yamlNode.as<YAML::Node>()["name"] ?
-              "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
-              "Using default value for optional field '" + fieldName + "' ";
-      if (!yamlSection.empty()) {
-        infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
-      }
-
-      infoMessage += defaultValue.as<std::string>();
-    }
-    logging::LOG_INFO(logger_) << infoMessage;
-    result = defaultValue;
-  }
-
-  return result;
-}
-
-void YamlConfiguration::addNewId(const std::string& uuid) {
-  const auto [_, success] = uuids_.insert(uuid);
-  if (!success) {
-    throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration");
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const std::string &yamlConfigPayload) {
+  try {
+    YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
+    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+    return getRootFrom(root);
+  } catch (const YAML::ParserException &pe) {
+    logger_->log_error(pe.what());
+    throw;
   }
 }
 
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b72a188a9..684d71988 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -63,7 +63,7 @@ class TestControllerWithFlow: public TestController{
     REQUIRE(content_repo->initialize(configuration_));
     std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration_);
 
-    auto flow = std::make_unique<core::YamlConfiguration>(prov_repo, ff_repo, content_repo, stream_factory, configuration_, yaml_path_.string());
+    auto flow = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{prov_repo, ff_repo, content_repo, stream_factory, configuration_, yaml_path_.string()});
     auto root = flow->getRoot();
     root_ = root.get();
     controller_ = std::make_shared<minifi::FlowController>(
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 0e6abe3c4..90b20e627 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -174,7 +174,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_
       filesystem = std::make_shared<utils::file::FileSystem>();
     }
 
-    auto flow_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem);
+    auto flow_config = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem});
 
     auto controller_service_provider = flow_config->getControllerServiceProvider();
     char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 902d81671..5344722ff 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -57,13 +57,13 @@ int main(int argc, char **argv) {
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
-      test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+      core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
 
   const auto controller = std::make_shared<minifi::FlowController>(
       test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
       std::make_shared<utils::file::FileSystem>(), []{});
 
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+  core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
 
   std::shared_ptr<core::ProcessGroup> pg = yaml_config.getRoot();
   std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
index 20831f7c2..e3797479c 100644
--- a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
@@ -95,7 +95,7 @@ class PersistableKeyValueStoreServiceTestsFixture {
     content_repo->initialize(configuration);
     stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
-    yaml_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml);
+    yaml_config = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml});
 
     process_group = yaml_config->getRoot();
     persistable_key_value_store_service_node = process_group->findControllerService("testcontroller");
diff --git a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
index c0853f2d1..5058ebebc 100644
--- a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
@@ -88,7 +88,8 @@ class UnorderedMapKeyValueStoreServiceTestFixture {
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
 
-  std::unique_ptr<core::YamlConfiguration> yaml_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml);
+  std::unique_ptr<core::YamlConfiguration> yaml_config = std::make_unique<core::YamlConfiguration>(
+      core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml});
   std::unique_ptr<core::ProcessGroup> process_group;
 
   std::shared_ptr<core::controller::ControllerServiceNode> key_value_store_service_node;
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index d9bc6dc69..0dcbb4c40 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -184,7 +184,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
   ff_repository->initialize(config);
   content_repo->initialize(config);
 
-  auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+  auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
   auto flowController = std::make_shared<minifi::FlowController>(
       prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
@@ -304,7 +304,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
   ff_repository->initialize(config);
   content_repo->initialize(config);
 
-  auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+  auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
   auto flowController = std::make_shared<minifi::FlowController>(
       prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 94ec4ddfb..520c0629d 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -297,7 +297,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   auto inputPtr = input.get();
   root->addConnection(std::move(input));
 
-  auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+  auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
   auto flowController = std::make_shared<minifi::FlowController>(
       prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
 
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index c15449677..a9ef41bab 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -356,8 +356,14 @@ int main(int argc, char **argv) {
         utils::crypto::EncryptionProvider::create(minifiHome));
 
     std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(
-        prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name,
-        configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem);
+        core::ConfigurationContext{
+          .repo = prov_repo,
+          .flow_file_repo = flow_repo,
+          .content_repo = content_repo,
+          .stream_factory = stream_factory,
+          .configuration = configure,
+          .path = configure->get(minifi::Configure::nifi_flow_configuration_file),
+          .filesystem = filesystem}, nifi_configuration_class_name);
 
     const auto controller = std::make_unique<minifi::FlowController>(
         prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem, request_restart);