You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/09 19:59:50 UTC
nifi-minifi-cpp git commit: MINIFICPP-357 Added support for config
YAML v3.
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 0981f9acf -> c6f9dc628
MINIFICPP-357 Added support for config YAML v3.
This closes #260.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/c6f9dc62
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/c6f9dc62
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/c6f9dc62
Branch: refs/heads/master
Commit: c6f9dc628bd28e5788e2235bfe4b7f7c9000c941
Parents: 0981f9a
Author: Andy I. Christianson <an...@andyic.org>
Authored: Thu Jan 4 13:00:27 2018 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Jan 9 14:58:29 2018 -0500
----------------------------------------------------------------------
libminifi/include/core/yaml/YamlConfiguration.h | 29 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 141 +++++--
libminifi/test/unit/YamlConfigurationTests.cpp | 418 +++++++++++++------
3 files changed, 401 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c6f9dc62/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 7dc58f2..da71ba6 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -40,13 +40,18 @@ namespace core {
#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
#define CONFIG_YAML_CONTROLLER_SERVICES_KEY "Controller Services"
#define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups"
+#define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 "Remote Process Groups"
#define CONFIG_YAML_PROVENANCE_REPORT_KEY "Provenance Reporting"
class YamlConfiguration : public FlowConfiguration {
public:
- explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo,
- std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path = DEFAULT_FLOW_YAML_FILE_NAME)
+ explicit YamlConfiguration(std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_file_repo,
+ std::shared_ptr<core::ContentRepository> content_repo,
+ std::shared_ptr<io::StreamFactory> stream_factory,
+ std::shared_ptr<Configure> configuration,
+ const std::string path = DEFAULT_FLOW_YAML_FILE_NAME)
: FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration, path),
logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {
stream_factory_ = stream_factory;
@@ -126,11 +131,16 @@ class YamlConfiguration : public FlowConfiguration {
YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY];
YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+
+ if (!remoteProcessingGroupsNode) {
+ remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+ }
+
YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
parseControllerServices(&controllerServiceNode);
// Create the root process group
- core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode);
+ core::ProcessGroup *root = parseRootProcessGroupYaml(flowControllerNode);
parseProcessorNodeYaml(processorsNode, root);
parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root);
parseConnectionYaml(&connectionsNode, root);
@@ -156,7 +166,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the parent ProcessGroup to which the created
* Processor should be added
*/
- void parseProcessorNodeYaml(YAML::Node processorNode, core::ProcessGroup * parent);
+ void parseProcessorNodeYaml(YAML::Node processorNode, core::ProcessGroup *parent);
/**
* Parses a port from its corresponding YAML config node and adds
@@ -200,7 +210,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the root node of flow configuration to which
* to add the connections that are parsed
*/
- void parseConnectionYaml(YAML::Node *node, core::ProcessGroup * parent);
+ void parseConnectionYaml(YAML::Node *node, core::ProcessGroup *parent);
/**
* Parses the Remote Process Group section of a configuration YAML.
@@ -211,7 +221,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the root node of flow configuration to which
* to add the process groups that are parsed
*/
- void parseRemoteProcessGroupYaml(YAML::Node *node, core::ProcessGroup * parent);
+ void parseRemoteProcessGroupYaml(YAML::Node *node, core::ProcessGroup *parent);
/**
* Parses the Provenance Reporting section of a configuration YAML.
@@ -223,7 +233,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parentGroup the root node of flow configuration to which
* to add the provenance reporting config
*/
- void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup);
+ void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup);
/**
* A helper function to parse the Properties Node YAML for a processor.
@@ -269,7 +279,10 @@ class YamlConfiguration : public FlowConfiguration {
* @throws std::invalid_argument if the required field 'fieldName' is
* not present in 'yamlNode'
*/
- void checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection = "", const std::string &errorMessage = "");
+ void checkRequiredField(YAML::Node *yamlNode,
+ const std::string &fieldName,
+ const std::string &yamlSection = "",
+ const std::string &errorMessage = "");
/**
* This is a helper function for getting an optional value, if it exists.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c6f9dc62/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index cc399fb..399124d 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -38,7 +38,7 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
int32_t version = 0;
checkRequiredField(&rootFlowNode, "name",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string flowName = rootFlowNode["name"].as<std::string>();
auto class_loader_functions = rootFlowNode["Class Loader Functions"];
@@ -66,7 +66,7 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
return group.release();
}
-void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup *parentGroup) {
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
int64_t yieldPeriod = -1;
@@ -87,7 +87,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
YAML::Node procNode = iter->as<YAML::Node>();
checkRequiredField(&procNode, "name",
- CONFIG_YAML_PROCESSORS_KEY);
+ CONFIG_YAML_PROCESSORS_KEY);
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
@@ -158,8 +158,10 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
if (procNode["auto-terminated relationships list"]) {
YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
+ if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
+ && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end();
+ ++relIter) {
std::string autoTerminatedRel = relIter->as<std::string>();
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
}
@@ -175,17 +177,20 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
// Take care of scheduling
core::TimeUnit unit;
- if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+ && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
- if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+ && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%ll] ms", penalizationPeriod);
processor->setPenalizationPeriodMsec(penalizationPeriod);
}
- if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
+ && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%ll] ms", yieldPeriod);
processor->setYieldPeriodMsec(yieldPeriod);
}
@@ -227,7 +232,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
parentGroup->addProcessor(processor);
}
} else {
- throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ throw new std::invalid_argument(
+ "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
} else {
throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined "
@@ -235,7 +241,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
}
}
-void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup *parentGroup) {
uuid_t uuid;
std::string id;
@@ -250,14 +256,14 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
YAML::Node currRpgNode = iter->as<YAML::Node>();
checkRequiredField(&currRpgNode, "name",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto name = currRpgNode["name"].as<std::string>();
id = getOrGenerateId(&currRpgNode);
logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
checkRequiredField(&currRpgNode, "url",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string url = currRpgNode["url"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
@@ -274,7 +280,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
- if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+ if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+ && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%ll] ms", yieldPeriodValue);
group->setYieldPeriodMsec(yieldPeriodValue);
}
@@ -284,7 +291,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
std::string timeout = currRpgNode["timeout"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
- if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+ if (core::Property::StringToTime(timeout, timeoutValue, unit)
+ && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%ll] ms", timeoutValue);
group->setTimeOut(timeoutValue);
}
@@ -294,7 +302,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
group->setURL(url);
checkRequiredField(&currRpgNode, "Input Ports",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
@@ -320,7 +328,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
}
}
-void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup) {
+void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup) {
uuid_t port_uuid;
int64_t schedulingPeriod = -1;
@@ -336,19 +344,21 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
std::shared_ptr<core::Processor> processor = nullptr;
processor = createProvenanceReportTask();
- std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
+ std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask>
+ reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
YAML::Node node = reportNode->as<YAML::Node>();
checkRequiredField(&node, "scheduling strategy",
- CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
checkRequiredField(&node, "scheduling period",
- CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
core::TimeUnit unit;
- if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit)
+ && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
logger_->log_debug("ProvenanceReportingTask schedulingPeriod %ll ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
@@ -424,15 +434,18 @@ void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
controller_service_node->initialize();
YAML::Node propertiesNode = controllerServiceNode["Properties"];
// we should propagate properties to the node and to the implementation
- parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node));
+ parsePropertiesNodeYaml(&propertiesNode,
+ std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node));
if (controller_service_node->getControllerServiceImplementation() != nullptr) {
- parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()));
+ parsePropertiesNodeYaml(&propertiesNode,
+ std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()));
}
}
controller_services_->put(id, controller_service_node);
controller_services_->put(name, controller_service_node);
} catch (YAML::InvalidNode &in) {
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
+ throw Exception(ExceptionType::GENERAL_EXCEPTION,
+ "Name, id, and class must be specified for controller services");
}
}
}
@@ -454,7 +467,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
// Configure basic connection
uuid_t uuid;
checkRequiredField(&connectionNode, "name",
- CONFIG_YAML_CONNECTIONS_KEY);
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string name = connectionNode["name"].as<std::string>();
std::string id = getOrGenerateId(&connectionNode);
uuid_parse(id.c_str(), uuid);
@@ -462,13 +475,26 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
logger_->log_debug("Created connection with UUID %s and name %s", id, name);
// Configure connection source
- checkRequiredField(&connectionNode, "source relationship name",
- CONFIG_YAML_CONNECTIONS_KEY);
- auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
- core::Relationship relationship(rawRelationship, "");
- logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
- if (connection) {
- connection->setRelationship(relationship);
+ if (connectionNode.as<YAML::Node>()["source relationship name"]) {
+ auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
+ core::Relationship relationship(rawRelationship, "");
+ logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
+ if (connection) {
+ connection->setRelationship(relationship);
+ }
+ } else if (connectionNode.as<YAML::Node>()["source relationship names"]) {
+ auto relList = connectionNode["source relationship names"];
+
+ if (relList.size() != 1) {
+ throw std::invalid_argument("Only one element is supported for 'source relationship names'");
+ }
+
+ auto rawRelationship = relList[0].as<std::string>();
+ core::Relationship relationship(rawRelationship, "");
+ logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
+ if (connection) {
+ connection->setRelationship(relationship);
+ }
}
uuid_t srcUUID;
@@ -488,7 +514,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
connection->setMaxQueueDataSize(max_work_queue_data_size);
}
- logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_data_size << " as the max queue data size for " << name;
+ logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_data_size << " as the max queue data size for "
+ << name;
}
if (connectionNode["source id"]) {
@@ -500,7 +527,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
} else {
// if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
checkRequiredField(&connectionNode, "source name",
- CONFIG_YAML_CONNECTIONS_KEY);
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) {
@@ -519,8 +546,10 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
name, connectionSrcProcName);
} else {
// we ran out of ways to discover the source processor
- logger_->log_error("Could not locate a source with name %s to create a connection", connectionSrcProcName);
- throw std::invalid_argument("Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
+ logger_->log_error("Could not locate a source with name %s to create a connection",
+ connectionSrcProcName);
+ throw std::invalid_argument(
+ "Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
}
}
}
@@ -538,7 +567,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
checkRequiredField(&connectionNode, "destination name",
- CONFIG_YAML_CONNECTIONS_KEY);
+ CONFIG_YAML_CONNECTIONS_KEY);
std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
@@ -558,8 +587,10 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
name, connectionDestProcName);
} else {
// we ran out of ways to discover the destination processor
- logger_->log_error("Could not locate a destination with name %s to create a connection", connectionDestProcName);
- throw std::invalid_argument("Could not locate a destination with name " + connectionDestProcName + " to create a connection");
+ logger_->log_error("Could not locate a destination with name %s to create a connection",
+ connectionDestProcName);
+ throw std::invalid_argument(
+ "Could not locate a destination with name " + connectionDestProcName + " to create a connection");
}
}
}
@@ -573,7 +604,9 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
}
}
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
+ core::ProcessGroup *parent,
+ sitetosite::TransferDirection direction) {
uuid_t uuid;
std::shared_ptr<core::Processor> processor = NULL;
std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
@@ -587,7 +620,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
// Check for required fields
checkRequiredField(&inputPortsObj, "name",
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto nameStr = inputPortsObj["name"].as<std::string>();
checkRequiredField(&inputPortsObj, "id",
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
@@ -599,7 +632,11 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
auto portId = inputPortsObj["id"].as<std::string>();
uuid_parse(portId.c_str(), uuid);
- port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid);
+ port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_,
+ nameStr,
+ parent->getURL(),
+ this->configuration_,
+ uuid);
processor = std::static_pointer_cast<core::Processor>(port);
port->setDirection(direction);
@@ -628,7 +665,8 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
}
}
-void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor) {
+void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
+ std::shared_ptr<core::ConfigurableComponent> processor) {
// Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) {
std::string propertyName = propsIter->first.as<std::string>();
@@ -645,7 +683,10 @@ void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std:
if (!processor->updateProperty(propertyName, rawValueString)) {
std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
if (proc != 0) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s",
+ propertyName,
+ rawValueString,
+ proc->getName());
}
}
}
@@ -655,7 +696,10 @@ void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std:
if (!processor->setProperty(propertyName, rawValueString)) {
std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
if (proc != 0) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName());
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s",
+ propertyName,
+ rawValueString,
+ proc->getName());
}
}
}
@@ -685,7 +729,10 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::
return id;
}
-void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection, const std::string &errorMessage) {
+void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
+ const std::string &fieldName,
+ const std::string &yamlSection,
+ const std::string &errorMessage) {
std::string errMsg = errorMessage;
if (!yamlNode->as<YAML::Node>()[fieldName]) {
if (errMsg.empty()) {
@@ -693,7 +740,9 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::stri
// invalid YAML config file, using the component name if present
errMsg =
yamlNode->as<YAML::Node>()["name"] ?
- "Unable to parse configuration file for component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" + fieldName + "' is missing" :
+ "Unable to parse configuration file for component named '"
+ + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" + fieldName
+ + "' is missing" :
"Unable to parse configuration file as required field '" + fieldName + "' is missing";
if (!yamlSection.empty()) {
errMsg += " [in '" + yamlSection + "' section of configuration file]";
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c6f9dc62/libminifi/test/unit/YamlConfigurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp
index 0020bba..4700ab3 100644
--- a/libminifi/test/unit/YamlConfigurationTests.cpp
+++ b/libminifi/test/unit/YamlConfigurationTests.cpp
@@ -30,113 +30,290 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared < minifi::io::StreamFactory > (configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository>
+ content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ core::YamlConfiguration *yamlConfig =
+ new core::YamlConfiguration(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
SECTION("loading YAML without optional component IDs works") {
- static const std::string CONFIG_YAML_WITHOUT_IDS = ""
- "MiNiFi Config Version: 1\n"
- "Flow Controller:\n"
- " name: MiNiFi Flow\n"
- " comment:\n"
- "\n"
- "Core Properties:\n"
- " flow controller graceful shutdown period: 10 sec\n"
- " flow service write delay interval: 500 ms\n"
- " administrative yield duration: 30 sec\n"
- " bored yield duration: 10 millis\n"
- "\n"
- "FlowFile Repository:\n"
- " partitions: 256\n"
- " checkpoint interval: 2 mins\n"
- " always sync: false\n"
- " Swap:\n"
- " threshold: 20000\n"
- " in period: 5 sec\n"
- " in threads: 1\n"
- " out period: 5 sec\n"
- " out threads: 4\n"
- "\n"
- "Provenance Repository:\n"
- " provenance rollover time: 1 min\n"
- "\n"
- "Content Repository:\n"
- " content claim max appendable size: 10 MB\n"
- " content claim max flow files: 100\n"
- " always sync: false\n"
- "\n"
- "Component Status Repository:\n"
- " buffer size: 1440\n"
- " snapshot frequency: 1 min\n"
- "\n"
- "Security Properties:\n"
- " keystore: /tmp/ssl/localhost-ks.jks\n"
- " keystore type: JKS\n"
- " keystore password: localtest\n"
- " key password: localtest\n"
- " truststore: /tmp/ssl/localhost-ts.jks\n"
- " truststore type: JKS\n"
- " truststore password: localtest\n"
- " ssl protocol: TLS\n"
- " Sensitive Props:\n"
- " key:\n"
- " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
- " provider: BC\n"
- "\n"
- "Processors:\n"
- " - name: TailFile\n"
- " class: org.apache.nifi.processors.standard.TailFile\n"
- " max concurrent tasks: 1\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 1 sec\n"
- " penalization period: 30 sec\n"
- " yield period: 1 sec\n"
- " run duration nanos: 0\n"
- " auto-terminated relationships list:\n"
- " Properties:\n"
- " File to Tail: logs/minifi-app.log\n"
- " Rolling Filename Pattern: minifi-app*\n"
- " Initial Start Position: Beginning of File\n"
- "\n"
- "Connections:\n"
- " - name: TailToS2S\n"
- " source name: TailFile\n"
- " source relationship name: success\n"
- " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " max work queue size: 0\n"
- " max work queue data size: 1 MB\n"
- " flowfile expiration: 60 sec\n"
- " queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
- "\n"
- "Remote Processing Groups:\n"
- " - name: NiFi Flow\n"
- " comment:\n"
- " url: https://localhost:8090/nifi\n"
- " timeout: 30 secs\n"
- " yield period: 10 sec\n"
- " Input Ports:\n"
- " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " name: tailed log\n"
- " comments:\n"
- " max concurrent tasks: 1\n"
- " use compression: false\n"
- "\n"
- "Provenance Reporting:\n"
- " comment:\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 30 sec\n"
- " host: localhost\n"
- " port name: provenance\n"
- " port: 8090\n"
- " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
- " url: https://localhost:8090/\n"
- " originating url: http://${hostname(true)}:8081/nifi\n"
- " use compression: true\n"
- " timeout: 30 secs\n"
- " batch size: 1000";
+ static const std::string CONFIG_YAML_WITHOUT_IDS = ""
+ "MiNiFi Config Version: 1\n"
+ "Flow Controller:\n"
+ " name: MiNiFi Flow\n"
+ " comment:\n"
+ "\n"
+ "Core Properties:\n"
+ " flow controller graceful shutdown period: 10 sec\n"
+ " flow service write delay interval: 500 ms\n"
+ " administrative yield duration: 30 sec\n"
+ " bored yield duration: 10 millis\n"
+ "\n"
+ "FlowFile Repository:\n"
+ " partitions: 256\n"
+ " checkpoint interval: 2 mins\n"
+ " always sync: false\n"
+ " Swap:\n"
+ " threshold: 20000\n"
+ " in period: 5 sec\n"
+ " in threads: 1\n"
+ " out period: 5 sec\n"
+ " out threads: 4\n"
+ "\n"
+ "Provenance Repository:\n"
+ " provenance rollover time: 1 min\n"
+ "\n"
+ "Content Repository:\n"
+ " content claim max appendable size: 10 MB\n"
+ " content claim max flow files: 100\n"
+ " always sync: false\n"
+ "\n"
+ "Component Status Repository:\n"
+ " buffer size: 1440\n"
+ " snapshot frequency: 1 min\n"
+ "\n"
+ "Security Properties:\n"
+ " keystore: /tmp/ssl/localhost-ks.jks\n"
+ " keystore type: JKS\n"
+ " keystore password: localtest\n"
+ " key password: localtest\n"
+ " truststore: /tmp/ssl/localhost-ts.jks\n"
+ " truststore type: JKS\n"
+ " truststore password: localtest\n"
+ " ssl protocol: TLS\n"
+ " Sensitive Props:\n"
+ " key:\n"
+ " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
+ " provider: BC\n"
+ "\n"
+ "Processors:\n"
+ " - name: TailFile\n"
+ " class: org.apache.nifi.processors.standard.TailFile\n"
+ " max concurrent tasks: 1\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 1 sec\n"
+ " penalization period: 30 sec\n"
+ " yield period: 1 sec\n"
+ " run duration nanos: 0\n"
+ " auto-terminated relationships list:\n"
+ " Properties:\n"
+ " File to Tail: logs/minifi-app.log\n"
+ " Rolling Filename Pattern: minifi-app*\n"
+ " Initial Start Position: Beginning of File\n"
+ "\n"
+ "Connections:\n"
+ " - name: TailToS2S\n"
+ " source name: TailFile\n"
+ " source relationship name: success\n"
+ " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " max work queue size: 0\n"
+ " max work queue data size: 1 MB\n"
+ " flowfile expiration: 60 sec\n"
+ " queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
+ "\n"
+ "Remote Processing Groups:\n"
+ " - name: NiFi Flow\n"
+ " comment:\n"
+ " url: https://localhost:8090/nifi\n"
+ " timeout: 30 secs\n"
+ " yield period: 10 sec\n"
+ " Input Ports:\n"
+ " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " name: tailed log\n"
+ " comments:\n"
+ " max concurrent tasks: 1\n"
+ " use compression: false\n"
+ "\n"
+ "Provenance Reporting:\n"
+ " comment:\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 30 sec\n"
+ " host: localhost\n"
+ " port name: provenance\n"
+ " port: 8090\n"
+ " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
+ " url: https://localhost:8090/\n"
+ " originating url: http://${hostname(true)}:8081/nifi\n"
+ " use compression: true\n"
+ " timeout: 30 secs\n"
+ " batch size: 1000";
- std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
+ std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
+ std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getYamlRoot(configYamlStream);
+
+ REQUIRE(rootFlowConfig);
+ REQUIRE(rootFlowConfig->findProcessor("TailFile"));
+ REQUIRE(NULL != rootFlowConfig->findProcessor("TailFile")->getUUID());
+ REQUIRE(!rootFlowConfig->findProcessor("TailFile")->getUUIDStr().empty());
+ REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
+ REQUIRE(
+ core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessor("TailFile")->getSchedulingStrategy());
+ REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
+ REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
+ REQUIRE(30 * 1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
+ REQUIRE(1 * 1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
+ REQUIRE(0 == rootFlowConfig->findProcessor("TailFile")->getRunDurationNano());
+
+ std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+ rootFlowConfig->getConnections(connectionMap);
+ REQUIRE(2 == connectionMap.size());
+ // connectionMap is keyed by UUID, and the UUIDs are not known in advance, so check every entry instead of looking one up
+ for (auto it : connectionMap) {
+ REQUIRE(it.second);
+ REQUIRE(!it.second->getUUIDStr().empty());
+ REQUIRE(it.second->getDestination());
+ REQUIRE(it.second->getSource());
+ }
+ }
+
+ SECTION("missing required field in YAML throws exception") {
+ static const std::string CONFIG_YAML_NO_RPG_PORT_ID = ""
+ "MiNiFi Config Version: 1\n"
+ "Flow Controller:\n"
+ " name: MiNiFi Flow\n"
+ "Processors: []\n"
+ "Connections: []\n"
+ "Remote Processing Groups:\n"
+ " - name: NiFi Flow\n"
+ " comment:\n"
+ " url: https://localhost:8090/nifi\n"
+ " timeout: 30 secs\n"
+ " yield period: 10 sec\n"
+ " Input Ports:\n"
+ " - name: tailed log\n"
+ " comments:\n"
+ " max concurrent tasks: 1\n"
+ " use compression: false\n"
+ "\n";
+
+ std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID);
+ REQUIRE_THROWS_AS(yamlConfig->getYamlRoot(configYamlStream), std::invalid_argument);
+ }
+}
+
+TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
+ TestController test_controller;
+
+ std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
+ std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository>
+ content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ core::YamlConfiguration *yamlConfig =
+ new core::YamlConfiguration(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+
+ static const std::string TEST_CONFIG_YAML = R"(
+MiNiFi Config Version: 3
+Flow Controller:
+ name: Simple TailFile To RPG
+ comment: ''
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+ max concurrent threads: 1
+ variable registry properties: ''
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+Provenance Repository:
+ provenance rollover time: 1 min
+ implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+Security Properties:
+ keystore: ''
+ keystore type: ''
+ keystore password: ''
+ key password: ''
+ truststore: ''
+ truststore type: ''
+ truststore password: ''
+ ssl protocol: ''
+ Sensitive Props:
+ key:
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+Processors:
+- id: b0c04f28-0158-1000-0000-000000000000
+ name: TailFile
+ class: org.apache.nifi.processors.standard.TailFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ File Location: Local
+ File to Tail: ./logs/minifi-app.log
+ Initial Start Position: Beginning of File
+ Rolling Filename Pattern:
+ tail-base-directory:
+ tail-mode: Single file
+ tailfile-lookup-frequency: 10 minutes
+ tailfile-maximum-age: 24 hours
+ tailfile-recursive-lookup: 'false'
+ tailfile-rolling-strategy: Fixed name
+Controller Services: []
+Process Groups: []
+Input Ports: []
+Output Ports: []
+Funnels: []
+Connections:
+- id: b0c0c3cc-0158-1000-0000-000000000000
+ name: TailFile/success/ac0e798c-0158-1000-0588-cda9b944e011
+ source id: b0c04f28-0158-1000-0000-000000000000
+ source relationship names:
+ - success
+ destination id: ac0e798c-0158-1000-0588-cda9b944e011
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+Remote Process Groups:
+- id: b0c09ff0-0158-1000-0000-000000000000
+ name: ''
+ url: http://localhost:8080/nifi
+ comment: ''
+ timeout: 30 sec
+ yield period: 10 sec
+ transport protocol: RAW
+ proxy host: ''
+ proxy port: ''
+ proxy user: ''
+ proxy password: ''
+ local network interface: ''
+ Input Ports:
+ - id: aca664f8-0158-1000-a139-92485891d349
+ name: test2
+ comment: ''
+ max concurrent tasks: 1
+ use compression: false
+ - id: ac0e798c-0158-1000-0588-cda9b944e011
+ name: test
+ comment: ''
+ max concurrent tasks: 1
+ use compression: false
+ Output Ports: []
+NiFi Properties Overrides: {}
+ )";
+ std::istringstream configYamlStream(TEST_CONFIG_YAML);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
@@ -146,15 +323,15 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessor("TailFile")->getSchedulingStrategy());
REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
- REQUIRE(1*1000*1000*1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
- REQUIRE(30*1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
- REQUIRE(1*1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
+ REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
+ REQUIRE(30 * 1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
+ REQUIRE(1 * 1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
REQUIRE(0 == rootFlowConfig->findProcessor("TailFile")->getRunDurationNano());
std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
rootFlowConfig->getConnections(connectionMap);
REQUIRE(2 == connectionMap.size());
- // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it
+
for (auto it : connectionMap) {
REQUIRE(it.second);
REQUIRE(!it.second->getUUIDStr().empty());
@@ -162,28 +339,3 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
REQUIRE(it.second->getSource());
}
}
-
- SECTION("missing required field in YAML throws exception") {
- static const std::string CONFIG_YAML_NO_RPG_PORT_ID = ""
- "MiNiFi Config Version: 1\n"
- "Flow Controller:\n"
- " name: MiNiFi Flow\n"
- "Processors: []\n"
- "Connections: []\n"
- "Remote Processing Groups:\n"
- " - name: NiFi Flow\n"
- " comment:\n"
- " url: https://localhost:8090/nifi\n"
- " timeout: 30 secs\n"
- " yield period: 10 sec\n"
- " Input Ports:\n"
- " - name: tailed log\n"
- " comments:\n"
- " max concurrent tasks: 1\n"
- " use compression: false\n"
- "\n";
-
- std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID);
- REQUIRE_THROWS_AS(yamlConfig->getYamlRoot(configYamlStream), std::invalid_argument);
-}
-}