You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/04/20 07:03:10 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1535 - Support process groups

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a29b414  MINIFICPP-1535 - Support process groups
a29b414 is described below

commit a29b414b8f7f5310c88a51e9448ec8ce3a83b4c0
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Apr 7 08:35:28 2021 +0200

    MINIFICPP-1535 - Support process groups
    
    - Remove obsolete logs
    - Move error reporting
    - Review changes, restore lost tests
    - Rename test accessor
    - Change log
    - Review change
    - Linter fix
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1045
---
 .../tests/unit/ProcessGroupTestUtils.h             | 290 +++++++++
 .../tests/unit/YamlProcessGroupParserTests.cpp     | 130 ++++
 libminifi/include/core/FlowConfiguration.h         |   6 +-
 libminifi/include/core/ProcessGroup.h              |  38 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |  33 +-
 libminifi/src/core/FlowConfiguration.cpp           |  16 +-
 libminifi/src/core/ProcessGroup.cpp                |  83 ++-
 libminifi/src/core/yaml/YamlConfiguration.cpp      | 704 +++++++++++----------
 libminifi/src/core/yaml/YamlConnectionParser.cpp   |  12 +-
 9 files changed, 884 insertions(+), 428 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
new file mode 100644
index 0000000..e492f87
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <set>
+#include <memory>
+#include <vector>
+
+#include "TestBase.h"
+#include "YamlConfiguration.h"
+#include "Utils.h"
+
+struct Lines {
+  std::vector<std::string> lines;
+
+  std::string join(const std::string& delim) const {
+    return utils::StringUtils::join(delim, lines);
+  }
+
+  Lines& indentAll() & {
+    for (auto& line : lines) {
+      line = "  " + std::move(line);
+    }
+    return *this;
+  }
+
+  Lines&& indentAll() && {
+    for (auto& line : lines) {
+      line = "  " + std::move(line);
+    }
+    return std::move(*this);
+  }
+
+  Lines& append(Lines more_lines) {
+    std::move(more_lines.lines.begin(), more_lines.lines.end(), std::back_inserter(lines));
+    return *this;
+  }
+
+  Lines& emplace_back(std::string line) {
+    lines.emplace_back(std::move(line));
+    return *this;
+  }
+};
+
+struct Proc {
+  std::string id;
+  std::string name;
+
+  Lines serialize() const {
+    return {{
+      "- id: " + id,
+      "  name: " + name,
+      "  class: LogAttribute"
+    }};
+  }
+};
+
+struct UnresolvedProc {
+  explicit UnresolvedProc(std::string id): id(std::move(id)) {}
+  std::string id;
+};
+
+struct MaybeProc {
+  MaybeProc(const Proc& proc): id(proc.id), name(proc.name) {}  // NOLINT
+  MaybeProc(const UnresolvedProc& proc) : id(proc.id) {}  // NOLINT
+
+  std::string id;
+  utils::optional<std::string> name;
+};
+
+struct Conn {
+  std::string name;
+  MaybeProc source;
+  MaybeProc destination;
+
+  Lines serialize() const {
+    return {{
+      "- name: " + name,
+      "  source id: " + source.id,
+      "  destination id: " + destination.id,
+      "  source relationship name: success"
+    }};
+  }
+};
+
+struct RPG {
+  std::string name;
+  std::vector<Proc> input_ports;
+
+  Lines serialize() const {
+    std::vector<std::string> lines;
+    lines.emplace_back("- name: " + name);
+    if (input_ports.empty()) {
+      lines.emplace_back("  Input Ports: []");
+    } else {
+      lines.emplace_back("  Input Ports:");
+      for (const auto& port : input_ports) {
+        lines.emplace_back("    - id: " + port.id);
+        lines.emplace_back("      name: " + port.name);
+      }
+    }
+    return {lines};
+  }
+};
+
+struct Group {
+  explicit Group(std::string name): name_(std::move(name)) {}
+  Group& With(std::vector<Conn> connections) {
+    connections_ = std::move(connections);
+    return *this;
+  }
+  Group& With(std::vector<Proc> processors) {
+    processors_ = std::move(processors);
+    return *this;
+  }
+  Group& With(std::vector<Group> subgroups) {
+    subgroups_ = std::move(subgroups);
+    return *this;
+  }
+  Group& With(std::vector<RPG> rpgs) {
+    rpgs_ = std::move(rpgs);
+    return *this;
+  }
+  Lines serialize(bool is_root = true) const {
+    Lines body;
+    if (processors_.empty()) {
+      body.emplace_back("Processors: []");
+    } else {
+      body.emplace_back("Processors:");
+      for (const auto& proc : processors_) {
+        body.append(proc.serialize().indentAll());
+      }
+    }
+    if (!connections_.empty()) {
+      body.emplace_back("Connections:");
+      for (const auto& conn : connections_) {
+        body.append(conn.serialize().indentAll());
+      }
+    }
+    if (rpgs_.empty()) {
+      body.emplace_back("Remote Process Groups: []");
+    } else {
+      body.emplace_back("Remote Process Groups:");
+      for (const auto& rpg : rpgs_) {
+        body.append(rpg.serialize().indentAll());
+      }
+    }
+    if (!subgroups_.empty()) {
+      body.emplace_back("Process Groups:");
+      for (const auto& subgroup : subgroups_) {
+        body.append(subgroup.serialize(false).indentAll());
+      }
+    }
+    Lines lines;
+    if (is_root) {
+      lines.emplace_back("Flow Controller:");
+      lines.emplace_back("  name: " + name_);
+      lines.append(std::move(body));
+    } else {
+      lines.emplace_back("- name: " + name_);
+      lines.append(std::move(body).indentAll());
+    }
+    return lines;
+  }
+
+  std::string name_;
+  std::vector<Conn> connections_;
+  std::vector<Proc> processors_;
+  std::vector<Group> subgroups_;
+  std::vector<RPG> rpgs_;
+};
+
+struct ProcessGroupTestAccessor {
+  FIELD_ACCESSOR(processors_)
+  FIELD_ACCESSOR(connections_)
+  FIELD_ACCESSOR(child_process_groups_)
+};
+
+template<typename T, typename = void>
+struct Resolve;
+
+template<typename T>
+struct Resolve<T, typename std::enable_if<!std::is_pointer<T>::value>::type> {
+  static auto get(const T& item) -> decltype(item.get()) {
+    return item.get();
+  }
+};
+
+template<typename T>
+struct Resolve<T, typename std::enable_if<std::is_pointer<T>::value>::type> {
+  static auto get(const T& item) -> T {
+    return item;
+  }
+};
+
+template<typename T>
+auto findByName(const std::set<T>& set, const std::string& name) -> decltype(Resolve<T>::get(std::declval<const T&>())) {
+  auto it = std::find_if(set.begin(), set.end(), [&](const T& item) {
+    return item->getName() == name;
+  });
+  if (it != set.end()) {
+    return Resolve<T>::get(*it);
+  }
+  return nullptr;
+}
+
+void verifyProcessGroup(core::ProcessGroup& group, const Group& pattern) {
+  // verify name
+  REQUIRE(group.getName() == pattern.name_);
+  // verify connections
+  std::set<std::shared_ptr<minifi::Connection>>& connections = ProcessGroupTestAccessor::get_connections_(group);
+  REQUIRE(connections.size() == pattern.connections_.size());
+  for (auto& expected : pattern.connections_) {
+    auto conn = findByName(connections, expected.name);
+    REQUIRE(conn);
+    if (!expected.source.name) {
+      REQUIRE(conn->getSource() == nullptr);
+      REQUIRE(utils::verifyLogLinePresenceInPollTime(
+          std::chrono::seconds{1},
+          "Cannot find the source processor with id '" + expected.source.id
+          + "' for the connection [name = '" + expected.name + "'"));
+    } else {
+      REQUIRE(conn->getSource()->getName() == expected.source.name);
+    }
+    if (!expected.destination.name) {
+      REQUIRE(conn->getDestination() == nullptr);
+      REQUIRE(utils::verifyLogLinePresenceInPollTime(
+          std::chrono::seconds{1},
+          "Cannot find the destination processor with id '" + expected.destination.id
+          + "' for the connection [name = '" + expected.name + "'"));
+    } else {
+      REQUIRE(conn->getDestination()->getName() == expected.destination.name);
+    }
+  }
+
+  // verify processors
+  std::set<std::shared_ptr<core::Processor>>& processors = ProcessGroupTestAccessor::get_processors_(group);
+  REQUIRE(processors.size() == pattern.processors_.size());
+  for (auto& expected : pattern.processors_) {
+    REQUIRE(findByName(processors, expected.name));
+  }
+
+  std::set<core::ProcessGroup*> simple_subgroups;
+  std::set<core::ProcessGroup*> rpg_subgroups;
+  for (auto& subgroup : ProcessGroupTestAccessor::get_child_process_groups_(group)) {
+    if (subgroup->isRemoteProcessGroup()) {
+      rpg_subgroups.insert(subgroup.get());
+    } else {
+      simple_subgroups.insert(subgroup.get());
+    }
+  }
+  // verify remote process groups
+  REQUIRE(rpg_subgroups.size() == pattern.rpgs_.size());
+  for (auto& expected : pattern.rpgs_) {
+    auto rpg = findByName(rpg_subgroups, expected.name);
+    REQUIRE(rpg);
+    std::set<std::shared_ptr<core::Processor>>& input_ports = ProcessGroupTestAccessor::get_processors_(*rpg);
+    REQUIRE(input_ports.size() == expected.input_ports.size());
+    for (auto& expected_input_port : expected.input_ports) {
+      auto input_port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(findByName(input_ports, expected_input_port.name));
+      REQUIRE(input_port);
+      REQUIRE(input_port->getName() == expected_input_port.name);
+    }
+  }
+
+  // verify subgroups
+  REQUIRE(simple_subgroups.size() == pattern.subgroups_.size());
+  for (auto& expected : pattern.subgroups_) {
+    auto subgroup = findByName(simple_subgroups, expected.name_);
+    REQUIRE(subgroup);
+    verifyProcessGroup(*subgroup, expected);
+  }
+}
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
new file mode 100644
index 0000000..8e71fa0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "YamlConfiguration.h"
+#include "Utils.h"
+#include "IntegrationTestUtils.h"
+#include "ProcessGroupTestUtils.h"
+
+static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
+
+TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser1]") {
+  auto pattern = Group("root")
+    .With({
+      Conn{"Conn1",
+           Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+           Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}},
+      Conn{"Conn2",
+           Proc{"00000000-0000-0000-0000-000000000002", "Proc2"},
+           Proc{"00000000-0000-0000-0000-000000000005", "Port1"}}
+    }).With({
+      Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+      Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}
+    }).With({
+      RPG{"RPG1", {
+        Proc{"00000000-0000-0000-0000-000000000005", "Port1"},
+        Proc{"00000000-0000-0000-0000-000000000006", "Port2"}
+      }},
+      RPG{"RPG2", {}}
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser2]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+           Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}})
+    .With({
+      Group("Child1")
+      .With({Conn{"Child1_Conn1",
+                  Proc{"00000000-0000-0000-0000-000000000005", "Port1"},
+                  Proc{"00000000-0000-0000-0000-000000000007", "Child1_Proc2"}}})
+      .With({Proc{"00000000-0000-0000-0000-000000000006", "Child1_Proc1"},
+             Proc{"00000000-0000-0000-0000-000000000007", "Child1_Proc2"}})
+      .With({RPG{"Child1_RPG1", {
+        Proc{"00000000-0000-0000-0000-000000000005", "Port1"}}}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser3]") {
+  TestController controller;
+  LogTestController::getInstance().setTrace<core::YamlConfiguration>();
+  Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
+  Proc Port1{"00000000-0000-0000-0000-000000000002", "Port1"};
+  Proc Child1_Proc1{"00000000-0000-0000-0000-000000000011", "Child1_Proc1"};
+  Proc Child1_Port1{"00000000-0000-0000-0000-000000000012", "Child1_Port1"};
+  Proc Child2_Proc1{"00000000-0000-0000-0000-000000000021", "Child2_Proc1"};
+  Proc Child2_Port1{"00000000-0000-0000-0000-000000000022", "Child2_Port1"};
+
+  auto pattern = Group("root")
+    .With({Proc1})
+    .With({Conn{"Conn1", Proc1, Port1}})
+    .With({RPG{"RPG1", {Port1}}})
+    .With({
+      Group("Child1").With({Child1_Proc1})
+      .With({Conn{"Child1_Conn1", Child1_Proc1, Child1_Port1}})
+      .With({RPG{"Child1_RPG1", {Child1_Port1}}}),
+      Group("Child2")
+      .With({Child2_Proc1})
+      .With({RPG{"Child2_RPG1", {Child2_Port1}}})
+    });
+
+  auto& Conn1 = pattern.connections_.at(0);
+  auto& Child1_Conn1 = pattern.subgroups_.at(0).connections_.at(0);
+
+  SECTION("Connecting processors in their own groups") {
+    // sanity check, everything is resolved as it should
+  }
+
+  SECTION("Connecting processors in their child/parent group") {
+    Conn1.source = UnresolvedProc{Child1_Proc1.id};
+    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+
+    Child1_Conn1.source = UnresolvedProc{Proc1.id};
+    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+  }
+
+  SECTION("Connecting processors between their own and their child/parent group") {
+    Conn1.source = Proc1;
+    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+
+    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.destination = Child1_Proc1;
+  }
+
+  SECTION("Connecting processors in a sibling group") {
+    Conn1.source = Proc1;
+    Conn1.destination = Port1;
+
+    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
+    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+  }
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index efd4ac5..9d96b56 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -105,13 +105,13 @@ class FlowConfiguration : public CoreComponent {
   std::shared_ptr<core::Processor> createProcessor(const std::string &name, const std::string &fullname, utils::Identifier &uuid);
   // Create Root Processor Group
 
-  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, utils::Identifier & uuid, int version);
+  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
+  std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
+  std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid);
 
   std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name,
       const utils::Identifier& uuid);
 
-  // Create Remote Processor Group
-  std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, utils::Identifier & uuid);
   // Create Connection
   std::shared_ptr<minifi::Connection> createConnection(std::string name, const utils::Identifier& uuid) const;
   // Create Provenance Report Task
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index fad30a7..78a462a 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -40,6 +40,8 @@
 #include "utils/HTTPClient.h"
 #include "utils/CallBackTimer.h"
 
+struct ProcessGroupTestAccessor;
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -49,6 +51,7 @@ namespace core {
 // Process Group Type
 enum ProcessGroupType {
   ROOT_PROCESS_GROUP = 0,
+  SIMPLE_PROCESS_GROUP,
   REMOTE_PROCESS_GROUP,
   MAX_PROCESS_GROUP_TYPE
 };
@@ -57,7 +60,12 @@ enum ProcessGroupType {
 
 // ProcessGroup Class
 class ProcessGroup : public CoreComponent {
+  friend struct ::ProcessGroupTestAccessor;
  public:
+  enum class Traverse {
+    ExcludeChildren,
+    IncludeChildren
+  };
   // Constructor
   /*!
    * Create a new process group
@@ -159,6 +167,8 @@ class ProcessGroup : public CoreComponent {
   void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter = [] (const std::shared_ptr<Processor>&) {return true;}); // NOLINT
   // Whether it is root process group
   bool isRootProcessGroup();
+
+  bool isRemoteProcessGroup();
   // set parent process group
   void setParent(ProcessGroup *parent) {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -174,31 +184,31 @@ class ProcessGroup : public CoreComponent {
   // Remove processor
   void removeProcessor(const std::shared_ptr<Processor>& processor);
   // Add child processor group
-  void addProcessGroup(ProcessGroup *child);
-  // Remove child processor group
-  void removeProcessGroup(ProcessGroup *child);
+  void addProcessGroup(std::unique_ptr<ProcessGroup> child);
   // ! Add connections
   void addConnection(const std::shared_ptr<Connection>& connection);
   // Generic find
   template <typename Fun>
-  std::shared_ptr<Processor> findProcessor(Fun condition) const {
+  std::shared_ptr<Processor> findProcessor(Fun condition, Traverse traverse) const {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     const auto found = std::find_if(processors_.cbegin(), processors_.cend(), condition);
     if (found != processors_.cend()) {
       return *found;
     }
     for (const auto& processGroup : child_process_groups_) {
-      const std::shared_ptr<Processor> processor = processGroup->findProcessor(condition);
-      if (processor) {
-        return processor;
+      if (processGroup->isRemoteProcessGroup() || traverse == Traverse::IncludeChildren) {
+        std::shared_ptr<Processor> processor = processGroup->findProcessor(condition, traverse);
+        if (processor) {
+          return processor;
+        }
       }
     }
     return nullptr;
   }
   // findProcessor based on UUID
-  std::shared_ptr<Processor> findProcessorById(const utils::Identifier& uuid) const;
+  std::shared_ptr<Processor> findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
   // findProcessor based on name
-  std::shared_ptr<Processor> findProcessorByName(const std::string &processorName) const;
+  std::shared_ptr<Processor> findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;
 
   void getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec);
   /**
@@ -236,13 +246,13 @@ class ProcessGroup : public CoreComponent {
   // version
   int config_version_;
   // Process Group Type
-  ProcessGroupType type_;
+  const ProcessGroupType type_;
   // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
-  std::set<std::shared_ptr<Processor> > processors_;
-  std::set<std::shared_ptr<Processor> > failed_processors_;
-  std::set<ProcessGroup *> child_process_groups_;
+  std::set<std::shared_ptr<Processor>> processors_;
+  std::set<std::shared_ptr<Processor>> failed_processors_;
+  std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
   // Connections between the processor inside the group;
-  std::set<std::shared_ptr<Connection> > connections_;
+  std::set<std::shared_ptr<Connection>> connections_;
   // Parent Process Group
   ProcessGroup* parent_process_group_;
   // Yield Period in Milliseconds
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index aafbeb8..767e7fd 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -83,7 +83,7 @@ class YamlConfiguration : public FlowConfiguration {
     }
     try {
       YAML::Node rootYamlNode = YAML::Load(configuration.value());
-      return getYamlRoot(&rootYamlNode);
+      return getYamlRoot(rootYamlNode);
     } catch(...) {
       logger_->log_error("Invalid yaml configuration file");
       throw;
@@ -104,7 +104,7 @@ class YamlConfiguration : public FlowConfiguration {
   std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream) {
     try {
       YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
-      return getYamlRoot(&rootYamlNode);
+      return getYamlRoot(rootYamlNode);
     } catch (const YAML::ParserException &pe) {
       logger_->log_error(pe.what());
       std::rethrow_exception(std::current_exception());
@@ -126,7 +126,7 @@ class YamlConfiguration : public FlowConfiguration {
   std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override {
     try {
       YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
-      return getYamlRoot(&rootYamlNode);
+      return getYamlRoot(rootYamlNode);
     } catch (const YAML::ParserException &pe) {
       logger_->log_error(pe.what());
       std::rethrow_exception(std::current_exception());
@@ -156,8 +156,11 @@ class YamlConfiguration : public FlowConfiguration {
    * @return             the root ProcessGroup node of the flow
    *                       configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode);
+  std::unique_ptr<core::ProcessGroup> getYamlRoot(const YAML::Node& rootYamlNode);
 
+  std::unique_ptr<core::ProcessGroup> createProcessGroup(const YAML::Node& headerNode, bool is_root = false);
+
+  std::unique_ptr<core::ProcessGroup> parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root = false);
   /**
    * Parses a processor from its corresponding YAML config node and adds
    * it to a parent ProcessGroup. The processorNode argument must point
@@ -169,7 +172,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parent        the parent ProcessGroup to which the the created
    *                        Processor should be added
    */
-  void parseProcessorNodeYaml(YAML::Node processorNode, core::ProcessGroup *parent);
+  void parseProcessorNodeYaml(const YAML::Node& processorNode, core::ProcessGroup* parent);
 
   /**
    * Parses a port from its corressponding YAML config node and adds
@@ -182,7 +185,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parent    the parent ProcessGroup for the port
    * @param direction the TransferDirection of the port
    */
-  void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction);
+  void parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
 
   /**
    * Parses the root level YAML node for the flow configuration and
@@ -192,16 +195,16 @@ class YamlConfiguration : public FlowConfiguration {
    * @param rootNode
    * @return
    */
-  std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(YAML::Node rootNode);
+  std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(const YAML::Node& rootNode);
 
   // Process Property YAML
-  void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, std::shared_ptr<core::Processor> processor);
+  void parseProcessorPropertyYaml(const YAML::Node& doc, const YAML::Node& node, std::shared_ptr<core::Processor> processor);
   /**
    * Parse controller services
    * @param controllerServicesNode controller services YAML node.
    * @param parent parent process group.
    */
-  void parseControllerServices(YAML::Node *controllerServicesNode);
+  void parseControllerServices(const YAML::Node& controllerServicesNode);
 
   /**
    * Parses the Connections section of a configuration YAML.
@@ -212,7 +215,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parent the root node of flow configuration to which
    *                 to add the connections that are parsed
    */
-  void parseConnectionYaml(YAML::Node *node, core::ProcessGroup *parent);
+  void parseConnectionYaml(const YAML::Node& node, core::ProcessGroup* parent);
 
   /**
    * Parses the Remote Process Group section of a configuration YAML.
@@ -223,7 +226,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parent the root node of flow configuration to which
    *                 to add the process groups that are parsed
    */
-  void parseRemoteProcessGroupYaml(YAML::Node *node, core::ProcessGroup *parent);
+  void parseRemoteProcessGroupYaml(const YAML::Node& node, core::ProcessGroup* parent);
 
   /**
    * Parses the Provenance Reporting section of a configuration YAML.
@@ -235,7 +238,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parentGroup the root node of flow configuration to which
    *                      to add the provenance reporting config
    */
-  void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup);
+  void parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup);
 
   /**
    * A helper function to parse the Properties Node YAML for a processor.
@@ -243,7 +246,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param propertiesNode the YAML::Node containing the properties
    * @param processor      the Processor to which to add the resulting properties
    */
-  void parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string &component_name, const std::string &yaml_section);
+  void parsePropertiesNodeYaml(const YAML::Node& propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string& component_name, const std::string& yaml_section);
 
   /**
    * A helper function for parsing or generating optional id fields.
@@ -261,7 +264,7 @@ class YamlConfiguration : public FlowConfiguration {
    *                   is optional and defaults to 'id'
    * @return         the parsed or generated UUID string
    */
-  std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = "id");
+  std::string getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField = "id");
 
   /**
    * This is a helper function for getting an optional value, if it exists.
@@ -277,7 +280,7 @@ class YamlConfiguration : public FlowConfiguration {
    *                       the optional field is missing. If not provided,
    *                       a default info message will be generated.
    */
-  YAML::Node getOptionalField(YAML::Node *yamlNode, const std::string &fieldName, const YAML::Node &defaultValue, const std::string &yamlSection = "", const std::string &infoMessage = "");
+  YAML::Node getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection = "", const std::string& infoMessage = "");
 
  protected:
   std::shared_ptr<io::StreamFactory> stream_factory_;
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 232002d..1374205 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -42,7 +42,7 @@ FlowConfiguration::~FlowConfiguration() {
   }
 }
 
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier &uuid) {
   auto processor = minifi::processors::ProcessorUtils::createProcessor(name, name, uuid, stream_factory_);
   if (nullptr == processor) {
     logger_->log_error("No Processor defined for %s", name);
@@ -51,7 +51,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string
   return processor;
 }
 
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(const std::string &name, const std::string &fullname, utils::Identifier & uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(const std::string &name, const std::string &fullname, utils::Identifier &uuid) {
   auto processor = minifi::processors::ProcessorUtils::createProcessor(name, fullname, uuid, stream_factory_);
   if (nullptr == processor) {
     logger_->log_error("No Processor defined for %s", fullname);
@@ -112,12 +112,16 @@ bool FlowConfiguration::persist(const std::string &configuration) {
   return filesystem_->write(*config_path_, configuration);
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, utils::Identifier & uuid, int version) {
-  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(const std::string &name, const utils::Identifier &uuid, int version) {
+  return utils::make_unique<core::ProcessGroup>(core::ROOT_PROCESS_GROUP, name, uuid, version);
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, utils::Identifier & uuid) {
-  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version) {
+  return utils::make_unique<core::ProcessGroup>(core::SIMPLE_PROCESS_GROUP, name, uuid, version);
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid) {
+  return utils::make_unique<core::ProcessGroup>(core::REMOTE_PROCESS_GROUP, name, uuid);
 }
 
 std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, const utils::Identifier& uuid) const {
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index add7865..8183d0b 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -87,17 +87,17 @@ ProcessGroup::~ProcessGroup() {
   for (auto&& connection : connections_) {
     connection->drain(false);
   }
-
-  for (ProcessGroup* childGroup : child_process_groups_) {
-    delete childGroup;
-  }
 }
 
 bool ProcessGroup::isRootProcessGroup() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
   return (type_ == ROOT_PROCESS_GROUP);
 }
 
+bool ProcessGroup::isRemoteProcessGroup() {
+  return (type_ == REMOTE_PROCESS_GROUP);
+}
+
+
 void ProcessGroup::addProcessor(const std::shared_ptr<Processor>& processor) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
@@ -118,23 +118,13 @@ void ProcessGroup::removeProcessor(const std::shared_ptr<Processor>& processor)
   }
 }
 
-void ProcessGroup::addProcessGroup(ProcessGroup *child) {
+void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (child_process_groups_.find(child) == child_process_groups_.end()) {
     // We do not have the same child process group in this process group yet
-    child_process_groups_.insert(child);
     logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_);
-  }
-}
-
-void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-
-  if (child_process_groups_.find(child) != child_process_groups_.end()) {
-    // We do have the same child process group in this process group yet
-    child_process_groups_.erase(child);
-    logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_);
+    child_process_groups_.emplace(std::move(child));
   }
 }
 
@@ -201,7 +191,7 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg
     startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
 
     // Start processing the group
-    for (auto processGroup : child_process_groups_) {
+    for (auto& processGroup : child_process_groups_) {
       processGroup->startProcessing(timeScheduler, eventScheduler, cronScheduler);
     }
   } catch (std::exception &exception) {
@@ -243,7 +233,7 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
       }
     }
 
-    for (ProcessGroup* childGroup : child_process_groups_) {
+    for (auto& childGroup : child_process_groups_) {
       childGroup->stopProcessing(timeScheduler, eventScheduler, cronScheduler, filter);
     }
   } catch (std::exception &exception) {
@@ -255,21 +245,21 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
   }
 }
 
-std::shared_ptr<Processor> ProcessGroup::findProcessorById(const utils::Identifier& uuid) const {
+std::shared_ptr<Processor> ProcessGroup::findProcessorById(const utils::Identifier& uuid, Traverse traverse) const {
   const auto id_matches = [&] (const std::shared_ptr<Processor>& processor) {
-    logger_->log_debug("Current processor is %s", processor->getName());
+    logger_->log_trace("Searching for processor by id, checking processor %s", processor->getName());
     utils::Identifier processorUUID = processor->getUUID();
     return processorUUID && uuid == processorUUID;
   };
-  return findProcessor(id_matches);
+  return findProcessor(id_matches, traverse);
 }
 
-std::shared_ptr<Processor> ProcessGroup::findProcessorByName(const std::string &processorName) const {
+std::shared_ptr<Processor> ProcessGroup::findProcessorByName(const std::string &processorName, Traverse traverse) const {
   const auto name_matches = [&] (const std::shared_ptr<Processor>& processor) {
-    logger_->log_debug("Current processor is %s", processor->getName());
+    logger_->log_trace("Searching for processor by name, checking processor %s", processor->getName());
     return processor->getName() == processorName;
   };
-  return findProcessor(name_matches);
+  return findProcessor(name_matches, traverse);
 }
 
 void ProcessGroup::addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node) {
@@ -287,59 +277,58 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
 
 void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
-  std::shared_ptr<Processor> ret = NULL;
 
-  for (auto processor : processors_) {
-    logger_->log_debug("Current processor is %s", processor->getName());
+  for (auto& processor : processors_) {
+    logger_->log_trace("Collecting all processors, current processor is %s", processor->getName());
     processor_vec.push_back(processor);
   }
-  for (auto processGroup : child_process_groups_) {
+  for (auto& processGroup : child_process_groups_) {
     processGroup->getAllProcessors(processor_vec);
   }
 }
 
 void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
-  for (auto processor : processors_) {
+  for (auto& processor : processors_) {
     if (processor->getName() == processorName) {
       processor->setProperty(propertyName, propertyValue);
     }
   }
-  for (auto processGroup : child_process_groups_) {
+  for (auto& processGroup : child_process_groups_) {
     processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
   }
 }
 
 void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
-  for (auto connection : connections_) {
+  for (auto& connection : connections_) {
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
-  for (auto processGroup : child_process_groups_) {
+  for (auto& processGroup : child_process_groups_) {
     processGroup->getConnections(connectionMap);
   }
 }
 
 void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) {
-  for (auto connection : connections_) {
+  for (auto& connection : connections_) {
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
-  for (auto processGroup : child_process_groups_) {
+  for (auto& processGroup : child_process_groups_) {
     processGroup->getConnections(connectionMap);
   }
 }
 
 void ProcessGroup::getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const {
-  for (auto connection : connections_) {
+  for (auto& connection : connections_) {
     containers[connection->getUUIDStr()] = connection;
     containers[connection->getName()] = connection;
   }
-  for (auto processor : processors_) {
+  for (auto& processor : processors_) {
     // processors can also own FlowFiles
     containers[processor->getUUIDStr()] = processor;
   }
-  for (auto processGroup : child_process_groups_) {
+  for (auto& processGroup : child_process_groups_) {
     processGroup->getFlowFileContainers(containers);
   }
 }
@@ -351,11 +340,19 @@ void ProcessGroup::addConnection(const std::shared_ptr<Connection>& connection)
     // We do not have the same connection in this process group yet
     connections_.insert(connection);
     logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
-    std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID());
+    // only allow connections between processors of the same process group
+    std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID(), Traverse::ExcludeChildren);
     if (source) {
       source->addConnection(connection);
+    } else {
+      logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
+                         connection->getSourceUUID().to_string(), connection->getName(), connection->getUUIDStr());
+    }
+    std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID(), Traverse::ExcludeChildren);
+    if (!destination) {
+      logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+                         connection->getDestinationUUID().to_string(), connection->getName(), connection->getUUIDStr());
     }
-    std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID());
     if (destination && destination != source) {
       destination->addConnection(connection);
     }
@@ -383,18 +380,18 @@ void ProcessGroup::drainConnections() {
     connection->drain(false);
   }
 
-  for (ProcessGroup* childGroup : child_process_groups_) {
+  for (auto& childGroup : child_process_groups_) {
     childGroup->drainConnections();
   }
 }
 
 std::size_t ProcessGroup::getTotalFlowFileCount() const {
   std::size_t sum = 0;
-  for (auto& conn : connections_) {
+  for (const auto& conn : connections_) {
     sum += gsl::narrow<std::size_t>(conn->getQueueSize());
   }
 
-  for (const ProcessGroup* childGroup : child_process_groups_) {
+  for (const auto& childGroup : child_process_groups_) {
     sum += childGroup->getTotalFlowFileCount();
   }
   return sum;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 67d094b..38afd40 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -48,36 +48,46 @@ YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& re
       stream_factory_(stream_factory),
       logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-  utils::Identifier uuid;
-  int version = 0;
-
-  yaml::checkRequiredField(&rootFlowNode, "name", logger_,
-  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-  std::string flowName = rootFlowNode["name"].as<std::string>();
-
-  auto class_loader_functions = rootFlowNode["Class Loader Functions"];
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
+  auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+  auto class_loader_functions = flowControllerNode["Class Loader Functions"];
   if (class_loader_functions && class_loader_functions.IsSequence()) {
     for (auto function : class_loader_functions) {
       registerResource(function.as<std::string>());
     }
   }
 
-  std::string id = getOrGenerateId(&rootFlowNode);
-  uuid = id;
+  auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
+  this->name_ = rootGroup->getName();
+  return rootGroup;
+}
 
-  if (rootFlowNode["version"]) {
-    version = rootFlowNode["version"].as<int>();
-  }
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
+  int version = 0;
 
-  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
-  std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+  yaml::checkRequiredField(&yamlNode, "name", logger_,
+  CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+  std::string flowName = yamlNode["name"].as<std::string>();
 
-  this->name_ = flowName;
+  utils::Identifier uuid;
+  // assignment throws on invalid uuid
+  uuid = getOrGenerateId(yamlNode);
 
-  if (rootFlowNode["onschedule retry interval"]) {
+  if (yamlNode["version"]) {
+    version = yamlNode["version"].as<int>();
+  }
+
+  logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
+  std::unique_ptr<core::ProcessGroup> group;
+  if (is_root) {
+    group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+  } else {
+    group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
+  }
+
+  if (yamlNode["onschedule retry interval"]) {
     int64_t onScheduleRetryPeriodValue = -1;
-    std::string onScheduleRetryPeriod = rootFlowNode["onschedule retry interval"].as<std::string>();
+    std::string onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
     logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
 
     core::TimeUnit unit;
@@ -93,27 +103,43 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml
   return group;
 }
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
-    YAML::Node rootYaml = *rootYamlNode;
-    YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
-    YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
-    YAML::Node connectionsNode = rootYaml[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
-    YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
-    YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-
-    if (!remoteProcessingGroupsNode) {
-      remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
+  auto group = createProcessGroup(headerNode, is_root);
+  YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
+  YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+  YAML::Node remoteProcessingGroupsNode = [&] {
+    // assignment is not supported on invalid Yaml nodes
+    YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+    if (candidate) {
+      return candidate;
     }
+    return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+  }();
+  YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
+
+  parseProcessorNodeYaml(processorsNode, group.get());
+  parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
+  // parse connections last to give feedback if the source and/or destination
+  // is not in the same process group
+  parseConnectionYaml(connectionsNode, group.get());
+
+  if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
+    for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
+      YAML::Node childProcessGroupNode = it->as<YAML::Node>();
+      group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
+    }
+  }
+  return group;
+}
 
-    YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
+    YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+    YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
 
-    parseControllerServices(&controllerServiceNode);
+    parseControllerServices(controllerServiceNode);
     // Create the root process group
-    std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(flowControllerNode);
-    parseProcessorNodeYaml(processorsNode, root.get());
-    parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root.get());
-    parseConnectionYaml(&connectionsNode, root.get());
-    parseProvenanceReportingYaml(&provenanceReportNode, root.get());
+    std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
+    parseProvenanceReportingYaml(provenanceReportNode, root.get());
 
     // set the controller services into the root group.
     for (const auto& controller_service : controller_services_->getAllControllerServices()) {
@@ -124,7 +150,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *r
     return root;
   }
 
-void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
   int64_t schedulingPeriod = -1;
   int64_t penalizationPeriod = -1;
   int64_t yieldPeriod = -1;
@@ -137,170 +163,169 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
     return;
   }
 
-  if (processorsNode) {
-    if (processorsNode.IsSequence()) {
-      // Evaluate sequence of processors
-      for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
-        core::ProcessorConfig procCfg;
-        YAML::Node procNode = iter->as<YAML::Node>();
-
-        yaml::checkRequiredField(&procNode, "name", logger_,
-        CONFIG_YAML_PROCESSORS_KEY);
-        procCfg.name = procNode["name"].as<std::string>();
-        procCfg.id = getOrGenerateId(&procNode);
-
-        auto lib_location = procNode["Library Location"];
-        auto lib_function = procNode["Library Function"];
-        if (lib_location && lib_function) {
-          auto lib_location_str = lib_location.as<std::string>();
-          auto lib_function_str = lib_function.as<std::string>();
-          registerResource(lib_location_str, lib_function_str);
-        }
-
-        uuid = procCfg.id.c_str();
-        logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
-        yaml::checkRequiredField(&procNode, "class", logger_, CONFIG_YAML_PROCESSORS_KEY);
-        procCfg.javaClass = procNode["class"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
-
-        // Determine the processor name only from the Java class
-        auto lastOfIdx = procCfg.javaClass.find_last_of(".");
-        if (lastOfIdx != std::string::npos) {
-          lastOfIdx++;  // if a value is found, increment to move beyond the .
-          std::string processorName = procCfg.javaClass.substr(lastOfIdx);
-          processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
-        } else {
-          // Allow unqualified class names for core processors
-          processor = this->createProcessor(procCfg.javaClass, uuid);
-        }
+  if (!processorsNode) {
+    throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+  }
+  if (!processorsNode.IsSequence()) {
+    throw std::invalid_argument(
+        "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+  }
+  // Evaluate sequence of processors
+  for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+    core::ProcessorConfig procCfg;
+    YAML::Node procNode = iter->as<YAML::Node>();
+
+    yaml::checkRequiredField(&procNode, "name", logger_,
+    CONFIG_YAML_PROCESSORS_KEY);
+    procCfg.name = procNode["name"].as<std::string>();
+    procCfg.id = getOrGenerateId(procNode);
+
+    auto lib_location = procNode["Library Location"];
+    auto lib_function = procNode["Library Function"];
+    if (lib_location && lib_function) {
+      auto lib_location_str = lib_location.as<std::string>();
+      auto lib_function_str = lib_function.as<std::string>();
+      registerResource(lib_location_str, lib_function_str);
+    }
 
-        if (!processor) {
-          logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
-          throw std::invalid_argument("Could not create processor " + procCfg.name);
-        }
+    uuid = procCfg.id.c_str();
+    logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
+    yaml::checkRequiredField(&procNode, "class", logger_, CONFIG_YAML_PROCESSORS_KEY);
+    procCfg.javaClass = procNode["class"].as<std::string>();
+    logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
+
+    // Determine the processor name only from the Java class
+    auto lastOfIdx = procCfg.javaClass.find_last_of(".");
+    if (lastOfIdx != std::string::npos) {
+      lastOfIdx++;  // if a value is found, increment to move beyond the .
+      std::string processorName = procCfg.javaClass.substr(lastOfIdx);
+      processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
+    } else {
+      // Allow unqualified class names for core processors
+      processor = this->createProcessor(procCfg.javaClass, uuid);
+    }
 
-        processor->setName(procCfg.name);
+    if (!processor) {
+      logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
+      throw std::invalid_argument("Could not create processor " + procCfg.name);
+    }
 
-        processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
+    processor->setName(procCfg.name);
 
-        auto strategyNode = getOptionalField(&procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
-        CONFIG_YAML_PROCESSORS_KEY);
-        procCfg.schedulingStrategy = strategyNode.as<std::string>();
-        logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
+    processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
 
-        auto periodNode = getOptionalField(&procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
-        CONFIG_YAML_PROCESSORS_KEY);
+    auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
+    CONFIG_YAML_PROCESSORS_KEY);
+    procCfg.schedulingStrategy = strategyNode.as<std::string>();
+    logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
-        procCfg.schedulingPeriod = periodNode.as<std::string>();
-        logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
+    auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
+    CONFIG_YAML_PROCESSORS_KEY);
 
-        if (procNode["max concurrent tasks"]) {
-          procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
-          logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
-        }
+    procCfg.schedulingPeriod = periodNode.as<std::string>();
+    logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
-        if (procNode["penalization period"]) {
-          procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
-          logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
-        }
+    if (procNode["max concurrent tasks"]) {
+      procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+      logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
+    }
 
-        if (procNode["yield period"]) {
-          procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
-          logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
-        }
+    if (procNode["penalization period"]) {
+      procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+      logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
+    }
 
-        if (procNode["run duration nanos"]) {
-          procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
-          logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
-        }
+    if (procNode["yield period"]) {
+      procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+      logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
+    }
 
-        // handle auto-terminated relationships
-        if (procNode["auto-terminated relationships list"]) {
-          YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
-          std::vector<std::string> rawAutoTerminatedRelationshipValues;
-          if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
-            for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
-              std::string autoTerminatedRel = relIter->as<std::string>();
-              rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
-            }
-          }
-          procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
-        }
+    if (procNode["run duration nanos"]) {
+      procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
+      logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
+    }
 
-        // handle processor properties
-        if (procNode["Properties"]) {
-          YAML::Node propertiesNode = procNode["Properties"];
-          parsePropertiesNodeYaml(&propertiesNode, processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+    // handle auto-terminated relationships
+    if (procNode["auto-terminated relationships list"]) {
+      YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+      std::vector<std::string> rawAutoTerminatedRelationshipValues;
+      if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
+        for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
+          std::string autoTerminatedRel = relIter->as<std::string>();
+          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
         }
+      }
+      procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
+    }
 
-        // Take care of scheduling
+    // handle processor properties
+    if (procNode["Properties"]) {
+      YAML::Node propertiesNode = procNode["Properties"];
+      parsePropertiesNodeYaml(propertiesNode, processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+    }
 
-        core::TimeUnit unit;
+    // Take care of scheduling
 
-        if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-          if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-            logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", schedulingPeriod);
-            processor->setSchedulingPeriodNano(schedulingPeriod);
-          }
-        } else {
-          processor->setCronPeriod(procCfg.schedulingPeriod);
-        }
+    core::TimeUnit unit;
 
-        if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
-          logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
-          processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
-        }
+    if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+      if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+        logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", schedulingPeriod);
+        processor->setSchedulingPeriodNano(schedulingPeriod);
+      }
+    } else {
+      processor->setCronPeriod(procCfg.schedulingPeriod);
+    }
 
-        if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
-          logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yieldPeriod);
-          processor->setYieldPeriodMsec(yieldPeriod);
-        }
+    if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+      logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
+      processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
+    }
 
-        // Default to running
-        processor->setScheduledState(core::RUNNING);
-
-        if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
-          processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-        } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-          processor->setSchedulingStrategy(core::EVENT_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-        } else {
-          processor->setSchedulingStrategy(core::CRON_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
-        }
+    if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+      logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yieldPeriod);
+      processor->setYieldPeriodMsec(yieldPeriod);
+    }
 
-        int32_t maxConcurrentTasks;
-        if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
-          logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
-          processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
-        }
+    // Default to running
+    processor->setScheduledState(core::RUNNING);
 
-        if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
-          logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
-          processor->setRunDurationNano((uint64_t) runDurationNanos);
-        }
+    if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
+      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+    } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+      processor->setSchedulingStrategy(core::EVENT_DRIVEN);
+      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+    } else {
+      processor->setSchedulingStrategy(core::CRON_DRIVEN);
+      logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+    }
 
-        std::set<core::Relationship> autoTerminatedRelationships;
-        for (auto &&relString : procCfg.autoTerminatedRelationships) {
-          core::Relationship relationship(relString, "");
-          logger_->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString);
-          autoTerminatedRelationships.insert(relationship);
-        }
+    int32_t maxConcurrentTasks;
+    if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+      logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+      processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
+    }
 
-        processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+    if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+      logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+      processor->setRunDurationNano((uint64_t) runDurationNanos);
+    }
 
-        parentGroup->addProcessor(processor);
-      }
-    } else {
-      throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+    std::set<core::Relationship> autoTerminatedRelationships;
+    for (auto &&relString : procCfg.autoTerminatedRelationships) {
+      core::Relationship relationship(relString, "");
+      logger_->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString);
+      autoTerminatedRelationships.insert(relationship);
     }
-  } else {
-    throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+
+    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+    parentGroup->addProcessor(processor);
   }
 }
 
-void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
   utils::Identifier uuid;
   std::string id;
 
@@ -309,125 +334,123 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
     return;
   }
 
-  if (rpgNode) {
-    if (rpgNode->IsSequence()) {
-      for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
-        YAML::Node currRpgNode = iter->as<YAML::Node>();
+  if (!rpgNode || !rpgNode.IsSequence()) {
+    return;
+  }
+  for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
+    YAML::Node currRpgNode = iter->as<YAML::Node>();
 
-        yaml::checkRequiredField(&currRpgNode, "name", logger_,
-        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-        auto name = currRpgNode["name"].as<std::string>();
-        id = getOrGenerateId(&currRpgNode);
+    yaml::checkRequiredField(&currRpgNode, "name", logger_,
+    CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+    auto name = currRpgNode["name"].as<std::string>();
+    id = getOrGenerateId(currRpgNode);
 
-        logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+    logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
 
-        auto urlNode = getOptionalField(&currRpgNode, "url", YAML::Node(""),
-        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+    auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
+    CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
 
-        std::string url = urlNode.as<std::string>();
-        logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
+    std::string url = urlNode.as<std::string>();
+    logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
 
-        core::ProcessGroup *group = NULL;
-        core::TimeUnit unit;
-        int64_t timeoutValue = -1;
-        int64_t yieldPeriodValue = -1;
-        uuid = id;
-        group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
-        group->setParent(parentGroup);
-        parentGroup->addProcessGroup(group);
+    core::TimeUnit unit;
+    int64_t timeoutValue = -1;
+    int64_t yieldPeriodValue = -1;
+    uuid = id;
+    auto group = this->createRemoteProcessGroup(name, uuid);
+    group->setParent(parentGroup);
+
+    if (currRpgNode["yield period"]) {
+      std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
+      logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+
+      if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+        logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yieldPeriodValue);
+        group->setYieldPeriodMsec(yieldPeriodValue);
+      }
+    }
 
-        if (currRpgNode["yield period"]) {
-          std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+    if (currRpgNode["timeout"]) {
+      std::string timeout = currRpgNode["timeout"].as<std::string>();
+      logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
 
-          if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
-            logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yieldPeriodValue);
-            group->setYieldPeriodMsec(yieldPeriodValue);
-          }
-        }
+      if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+        logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeoutValue);
+        group->setTimeOut(timeoutValue);
+      }
+    }
 
-        if (currRpgNode["timeout"]) {
-          std::string timeout = currRpgNode["timeout"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+    if (currRpgNode["local network interface"]) {
+      std::string interface = currRpgNode["local network interface"].as<std::string>();
+      logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
+      group->setInterface(interface);
+    }
 
-          if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
-            logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeoutValue);
-            group->setTimeOut(timeoutValue);
+    if (currRpgNode["transport protocol"]) {
+      std::string transport_protocol = currRpgNode["transport protocol"].as<std::string>();
+      logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
+      if (transport_protocol == "HTTP") {
+        group->setTransportProtocol(transport_protocol);
+        if (currRpgNode["proxy host"]) {
+          std::string http_proxy_host = currRpgNode["proxy host"].as<std::string>();
+          logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
+          group->setHttpProxyHost(http_proxy_host);
+          if (currRpgNode["proxy user"]) {
+            std::string http_proxy_username = currRpgNode["proxy user"].as<std::string>();
+            logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
+            group->setHttpProxyUserName(http_proxy_username);
           }
-        }
-
-        if (currRpgNode["local network interface"]) {
-          std::string interface = currRpgNode["local network interface"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
-          group->setInterface(interface);
-        }
-
-        if (currRpgNode["transport protocol"]) {
-          std::string transport_protocol = currRpgNode["transport protocol"].as<std::string>();
-          logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
-          if (transport_protocol == "HTTP") {
-            group->setTransportProtocol(transport_protocol);
-            if (currRpgNode["proxy host"]) {
-              std::string http_proxy_host = currRpgNode["proxy host"].as<std::string>();
-              logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
-              group->setHttpProxyHost(http_proxy_host);
-              if (currRpgNode["proxy user"]) {
-                std::string http_proxy_username = currRpgNode["proxy user"].as<std::string>();
-                logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
-                group->setHttpProxyUserName(http_proxy_username);
-              }
-              if (currRpgNode["proxy password"]) {
-                std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>();
-                logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
-                group->setHttpProxyPassWord(http_proxy_password);
-              }
-              if (currRpgNode["proxy port"]) {
-                std::string http_proxy_port = currRpgNode["proxy port"].as<std::string>();
-                int32_t port;
-                if (core::Property::StringToInt(http_proxy_port, port)) {
-                  logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
-                  group->setHttpProxyPort(port);
-                }
-              }
+          if (currRpgNode["proxy password"]) {
+            std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>();
+            logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
+            group->setHttpProxyPassWord(http_proxy_password);
+          }
+          if (currRpgNode["proxy port"]) {
+            std::string http_proxy_port = currRpgNode["proxy port"].as<std::string>();
+            int32_t port;
+            if (core::Property::StringToInt(http_proxy_port, port)) {
+              logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
+              group->setHttpProxyPort(port);
             }
-          } else if (transport_protocol == "RAW") {
-            group->setTransportProtocol(transport_protocol);
-          } else {
-            std::stringstream stream;
-            stream << "Invalid transport protocol " << transport_protocol;
-            throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
           }
         }
+      } else if (transport_protocol == "RAW") {
+        group->setTransportProtocol(transport_protocol);
+      } else {
+        std::stringstream stream;
+        stream << "Invalid transport protocol " << transport_protocol;
+        throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
+      }
+    }
 
-        group->setTransmitting(true);
-        group->setURL(url);
+    group->setTransmitting(true);
+    group->setURL(url);
 
-        yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
-        CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-        YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
-        if (inputPorts && inputPorts.IsSequence()) {
-          for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-            YAML::Node currPort = portIter->as<YAML::Node>();
+    yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
+    CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+    YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
+    if (inputPorts && inputPorts.IsSequence()) {
+      for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+        YAML::Node currPort = portIter->as<YAML::Node>();
 
-            this->parsePortYaml(&currPort, group, sitetosite::SEND);
-          }  // for node
-        }
-        YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
-        if (outputPorts && outputPorts.IsSequence()) {
-          for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
-            logger_->log_debug("Got a current port, iterating...");
+        this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
+      }  // for node
+    }
+    YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
+    if (outputPorts && outputPorts.IsSequence()) {
+      for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
+        logger_->log_debug("Got a current port, iterating...");
 
-            YAML::Node currPort = portIter->as<YAML::Node>();
+        YAML::Node currPort = portIter->as<YAML::Node>();
 
-            this->parsePortYaml(&currPort, group, sitetosite::RECEIVE);
-          }  // for node
-        }
-      }
+        this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
+      }  // for node
     }
+    parentGroup->addProcessGroup(std::move(group));
   }
 }
 
-void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
   utils::Identifier port_uuid;
   int64_t schedulingPeriod = -1;
 
@@ -436,7 +459,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
     return;
   }
 
-  if (!reportNode || !reportNode->IsDefined() || reportNode->IsNull()) {
+  if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
     logger_->log_debug("no provenance reporting task specified");
     return;
   }
@@ -445,7 +468,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
   processor = createProvenanceReportTask();
   std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
 
-  YAML::Node node = reportNode->as<YAML::Node>();
+  YAML::Node node = reportNode.as<YAML::Node>();
 
   yaml::checkRequiredField(&node, "scheduling strategy", logger_,
   CONFIG_YAML_PROVENANCE_REPORT_KEY);
@@ -506,78 +529,77 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
   processor->setScheduledState(core::RUNNING);
 }
 
-void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) {
-  if (!IsNullOrEmpty(controllerServicesNode)) {
-    if (controllerServicesNode->IsSequence()) {
-      for (auto iter : *controllerServicesNode) {
-        YAML::Node controllerServiceNode = iter.as<YAML::Node>();
-        try {
-          yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
-          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-          yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
-          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-          std::string type = "";
-
-          try {
-            yaml::checkRequiredField(&controllerServiceNode, "class", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-            type = controllerServiceNode["class"].as<std::string>();
-          } catch (const std::invalid_argument &) {
-            yaml::checkRequiredField(&controllerServiceNode, "type", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-            type = controllerServiceNode["type"].as<std::string>();
-            logger_->log_debug("Using type %s for controller service node", type);
-          }
-          std::string fullType = type;
-          auto lastOfIdx = type.find_last_of(".");
-          if (lastOfIdx != std::string::npos) {
-            lastOfIdx++;  // if a value is found, increment to move beyond the .
-            type = type.substr(lastOfIdx);
-          }
+void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
+  if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
+    return;
+  }
+  for (const auto& iter : controllerServicesNode) {
+    YAML::Node controllerServiceNode = iter.as<YAML::Node>();
+    try {
+      yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
+      CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+      yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
+      CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+      std::string type = "";
 
-          auto name = controllerServiceNode["name"].as<std::string>();
-          auto id = controllerServiceNode["id"].as<std::string>();
-
-          utils::Identifier uuid;
-          uuid = id;
-          auto controller_service_node = createControllerService(type, fullType, name, uuid);
-          if (nullptr != controller_service_node) {
-            logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
-            controller_service_node->initialize();
-            YAML::Node propertiesNode = controllerServiceNode["Properties"];
-            // we should propogate properties to the node and to the implementation
-            parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node), name,
-            CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-            if (controller_service_node->getControllerServiceImplementation() != nullptr) {
-              parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()), name,
-              CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-            }
-          } else {
-            logger_->log_debug("Could not locate %s", type);
-          }
-          controller_services_->put(id, controller_service_node);
-          controller_services_->put(name, controller_service_node);
-        } catch (YAML::InvalidNode &) {
-          throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
+      try {
+        yaml::checkRequiredField(&controllerServiceNode, "class", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+        type = controllerServiceNode["class"].as<std::string>();
+      } catch (const std::invalid_argument &) {
+        yaml::checkRequiredField(&controllerServiceNode, "type", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+        type = controllerServiceNode["type"].as<std::string>();
+        logger_->log_debug("Using type %s for controller service node", type);
+      }
+      std::string fullType = type;
+      auto lastOfIdx = type.find_last_of(".");
+      if (lastOfIdx != std::string::npos) {
+        lastOfIdx++;  // if a value is found, increment to move beyond the .
+        type = type.substr(lastOfIdx);
+      }
+
+      auto name = controllerServiceNode["name"].as<std::string>();
+      auto id = controllerServiceNode["id"].as<std::string>();
+
+      utils::Identifier uuid;
+      uuid = id;
+      auto controller_service_node = createControllerService(type, fullType, name, uuid);
+      if (nullptr != controller_service_node) {
+        logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
+        controller_service_node->initialize();
+        YAML::Node propertiesNode = controllerServiceNode["Properties"];
+        // we should propagate properties to the node and to the implementation
+        parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node), name,
+        CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+        if (controller_service_node->getControllerServiceImplementation() != nullptr) {
+          parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()), name,
+          CONFIG_YAML_CONTROLLER_SERVICES_KEY);
         }
+      } else {
+        logger_->log_debug("Could not locate %s", type);
       }
+      controller_services_->put(id, controller_service_node);
+      controller_services_->put(name, controller_service_node);
+    } catch (YAML::InvalidNode &) {
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
     }
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, core::ProcessGroup* parent) {
+void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
   }
-  if (!connectionsNode || !connectionsNode->IsSequence()) {
+  if (!connectionsNode || !connectionsNode.IsSequence()) {
     return;
   }
 
-  for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+  for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
     YAML::Node connectionNode = iter->as<YAML::Node>();
     std::shared_ptr<minifi::Connection> connection = nullptr;
 
     // Configure basic connection
-    std::string id = getOrGenerateId(&connectionNode);
+    std::string id = getOrGenerateId(connectionNode);
 
     // Default name to be same as ID
     // If name is specified in configuration, use the value
@@ -604,7 +626,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, core::P
   }
 }
 
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
   utils::Identifier uuid;
   std::shared_ptr<core::Processor> processor = NULL;
   std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
@@ -614,7 +636,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
     return;
   }
 
-  YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+  YAML::Node inputPortsObj = portNode.as<YAML::Node>();
 
   // Check for required fields
   yaml::checkRequiredField(&inputPortsObj, "name", logger_,
@@ -648,9 +670,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
   // else defaults to RAW
 
   // handle port properties
-  YAML::Node nodeVal = portNode->as<YAML::Node>();
+  YAML::Node nodeVal = portNode.as<YAML::Node>();
   YAML::Node propertiesNode = nodeVal["Properties"];
-  parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor), nameStr,
+  parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor), nameStr,
   CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
 
   // add processor to parent
@@ -669,7 +691,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
 }
 
 void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, std::shared_ptr<core::ConfigurableComponent> processor) {
-  for (auto iter : propertyValueNode) {
+  for (const auto& iter : propertyValueNode) {
     if (iter.IsDefined()) {
       YAML::Node nodeVal = iter.as<YAML::Node>();
       YAML::Node propertiesNode = nodeVal["value"];
@@ -773,11 +795,11 @@ void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName
   }
 }
 
-void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string &component_name,
-    const std::string &yaml_section) {
+void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string& component_name,
+    const std::string& yaml_section) {
   // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
   logger_->log_trace("Entered %s", component_name);
-  for (const auto propertyElem : *propertiesNode) {
+  for (const auto& propertyElem : propertiesNode) {
     const std::string propertyName = propertyElem.first.as<std::string>();
     const YAML::Node propertyValueNode = propertyElem.second;
     parsePropertyNodeElement(propertyName, propertyValueNode, processor);
@@ -869,9 +891,9 @@ void YamlConfiguration::raiseComponentError(const std::string &component_name, c
   throw std::invalid_argument(err_msg);
 }
 
-std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::string &idField) {
+std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
   std::string id;
-  YAML::Node node = yamlNode->as<YAML::Node>();
+  YAML::Node node = yamlNode.as<YAML::Node>();
 
   if (node[idField]) {
     if (YAML::NodeType::Scalar == node[idField].Type()) {
@@ -887,16 +909,16 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::
   return id;
 }
 
-YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode, const std::string &fieldName, const YAML::Node &defaultValue, const std::string &yamlSection,
-                                               const std::string &providedInfoMessage) {
+YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
+                                               const std::string& providedInfoMessage) {
   std::string infoMessage = providedInfoMessage;
-  auto result = yamlNode->as<YAML::Node>()[fieldName];
+  auto result = yamlNode.as<YAML::Node>()[fieldName];
   if (!result) {
     if (infoMessage.empty()) {
       // Build a helpful info message for the user to inform them that a default is being used
       infoMessage =
-          yamlNode->as<YAML::Node>()["name"] ?
-              "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "'" :
+          yamlNode.as<YAML::Node>()["name"] ?
+              "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
               "Using default value for optional field '" + fieldName + "' ";
       if (!yamlSection.empty()) {
         infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index cd26117..2d2b265 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -93,14 +93,14 @@ utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
   checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
   const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
   const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
-  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+  if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the source name is a remote port id, so use that as the source id
     logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
     return srcUUID.value();
   }
   // lastly, look the processor up by name
-  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
-  if (nullptr != srcProcessor) {
+  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName, ProcessGroup::Traverse::ExcludeChildren);
+  if (srcProcessor) {
     logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
     return srcProcessor->getUUID();
   }
@@ -126,14 +126,14 @@ utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
   checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
   std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
   const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
-  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+  if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the destination name is a remote port id, so use that as the dest id
     logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
     return destUUID.value();
   }
   // look the processor up by name
-  auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
-  if (NULL != destProcessor) {
+  auto destProcessor = parent_->findProcessorByName(connectionDestProcName, ProcessGroup::Traverse::ExcludeChildren);
+  if (destProcessor) {
     logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
     return destProcessor->getUUID();
   }