You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/22 17:01:30 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #948: MINIFICPP-1325 - Refactor and test YAML connection parsing

fgerlits commented on a change in pull request #948:
URL: https://github.com/apache/nifi-minifi-cpp/pull/948#discussion_r580265526



##########
File path: extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
##########
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml.", "[YamlConfiguration]") {

Review comment:
       minor: there should be no `.` at the end of TEST_CASE names

##########
File path: extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
##########
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml.", "[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    std::string serialized_yaml;
+    std::vector<std::string> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { "success" };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { "success", "failure", "something_else" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = connection->getRelationships();
+    REQUIRE(expectations.size() == relationships.size());
+    for (const auto& expected_relationship_name : expectations) {
+      const auto relationship_name_matches = [&] (const core::Relationship& relationship) { return expected_relationship_name == relationship.getName(); };
+      const std::size_t relationship_count = std::count_if(relationships.cbegin(), relationships.cend(), relationship_name_matches);
+      REQUIRE(1 == relationship_count);
+    }

Review comment:
       If you change the type of `expectations` to `set<Relationship>`, then you can simply write:
   ```suggestion
       REQUIRE(expectations == relationships);
   ```

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);

Review comment:
       In both `getWorkQueueSizeFromYaml` and `getWorkQueueDataSizeFromYaml`, we used to log that the size is 0 if `StringToInt` failed; now we only log if the conversion is successful.

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);
+    }
+  }
+  return 0;
+}
+
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  logger_->log_error(error_msg.c_str());
+  throw std::invalid_argument(error_msg);
+}
+
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    if (destUUID) {

Review comment:
       Here, too, we're silently continuing instead of throwing an exception if `destUUID` is nullopt.

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);

Review comment:
       The format specifier should be `PRIu64` instead of `u`, since the argument is a `uint64_t` (at line 73, too).

##########
File path: extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
##########
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml.", "[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    std::string serialized_yaml;
+    std::vector<std::string> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { "success" };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { "success", "failure", "something_else" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = connection->getRelationships();
+    REQUIRE(expectations.size() == relationships.size());
+    for (const auto& expected_relationship_name : expectations) {
+      const auto relationship_name_matches = [&] (const core::Relationship& relationship) { return expected_relationship_name == relationship.getName(); };
+      const std::size_t relationship_count = std::count_if(relationships.cbegin(), relationships.cend(), relationship_name_matches);
+      REQUIRE(1 == relationship_count);
+    }
+  }
+  SECTION("Queue size limits are read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "max work queue size: 231\n"
+        "max work queue data size: 12 MB\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
+    REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+  }
+  SECTION("Source and destination names and uuids are read") {
+    const utils::Identifier expected_source_id = utils::generateUUID();
+    const utils::Identifier expected_destination_id = utils::generateUUID();
+    std::string serialized_yaml;
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_1", expected_source_id)));
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_2", expected_destination_id)));
+    SECTION("Directly from configuration") {
+      serialized_yaml = std::string {
+          "source id: " + expected_source_id.to_string() + "\n"
+          "destination id: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Using UUID as remote port id") {

Review comment:
       remote port id?

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);
+    }
+  }
+  return 0;
+}
+
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {

Review comment:
       Here, too, we used to throw if the parsing failed; now we silently continue.

##########
File path: libminifi/src/core/yaml/YamlConfiguration.cpp
##########
@@ -80,6 +82,37 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
   return group.release();
 }
 
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
+    YAML::Node rootYaml = *rootYamlNode;
+    YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+    YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
+    YAML::Node connectionsNode = rootYaml[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+    YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+    YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+
+    if (!remoteProcessingGroupsNode) {
+      remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+    }
+
+    YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+
+    parseControllerServices(&controllerServiceNode);
+    // Create the root process group
+    core::ProcessGroup *root = parseRootProcessGroupYaml(flowControllerNode);

Review comment:
       Again, old code, but...
   `root` could be wrapped in a `unique_ptr` here [or, even better, `parseRootProcessGroupYaml()` could return a `unique_ptr`], so that `root` is guaranteed to be deleted, even if an exception is thrown somewhere in lines 102-111.

##########
File path: libminifi/src/core/yaml/YamlConfiguration.cpp
##########
@@ -80,6 +82,37 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
   return group.release();
 }
 
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
+    YAML::Node rootYaml = *rootYamlNode;
+    YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+    YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
+    YAML::Node connectionsNode = rootYaml[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+    YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+    YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+
+    if (!remoteProcessingGroupsNode) {
+      remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+    }
+
+    YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+
+    parseControllerServices(&controllerServiceNode);
+    // Create the root process group
+    core::ProcessGroup *root = parseRootProcessGroupYaml(flowControllerNode);
+    parseProcessorNodeYaml(processorsNode, root);
+    parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root);
+    parseConnectionYaml(&connectionsNode, root);
+    parseProvenanceReportingYaml(&provenanceReportNode, root);
+
+    // set the controller services into the root group.
+    for (auto controller_service : controller_services_->getAllControllerServices()) {

Review comment:
       I know this code is not new, it was only moved, but this should be `const auto&`.

##########
File path: libminifi/src/core/yaml/YamlConfiguration.cpp
##########
@@ -526,172 +526,155 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
   }
+  if (!connectionsNode || !connectionsNode->IsSequence()) {
+    return;
+  }
 
-  if (connectionsNode) {
-    if (connectionsNode->IsSequence()) {
-      for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
-        YAML::Node connectionNode = iter->as<YAML::Node>();
-        std::shared_ptr<minifi::Connection> connection = nullptr;
+  for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+    YAML::Node connectionNode = iter->as<YAML::Node>();
+    std::shared_ptr<minifi::Connection> connection = nullptr;
 
-        // Configure basic connection
-        utils::Identifier uuid;
-        std::string id = getOrGenerateId(&connectionNode);
+    // Configure basic connection
+    std::string id = getOrGenerateId(&connectionNode);
 
-        // Default name to be same as ID
-        std::string name = id;
+    // Default name to be same as ID
+    // If name is specified in configuration, use the value
+    std::string name = connectionNode["name"].as<std::string>(id);
 
-        // If name is specified in configuration, use the value
-        if (connectionNode["name"]) {
-          name = connectionNode["name"].as<std::string>();
-        }
+    const utils::optional<utils::Identifier> uuid = utils::Identifier::parse(id);
+    if (!uuid) {
+      logger_->log_debug("Incorrect connection UUID format.");
+      return;
+    }
 
-        uuid = id;
-        connection = this->createConnection(name, uuid);
-        logger_->log_debug("Created connection with UUID %s and name %s", id, name);
-
-        // Configure connection source
-        if (connectionNode.as<YAML::Node>()["source relationship name"]) {
-          auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
-          core::Relationship relationship(rawRelationship, "");
-          logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
-          if (connection) {
-            connection->addRelationship(relationship);
-          }
-        } else if (connectionNode.as<YAML::Node>()["source relationship names"]) {
-          auto relList = connectionNode["source relationship names"];
-          if (connection) {
-            if (relList.IsSequence()) {
-              for (const auto &rel : relList) {
-                auto rawRelationship = rel.as<std::string>();
-                core::Relationship relationship(rawRelationship, "");
-                logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
-                connection->addRelationship(relationship);
-              }
-            } else {
-              auto rawRelationship = relList.as<std::string>();
-              core::Relationship relationship(rawRelationship, "");
-              logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
-              connection->addRelationship(relationship);
-            }
+    connection = createConnection(name, uuid.value());
+    if (!connection) {
+      return;

Review comment:
       This can't happen, right?  I would put an assertion here.

##########
File path: libminifi/src/core/yaml/YamlConfiguration.cpp
##########
@@ -526,172 +526,155 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
   }
+  if (!connectionsNode || !connectionsNode->IsSequence()) {
+    return;
+  }
 
-  if (connectionsNode) {
-    if (connectionsNode->IsSequence()) {
-      for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
-        YAML::Node connectionNode = iter->as<YAML::Node>();
-        std::shared_ptr<minifi::Connection> connection = nullptr;
+  for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+    YAML::Node connectionNode = iter->as<YAML::Node>();
+    std::shared_ptr<minifi::Connection> connection = nullptr;
 
-        // Configure basic connection
-        utils::Identifier uuid;
-        std::string id = getOrGenerateId(&connectionNode);
+    // Configure basic connection
+    std::string id = getOrGenerateId(&connectionNode);
 
-        // Default name to be same as ID
-        std::string name = id;
+    // Default name to be same as ID
+    // If name is specified in configuration, use the value
+    std::string name = connectionNode["name"].as<std::string>(id);
 
-        // If name is specified in configuration, use the value
-        if (connectionNode["name"]) {
-          name = connectionNode["name"].as<std::string>();
-        }
+    const utils::optional<utils::Identifier> uuid = utils::Identifier::parse(id);
+    if (!uuid) {
+      logger_->log_debug("Incorrect connection UUID format.");

Review comment:
       We used to throw an exception in this case (in `Identifier::operator=(const std::string&)`); now we log and return.  Is this change intentional?

##########
File path: libminifi/include/core/yaml/YamlConnectionParser.h
##########
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "core/ProcessGroup.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "yaml-cpp/yaml.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+class YamlConnectionParser {
+ public:
+  static constexpr const char* CONFIG_YAML_CONNECTIONS_KEY{ "Connections" };
+
+  explicit YamlConnectionParser(const YAML::Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+      connectionNode_(connectionNode),
+      name_(name),
+      parent_(parent),
+      logger_(logger) {}
+
+  void configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const;
+  uint64_t getWorkQueueSizeFromYaml() const;
+  uint64_t getWorkQueueDataSizeFromYaml() const;
+  utils::Identifier getSourceUUIDFromYaml() const;
+  utils::Identifier getDestinationUUIDFromYaml() const;
+  uint64_t getFlowFileExpirationFromYaml() const;
+  bool getDropEmptyFromYaml() const;
+ private:
+  const YAML::Node& connectionNode_;
+  const std::string& name_;
+  gsl::not_null<core::ProcessGroup*> parent_;
+  const std::shared_ptr<logging::Logger>& logger_;

Review comment:
       This looks weird, storing a reference to a smart pointer.  Storing `Logger&` would be shorter and easier to read; storing `shared_ptr<Logger>` (without the `&`) would be safer.

##########
File path: libminifi/include/core/yaml/YamlConfiguration.h
##########
@@ -41,7 +43,6 @@ namespace core {
 #define DEFAULT_FLOW_YAML_FILE_NAME "conf/config.yml"
 #define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"

Review comment:
       Some of these constants are also defined in MainHelper.h.  Can those duplicate definitions be removed?

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);
+    }
+  }
+  return 0;
+}
+
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  logger_->log_error(error_msg.c_str());
+  throw std::invalid_argument(error_msg);
+}
+
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    if (destUUID) {
+      logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
+      return destUUID.value();
+    }
+  }
+  // we use the same logic as above for resolving the source processor
+  // for looking up the destination processor in absence of a processor id
+  checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
+  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+    // the destination name is a remote port id, so use that as the dest id
+    logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destUUID.value();
+  }
+  // look the processor up by name
+  auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
+  if (NULL != destProcessor) {
+    logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destProcessor->getUUID();
+  }
+  // we ran out of ways to discover the destination processor
+  const std::string error_msg = "Could not locate a destination with name " + connectionDestProcName + " to create a connection";
+  logger_->log_error(error_msg.c_str());
+  throw std::invalid_argument(error_msg);
+}
+
+uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  if (!expiration_node) {
+    return 0;
+  }
+  uint64_t expirationDuration = 0;
+  TimeUnit unit;
+  const std::string flowfile_expiration_str = expiration_node.as<std::string>();
+  if (!core::Property::StringToTime(flowfile_expiration_str, expirationDuration, unit) || !core::Property::ConvertTimeUnitToMS(expirationDuration, unit, expirationDuration)) {
+    // We should throw here, but we do not.
+    // The reason is that our parser only accepts time formats that consists of a number and
+    // a unit, but users might use this field populated with a "0" (and no units).
+    // We cannot correct this, because there is no API contract for the config, we need to support
+    // all already-supported configuration files.
+    // This has the side-effect of allowing values like "20 minuites" and silently defaulting to 0.
+    logger_->log_debug("parseConnection: flowfile expiration => [%d]", expirationDuration);
+  }
+  return expirationDuration;
+}
+
+bool YamlConnectionParser::getDropEmptyFromYaml() const {
+  const YAML::Node drop_empty_node = connectionNode_["drop empty"];
+  if (drop_empty_node) {
+    bool dropEmpty = false;
+    return utils::StringUtils::StringToBool(drop_empty_node.as<std::string>(), dropEmpty) && dropEmpty;
+  }
+  return false;

Review comment:
       Here, too, we used to not call `connection->setDropEmptyFlowFiles()`, and now we call it with `false`.  I assume that is OK?

##########
File path: extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
##########
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml.", "[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    std::string serialized_yaml;
+    std::vector<std::string> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { "success" };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { "success", "failure", "something_else" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = connection->getRelationships();
+    REQUIRE(expectations.size() == relationships.size());
+    for (const auto& expected_relationship_name : expectations) {
+      const auto relationship_name_matches = [&] (const core::Relationship& relationship) { return expected_relationship_name == relationship.getName(); };
+      const std::size_t relationship_count = std::count_if(relationships.cbegin(), relationships.cend(), relationship_name_matches);
+      REQUIRE(1 == relationship_count);
+    }
+  }
+  SECTION("Queue size limits are read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "max work queue size: 231\n"
+        "max work queue data size: 12 MB\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
+    REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+  }
+  SECTION("Source and destination names and uuids are read") {
+    const utils::Identifier expected_source_id = utils::generateUUID();
+    const utils::Identifier expected_destination_id = utils::generateUUID();
+    std::string serialized_yaml;
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_1", expected_source_id)));
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_2", expected_destination_id)));
+    SECTION("Directly from configuration") {
+      serialized_yaml = std::string {
+          "source id: " + expected_source_id.to_string() + "\n"
+          "destination id: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Using UUID as remote port id") {
+      serialized_yaml = std::string {
+          "source name: " + expected_source_id.to_string() + "\n"
+          "destination name: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Via processor name lookup") {
+      serialized_yaml = std::string {
+          "source name: TailFile_1\n"
+          "destination name: TailFile_2\n" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUIDFromYaml());
+    REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUIDFromYaml());
+  }
+  SECTION("Flow file expiration is read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "flowfile expiration: 2 min\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(120000 == yaml_connection_parser.getFlowFileExpirationFromYaml());  // 2 * 60 * 1000 ms
+  }
+  SECTION("Drop empty value is read") {
+    SECTION("When config contains true value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: true\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("When config contains false value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: false\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+  SECTION("Errors are handled properly when configuration lines are missing") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    SECTION("With empty configuration") {
+      YAML::Node connection_node = YAML::Load(std::string(""));
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("With a configuration that lists keys but has no assigned values") {
+      std::string serialized_yaml;
+      SECTION("Single relationship name left empty") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        // This seems incorrect, but we do not want to ruin backward compatibility
+        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("List of relationship names contains empty item") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source relationship names:\n"
+            "- \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("Source and destination lookup from via id") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source id: \n"
+            "destination id: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Source and destination lookup via name") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue size: \n"
+          "max work queue data size: \n"
+          "flowfile expiration: \n"
+          "drop empty: \n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_THROWS(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());

Review comment:
       This part should be wrapped in a SECTION, too.  As it stands, these lines will be executed five times (once for each section).

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);

Review comment:
       we should probably log this before returning :)

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// This is no longer needed in c++17
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  auto addNewRelationshipToConnection = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>());
+  } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    auto relList = connectionNode_["source relationship names"];
+      if (relList.IsSequence()) {
+        for (const auto &rel : relList) {
+          addNewRelationshipToConnection(rel.as<std::string>());
+        }
+      } else {
+        addNewRelationshipToConnection(relList.as<std::string>());
+      }
+  }
+}
+
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_size;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %u as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      return max_work_queue_data_size;
+      logger_->log_debug("Setting %u as the max as the max queue data size.", max_work_queue_data_size);
+    }
+  }
+  return 0;
+}
+
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  logger_->log_error(error_msg.c_str());
+  throw std::invalid_argument(error_msg);
+}
+
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    if (destUUID) {
+      logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
+      return destUUID.value();
+    }
+  }
+  // we use the same logic as above for resolving the source processor
+  // for looking up the destination processor in absence of a processor id
+  checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
+  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+    // the destination name is a remote port id, so use that as the dest id
+    logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destUUID.value();
+  }
+  // look the processor up by name
+  auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
+  if (NULL != destProcessor) {
+    logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destProcessor->getUUID();
+  }
+  // we ran out of ways to discover the destination processor
+  const std::string error_msg = "Could not locate a destination with name " + connectionDestProcName + " to create a connection";
+  logger_->log_error(error_msg.c_str());
+  throw std::invalid_argument(error_msg);
+}
+
+uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  if (!expiration_node) {
+    return 0;

Review comment:
       In this case, when `expiration_node` was null, we used to not call `connection->setFlowExpirationDuration()`; now we call it with a 0 value.  Is that OK?

##########
File path: libminifi/src/core/yaml/YamlConnectionParser.cpp
##########
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// Out-of-class definition of the static constexpr member CONFIG_YAML_CONNECTIONS_KEY.
+// Before C++17 such a definition is required whenever the member is odr-used;
+// since C++17 static constexpr data members are implicitly inline, so this line
+// is no longer needed once the project builds with C++17 or newer.
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+// Reads the source relationship(s) of the connection node and adds them to `connection`.
+// The singular key "source relationship name" takes precedence; the plural key
+// "source relationship names" is only consulted when the singular one is absent.
+// If neither key is present, the connection is left untouched.
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  // Creates a relationship with an empty description, logs it and attaches it to the connection.
+  const auto attach_relationship = [&] (const std::string& name) {
+    core::Relationship new_relationship(name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", name);
+    connection->addRelationship(std::move(new_relationship));
+  };
+
+  // NOTE(review): the as<YAML::Node>() conversions in the key checks are kept on purpose;
+  // they may be what makes this method throw for an empty document - confirm before simplifying.
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    attach_relationship(connectionNode_["source relationship name"].as<std::string>());
+    return;
+  }
+  if (!connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    return;
+  }
+
+  auto relationship_names = connectionNode_["source relationship names"];
+  if (!relationship_names.IsSequence()) {
+    // A non-sequence value under the plural key is converted to a single relationship name.
+    attach_relationship(relationship_names.as<std::string>());
+    return;
+  }
+  for (const auto& entry : relationship_names) {
+    attach_relationship(entry.as<std::string>());
+  }
+}
+
+// Returns the value of the "max work queue size" key of the connection node.
+// Returns 0 when the key is absent or when its value cannot be parsed by
+// core::Property::StringToInt. The string conversion of the node itself may throw.
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_size_node) {
+    const auto max_work_queue_str = max_work_queue_size_node.as<std::string>();
+    uint64_t max_work_queue_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      // The value is 64 bits wide; format it as text instead of relying on "%u",
+      // which expects an unsigned int.
+      logger_->log_debug("Setting %s as the max queue size.", std::to_string(max_work_queue_size));
+      return max_work_queue_size;
+    }
+  }
+  return 0;
+}
+
+// Returns the value of the "max work queue data size" key of the connection node.
+// Returns 0 when the key is absent or when its value cannot be parsed by
+// core::Property::StringToInt. The string conversion of the node itself may throw.
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    const auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      // Log before returning: the statement used to sit after the return and was never executed.
+      // The value is 64 bits wide, so it is formatted as text rather than with "%u".
+      logger_->log_debug("Setting %s as the max queue data size.", std::to_string(max_work_queue_data_size));
+      return max_work_queue_data_size;
+    }
+  }
+  return 0;
+}
+
+// Resolves the identifier of the connection's source. Resolution order:
+//   1. "source id", if present and parseable as an identifier;
+//   2. "source name" (required from here on), if it parses as an identifier and
+//      the parent group has a processor with that id;
+//   3. "source name" looked up as a processor name in the parent group.
+// Throws std::invalid_argument when none of these succeed; checkRequiredField
+// is invoked first to validate that "source name" is present.
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  const auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  // The message embeds text taken from the configuration, so pass it as an argument
+  // instead of using it as the format string itself.
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+// Resolves the identifier of the connection's destination. Resolution order:
+//   1. "destination id", if present and parseable as an identifier;
+//   2. "destination name" (required from here on), if it parses as an identifier and
+//      the parent group has a processor with that id;
+//   3. "destination name" looked up as a processor name in the parent group.
+// Throws std::invalid_argument when none of these succeed; checkRequiredField
+// is invoked first to validate that "destination name" is present.
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    if (destUUID) {
+      logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
+      return destUUID.value();
+    }
+  }
+  // we use the same logic as above for resolving the source processor
+  // for looking up the destination processor in absence of a processor id
+  checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
+  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+    // the destination name is a remote port id, so use that as the dest id
+    logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destUUID.value();
+  }
+  // look the processor up by name
+  const auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
+  if (nullptr != destProcessor) {
+    logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destProcessor->getUUID();
+  }
+  // we ran out of ways to discover the destination processor
+  const std::string error_msg = "Could not locate a destination with name " + connectionDestProcName + " to create a connection";
+  // The message embeds text taken from the configuration, so pass it as an argument
+  // instead of using it as the format string itself.
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  if (!expiration_node) {
+    return 0;
+  }
+  uint64_t expirationDuration = 0;
+  TimeUnit unit;
+  const std::string flowfile_expiration_str = expiration_node.as<std::string>();
+  if (!core::Property::StringToTime(flowfile_expiration_str, expirationDuration, unit) || !core::Property::ConvertTimeUnitToMS(expirationDuration, unit, expirationDuration)) {
+    // We should throw here, but we do not.
+    // The reason is that our parser only accepts time formats that consists of a number and
+    // a unit, but users might use this field populated with a "0" (and no units).
+    // We cannot correct this, because there is no API contract for the config, we need to support
+    // all already-supported configuration files.
+    // This has the side-effect of allowing values like "20 minuites" and silently defaulting to 0.
+    logger_->log_debug("parseConnection: flowfile expiration => [%d]", expirationDuration);

Review comment:
       We should log this when the parsing succeeds, too.  (Before, it was only logged in the success branch, but it's probably a good idea to log it in both cases.)

##########
File path: extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
##########
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml.", "[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    std::string serialized_yaml;
+    std::vector<std::string> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { "success" };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { "success", "failure", "something_else" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = connection->getRelationships();
+    REQUIRE(expectations.size() == relationships.size());
+    for (const auto& expected_relationship_name : expectations) {
+      const auto relationship_name_matches = [&] (const core::Relationship& relationship) { return expected_relationship_name == relationship.getName(); };
+      const std::size_t relationship_count = std::count_if(relationships.cbegin(), relationships.cend(), relationship_name_matches);
+      REQUIRE(1 == relationship_count);
+    }
+  }
+  SECTION("Queue size limits are read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "max work queue size: 231\n"
+        "max work queue data size: 12 MB\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
+    REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+  }
+  SECTION("Source and destination names and uuids are read") {
+    const utils::Identifier expected_source_id = utils::generateUUID();
+    const utils::Identifier expected_destination_id = utils::generateUUID();
+    std::string serialized_yaml;
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_1", expected_source_id)));
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_2", expected_destination_id)));
+    SECTION("Directly from configuration") {
+      serialized_yaml = std::string {
+          "source id: " + expected_source_id.to_string() + "\n"
+          "destination id: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Using UUID as remote port id") {
+      serialized_yaml = std::string {
+          "source name: " + expected_source_id.to_string() + "\n"
+          "destination name: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Via processor name lookup") {
+      serialized_yaml = std::string {
+          "source name: TailFile_1\n"
+          "destination name: TailFile_2\n" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUIDFromYaml());
+    REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUIDFromYaml());
+  }
+  SECTION("Flow file expiration is read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "flowfile expiration: 2 min\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(120000 == yaml_connection_parser.getFlowFileExpirationFromYaml());  // 2 * 60 * 1000 ms
+  }
+  SECTION("Drop empty value is read") {
+    SECTION("When config contains true value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: true\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("When config contains false value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: false\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+  SECTION("Errors are handled properly when configuration lines are missing") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    SECTION("With empty configuration") {
+      YAML::Node connection_node = YAML::Load(std::string(""));
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("With a configuration that lists keys but has no assigned values") {
+      std::string serialized_yaml;
+      SECTION("Single relationship name left empty") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        // This seems incorrect, but we do not want to ruin backward compatibility
+        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("List of relationship names contains empty item") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source relationship names:\n"
+            "- \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("Source and destination lookup from via id") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source id: \n"
+            "destination id: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Source and destination lookup via name") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue size: \n"
+          "max work queue data size: \n"
+          "flowfile expiration: \n"
+          "drop empty: \n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_THROWS(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("With a configuration that has values of incorrect format") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue size: 2 KB\n"
+          "max work queue data size: 10 Incorrect\n"
+          "flowfile expiration: 12\n"
+          "drop empty: sup\n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());

Review comment:
       A comment would be useful here to explain why these should not throw (something similar to line 138, I assume).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org