You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/04/20 07:03:10 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1535 - Support process groups
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a29b414 MINIFICPP-1535 - Support process groups
a29b414 is described below
commit a29b414b8f7f5310c88a51e9448ec8ce3a83b4c0
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Apr 7 08:35:28 2021 +0200
MINIFICPP-1535 - Support process groups
- Remove obsolete logs
- Move error reporting
- Review changes, restore lost tests
- Rename test accessor
- Change log
- Review change
- Linter fix
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1045
---
.../tests/unit/ProcessGroupTestUtils.h | 290 +++++++++
.../tests/unit/YamlProcessGroupParserTests.cpp | 130 ++++
libminifi/include/core/FlowConfiguration.h | 6 +-
libminifi/include/core/ProcessGroup.h | 38 +-
libminifi/include/core/yaml/YamlConfiguration.h | 33 +-
libminifi/src/core/FlowConfiguration.cpp | 16 +-
libminifi/src/core/ProcessGroup.cpp | 83 ++-
libminifi/src/core/yaml/YamlConfiguration.cpp | 704 +++++++++++----------
libminifi/src/core/yaml/YamlConnectionParser.cpp | 12 +-
9 files changed, 884 insertions(+), 428 deletions(-)
diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
new file mode 100644
index 0000000..e492f87
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <set>
+#include <memory>
+#include <vector>
+
+#include "TestBase.h"
+#include "YamlConfiguration.h"
+#include "Utils.h"
+
+struct Lines {
+ std::vector<std::string> lines;
+
+ std::string join(const std::string& delim) const {
+ return utils::StringUtils::join(delim, lines);
+ }
+
+ Lines& indentAll() & {
+ for (auto& line : lines) {
+ line = " " + std::move(line);
+ }
+ return *this;
+ }
+
+ Lines&& indentAll() && {
+ for (auto& line : lines) {
+ line = " " + std::move(line);
+ }
+ return std::move(*this);
+ }
+
+ Lines& append(Lines more_lines) {
+ std::move(more_lines.lines.begin(), more_lines.lines.end(), std::back_inserter(lines));
+ return *this;
+ }
+
+ Lines& emplace_back(std::string line) {
+ lines.emplace_back(std::move(line));
+ return *this;
+ }
+};
+
+struct Proc {
+ std::string id;
+ std::string name;
+
+ Lines serialize() const {
+ return {{
+ "- id: " + id,
+ " name: " + name,
+ " class: LogAttribute"
+ }};
+ }
+};
+
+struct UnresolvedProc {
+ explicit UnresolvedProc(std::string id): id(std::move(id)) {}
+ std::string id;
+};
+
+struct MaybeProc {
+ MaybeProc(const Proc& proc): id(proc.id), name(proc.name) {} // NOLINT
+ MaybeProc(const UnresolvedProc& proc) : id(proc.id) {} // NOLINT
+
+ std::string id;
+ utils::optional<std::string> name;
+};
+
+struct Conn {
+ std::string name;
+ MaybeProc source;
+ MaybeProc destination;
+
+ Lines serialize() const {
+ return {{
+ "- name: " + name,
+ " source id: " + source.id,
+ " destination id: " + destination.id,
+ " source relationship name: success"
+ }};
+ }
+};
+
+struct RPG {
+ std::string name;
+ std::vector<Proc> input_ports;
+
+ Lines serialize() const {
+ std::vector<std::string> lines;
+ lines.emplace_back("- name: " + name);
+ if (input_ports.empty()) {
+ lines.emplace_back(" Input Ports: []");
+ } else {
+ lines.emplace_back(" Input Ports:");
+ for (const auto& port : input_ports) {
+ lines.emplace_back(" - id: " + port.id);
+ lines.emplace_back(" name: " + port.name);
+ }
+ }
+ return {lines};
+ }
+};
+
+struct Group {
+ explicit Group(std::string name): name_(std::move(name)) {}
+ Group& With(std::vector<Conn> connections) {
+ connections_ = std::move(connections);
+ return *this;
+ }
+ Group& With(std::vector<Proc> processors) {
+ processors_ = std::move(processors);
+ return *this;
+ }
+ Group& With(std::vector<Group> subgroups) {
+ subgroups_ = std::move(subgroups);
+ return *this;
+ }
+ Group& With(std::vector<RPG> rpgs) {
+ rpgs_ = std::move(rpgs);
+ return *this;
+ }
+ Lines serialize(bool is_root = true) const {
+ Lines body;
+ if (processors_.empty()) {
+ body.emplace_back("Processors: []");
+ } else {
+ body.emplace_back("Processors:");
+ for (const auto& proc : processors_) {
+ body.append(proc.serialize().indentAll());
+ }
+ }
+ if (!connections_.empty()) {
+ body.emplace_back("Connections:");
+ for (const auto& conn : connections_) {
+ body.append(conn.serialize().indentAll());
+ }
+ }
+ if (rpgs_.empty()) {
+ body.emplace_back("Remote Process Groups: []");
+ } else {
+ body.emplace_back("Remote Process Groups:");
+ for (const auto& rpg : rpgs_) {
+ body.append(rpg.serialize().indentAll());
+ }
+ }
+ if (!subgroups_.empty()) {
+ body.emplace_back("Process Groups:");
+ for (const auto& subgroup : subgroups_) {
+ body.append(subgroup.serialize(false).indentAll());
+ }
+ }
+ Lines lines;
+ if (is_root) {
+ lines.emplace_back("Flow Controller:");
+ lines.emplace_back(" name: " + name_);
+ lines.append(std::move(body));
+ } else {
+ lines.emplace_back("- name: " + name_);
+ lines.append(std::move(body).indentAll());
+ }
+ return lines;
+ }
+
+ std::string name_;
+ std::vector<Conn> connections_;
+ std::vector<Proc> processors_;
+ std::vector<Group> subgroups_;
+ std::vector<RPG> rpgs_;
+};
+
+struct ProcessGroupTestAccessor {
+ FIELD_ACCESSOR(processors_)
+ FIELD_ACCESSOR(connections_)
+ FIELD_ACCESSOR(child_process_groups_)
+};
+
+template<typename T, typename = void>
+struct Resolve;
+
+template<typename T>
+struct Resolve<T, typename std::enable_if<!std::is_pointer<T>::value>::type> {
+ static auto get(const T& item) -> decltype(item.get()) {
+ return item.get();
+ }
+};
+
+template<typename T>
+struct Resolve<T, typename std::enable_if<std::is_pointer<T>::value>::type> {
+ static auto get(const T& item) -> T {
+ return item;
+ }
+};
+
+template<typename T>
+auto findByName(const std::set<T>& set, const std::string& name) -> decltype(Resolve<T>::get(std::declval<const T&>())) {
+ auto it = std::find_if(set.begin(), set.end(), [&](const T& item) {
+ return item->getName() == name;
+ });
+ if (it != set.end()) {
+ return Resolve<T>::get(*it);
+ }
+ return nullptr;
+}
+
+void verifyProcessGroup(core::ProcessGroup& group, const Group& pattern) {
+ // verify name
+ REQUIRE(group.getName() == pattern.name_);
+ // verify connections
+ std::set<std::shared_ptr<minifi::Connection>>& connections = ProcessGroupTestAccessor::get_connections_(group);
+ REQUIRE(connections.size() == pattern.connections_.size());
+ for (auto& expected : pattern.connections_) {
+ auto conn = findByName(connections, expected.name);
+ REQUIRE(conn);
+ if (!expected.source.name) {
+ REQUIRE(conn->getSource() == nullptr);
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(
+ std::chrono::seconds{1},
+ "Cannot find the source processor with id '" + expected.source.id
+ + "' for the connection [name = '" + expected.name + "'"));
+ } else {
+ REQUIRE(conn->getSource()->getName() == expected.source.name);
+ }
+ if (!expected.destination.name) {
+ REQUIRE(conn->getDestination() == nullptr);
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(
+ std::chrono::seconds{1},
+ "Cannot find the destination processor with id '" + expected.destination.id
+ + "' for the connection [name = '" + expected.name + "'"));
+ } else {
+ REQUIRE(conn->getDestination()->getName() == expected.destination.name);
+ }
+ }
+
+ // verify processors
+ std::set<std::shared_ptr<core::Processor>>& processors = ProcessGroupTestAccessor::get_processors_(group);
+ REQUIRE(processors.size() == pattern.processors_.size());
+ for (auto& expected : pattern.processors_) {
+ REQUIRE(findByName(processors, expected.name));
+ }
+
+ std::set<core::ProcessGroup*> simple_subgroups;
+ std::set<core::ProcessGroup*> rpg_subgroups;
+ for (auto& subgroup : ProcessGroupTestAccessor::get_child_process_groups_(group)) {
+ if (subgroup->isRemoteProcessGroup()) {
+ rpg_subgroups.insert(subgroup.get());
+ } else {
+ simple_subgroups.insert(subgroup.get());
+ }
+ }
+ // verify remote process groups
+ REQUIRE(rpg_subgroups.size() == pattern.rpgs_.size());
+ for (auto& expected : pattern.rpgs_) {
+ auto rpg = findByName(rpg_subgroups, expected.name);
+ REQUIRE(rpg);
+ std::set<std::shared_ptr<core::Processor>>& input_ports = ProcessGroupTestAccessor::get_processors_(*rpg);
+ REQUIRE(input_ports.size() == expected.input_ports.size());
+ for (auto& expected_input_port : expected.input_ports) {
+ auto input_port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(findByName(input_ports, expected_input_port.name));
+ REQUIRE(input_port);
+ REQUIRE(input_port->getName() == expected_input_port.name);
+ }
+ }
+
+ // verify subgroups
+ REQUIRE(simple_subgroups.size() == pattern.subgroups_.size());
+ for (auto& expected : pattern.subgroups_) {
+ auto subgroup = findByName(simple_subgroups, expected.name_);
+ REQUIRE(subgroup);
+ verifyProcessGroup(*subgroup, expected);
+ }
+}
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
new file mode 100644
index 0000000..8e71fa0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "YamlConfiguration.h"
+#include "Utils.h"
+#include "IntegrationTestUtils.h"
+#include "ProcessGroupTestUtils.h"
+
+static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
+
+TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser1]") {
+ auto pattern = Group("root")
+ .With({
+ Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}},
+ Conn{"Conn2",
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc2"},
+ Proc{"00000000-0000-0000-0000-000000000005", "Port1"}}
+ }).With({
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}
+ }).With({
+ RPG{"RPG1", {
+ Proc{"00000000-0000-0000-0000-000000000005", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000006", "Port2"}
+ }},
+ RPG{"RPG2", {}}
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser2]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc2"}})
+ .With({
+ Group("Child1")
+ .With({Conn{"Child1_Conn1",
+ Proc{"00000000-0000-0000-0000-000000000005", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000007", "Child1_Proc2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000006", "Child1_Proc1"},
+ Proc{"00000000-0000-0000-0000-000000000007", "Child1_Proc2"}})
+ .With({RPG{"Child1_RPG1", {
+ Proc{"00000000-0000-0000-0000-000000000005", "Port1"}}}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser3]") {
+ TestController controller;
+ LogTestController::getInstance().setTrace<core::YamlConfiguration>();
+ Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
+ Proc Port1{"00000000-0000-0000-0000-000000000002", "Port1"};
+ Proc Child1_Proc1{"00000000-0000-0000-0000-000000000011", "Child1_Proc1"};
+ Proc Child1_Port1{"00000000-0000-0000-0000-000000000012", "Child1_Port1"};
+ Proc Child2_Proc1{"00000000-0000-0000-0000-000000000021", "Child2_Proc1"};
+ Proc Child2_Port1{"00000000-0000-0000-0000-000000000022", "Child2_Port1"};
+
+ auto pattern = Group("root")
+ .With({Proc1})
+ .With({Conn{"Conn1", Proc1, Port1}})
+ .With({RPG{"RPG1", {Port1}}})
+ .With({
+ Group("Child1").With({Child1_Proc1})
+ .With({Conn{"Child1_Conn1", Child1_Proc1, Child1_Port1}})
+ .With({RPG{"Child1_RPG1", {Child1_Port1}}}),
+ Group("Child2")
+ .With({Child2_Proc1})
+ .With({RPG{"Child2_RPG1", {Child2_Port1}}})
+ });
+
+ auto& Conn1 = pattern.connections_.at(0);
+ auto& Child1_Conn1 = pattern.subgroups_.at(0).connections_.at(0);
+
+ SECTION("Connecting processors in their own groups") {
+ // sanity check, everything is resolved as it should
+ }
+
+ SECTION("Connecting processors in their child/parent group") {
+ Conn1.source = UnresolvedProc{Child1_Proc1.id};
+ Conn1.destination = UnresolvedProc{Child1_Port1.id};
+
+ Child1_Conn1.source = UnresolvedProc{Proc1.id};
+ Child1_Conn1.destination = UnresolvedProc{Port1.id};
+ }
+
+ SECTION("Connecting processors between their own and their child/parent group") {
+ Conn1.source = Proc1;
+ Conn1.destination = UnresolvedProc{Child1_Port1.id};
+
+ Child1_Conn1.source = UnresolvedProc{Port1.id};
+ Child1_Conn1.destination = Child1_Proc1;
+ }
+
+ SECTION("Connecting processors in a sibling group") {
+ Conn1.source = Proc1;
+ Conn1.destination = Port1;
+
+ Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
+ Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+ }
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index efd4ac5..9d96b56 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -105,13 +105,13 @@ class FlowConfiguration : public CoreComponent {
std::shared_ptr<core::Processor> createProcessor(const std::string &name, const std::string &fullname, utils::Identifier &uuid);
// Create Root Processor Group
- std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, utils::Identifier & uuid, int version);
+ std::unique_ptr<core::ProcessGroup> createRootProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
+ std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
+ std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid);
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name,
const utils::Identifier& uuid);
- // Create Remote Processor Group
- std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, utils::Identifier & uuid);
// Create Connection
std::shared_ptr<minifi::Connection> createConnection(std::string name, const utils::Identifier& uuid) const;
// Create Provenance Report Task
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index fad30a7..78a462a 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -40,6 +40,8 @@
#include "utils/HTTPClient.h"
#include "utils/CallBackTimer.h"
+struct ProcessGroupTestAccessor;
+
namespace org {
namespace apache {
namespace nifi {
@@ -49,6 +51,7 @@ namespace core {
// Process Group Type
enum ProcessGroupType {
ROOT_PROCESS_GROUP = 0,
+ SIMPLE_PROCESS_GROUP,
REMOTE_PROCESS_GROUP,
MAX_PROCESS_GROUP_TYPE
};
@@ -57,7 +60,12 @@ enum ProcessGroupType {
// ProcessGroup Class
class ProcessGroup : public CoreComponent {
+ friend struct ::ProcessGroupTestAccessor;
public:
+ enum class Traverse {
+ ExcludeChildren,
+ IncludeChildren
+ };
// Constructor
/*!
* Create a new process group
@@ -159,6 +167,8 @@ class ProcessGroup : public CoreComponent {
void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter = [] (const std::shared_ptr<Processor>&) {return true;}); // NOLINT
// Whether it is root process group
bool isRootProcessGroup();
+
+ bool isRemoteProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -174,31 +184,31 @@ class ProcessGroup : public CoreComponent {
// Remove processor
void removeProcessor(const std::shared_ptr<Processor>& processor);
// Add child processor group
- void addProcessGroup(ProcessGroup *child);
- // Remove child processor group
- void removeProcessGroup(ProcessGroup *child);
+ void addProcessGroup(std::unique_ptr<ProcessGroup> child);
// ! Add connections
void addConnection(const std::shared_ptr<Connection>& connection);
// Generic find
template <typename Fun>
- std::shared_ptr<Processor> findProcessor(Fun condition) const {
+ std::shared_ptr<Processor> findProcessor(Fun condition, Traverse traverse) const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
const auto found = std::find_if(processors_.cbegin(), processors_.cend(), condition);
if (found != processors_.cend()) {
return *found;
}
for (const auto& processGroup : child_process_groups_) {
- const std::shared_ptr<Processor> processor = processGroup->findProcessor(condition);
- if (processor) {
- return processor;
+ if (processGroup->isRemoteProcessGroup() || traverse == Traverse::IncludeChildren) {
+ std::shared_ptr<Processor> processor = processGroup->findProcessor(condition, traverse);
+ if (processor) {
+ return processor;
+ }
}
}
return nullptr;
}
// findProcessor based on UUID
- std::shared_ptr<Processor> findProcessorById(const utils::Identifier& uuid) const;
+ std::shared_ptr<Processor> findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
// findProcessor based on name
- std::shared_ptr<Processor> findProcessorByName(const std::string &processorName) const;
+ std::shared_ptr<Processor> findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;
void getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec);
/**
@@ -236,13 +246,13 @@ class ProcessGroup : public CoreComponent {
// version
int config_version_;
// Process Group Type
- ProcessGroupType type_;
+ const ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
- std::set<std::shared_ptr<Processor> > processors_;
- std::set<std::shared_ptr<Processor> > failed_processors_;
- std::set<ProcessGroup *> child_process_groups_;
+ std::set<std::shared_ptr<Processor>> processors_;
+ std::set<std::shared_ptr<Processor>> failed_processors_;
+ std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
- std::set<std::shared_ptr<Connection> > connections_;
+ std::set<std::shared_ptr<Connection>> connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index aafbeb8..767e7fd 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -83,7 +83,7 @@ class YamlConfiguration : public FlowConfiguration {
}
try {
YAML::Node rootYamlNode = YAML::Load(configuration.value());
- return getYamlRoot(&rootYamlNode);
+ return getYamlRoot(rootYamlNode);
} catch(...) {
logger_->log_error("Invalid yaml configuration file");
throw;
@@ -104,7 +104,7 @@ class YamlConfiguration : public FlowConfiguration {
std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream) {
try {
YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
- return getYamlRoot(&rootYamlNode);
+ return getYamlRoot(rootYamlNode);
} catch (const YAML::ParserException &pe) {
logger_->log_error(pe.what());
std::rethrow_exception(std::current_exception());
@@ -126,7 +126,7 @@ class YamlConfiguration : public FlowConfiguration {
std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override {
try {
YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
- return getYamlRoot(&rootYamlNode);
+ return getYamlRoot(rootYamlNode);
} catch (const YAML::ParserException &pe) {
logger_->log_error(pe.what());
std::rethrow_exception(std::current_exception());
@@ -156,8 +156,11 @@ class YamlConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode);
+ std::unique_ptr<core::ProcessGroup> getYamlRoot(const YAML::Node& rootYamlNode);
+ std::unique_ptr<core::ProcessGroup> createProcessGroup(const YAML::Node& headerNode, bool is_root = false);
+
+ std::unique_ptr<core::ProcessGroup> parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root = false);
/**
* Parses a processor from its corresponding YAML config node and adds
* it to a parent ProcessGroup. The processorNode argument must point
@@ -169,7 +172,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the parent ProcessGroup to which the the created
* Processor should be added
*/
- void parseProcessorNodeYaml(YAML::Node processorNode, core::ProcessGroup *parent);
+ void parseProcessorNodeYaml(const YAML::Node& processorNode, core::ProcessGroup* parent);
/**
* Parses a port from its corressponding YAML config node and adds
@@ -182,7 +185,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the parent ProcessGroup for the port
* @param direction the TransferDirection of the port
*/
- void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction);
+ void parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
/**
* Parses the root level YAML node for the flow configuration and
@@ -192,16 +195,16 @@ class YamlConfiguration : public FlowConfiguration {
* @param rootNode
* @return
*/
- std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(YAML::Node rootNode);
+ std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(const YAML::Node& rootNode);
// Process Property YAML
- void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, std::shared_ptr<core::Processor> processor);
+ void parseProcessorPropertyYaml(const YAML::Node& doc, const YAML::Node& node, std::shared_ptr<core::Processor> processor);
/**
* Parse controller services
* @param controllerServicesNode controller services YAML node.
* @param parent parent process group.
*/
- void parseControllerServices(YAML::Node *controllerServicesNode);
+ void parseControllerServices(const YAML::Node& controllerServicesNode);
/**
* Parses the Connections section of a configuration YAML.
@@ -212,7 +215,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the root node of flow configuration to which
* to add the connections that are parsed
*/
- void parseConnectionYaml(YAML::Node *node, core::ProcessGroup *parent);
+ void parseConnectionYaml(const YAML::Node& node, core::ProcessGroup* parent);
/**
* Parses the Remote Process Group section of a configuration YAML.
@@ -223,7 +226,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent the root node of flow configuration to which
* to add the process groups that are parsed
*/
- void parseRemoteProcessGroupYaml(YAML::Node *node, core::ProcessGroup *parent);
+ void parseRemoteProcessGroupYaml(const YAML::Node& node, core::ProcessGroup* parent);
/**
* Parses the Provenance Reporting section of a configuration YAML.
@@ -235,7 +238,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param parentGroup the root node of flow configuration to which
* to add the provenance reporting config
*/
- void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup);
+ void parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup);
/**
* A helper function to parse the Properties Node YAML for a processor.
@@ -243,7 +246,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param propertiesNode the YAML::Node containing the properties
* @param processor the Processor to which to add the resulting properties
*/
- void parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string &component_name, const std::string &yaml_section);
+ void parsePropertiesNodeYaml(const YAML::Node& propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string& component_name, const std::string& yaml_section);
/**
* A helper function for parsing or generating optional id fields.
@@ -261,7 +264,7 @@ class YamlConfiguration : public FlowConfiguration {
* is optional and defaults to 'id'
* @return the parsed or generated UUID string
*/
- std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = "id");
+ std::string getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField = "id");
/**
* This is a helper function for getting an optional value, if it exists.
@@ -277,7 +280,7 @@ class YamlConfiguration : public FlowConfiguration {
* the optional field is missing. If not provided,
* a default info message will be generated.
*/
- YAML::Node getOptionalField(YAML::Node *yamlNode, const std::string &fieldName, const YAML::Node &defaultValue, const std::string &yamlSection = "", const std::string &infoMessage = "");
+ YAML::Node getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection = "", const std::string& infoMessage = "");
protected:
std::shared_ptr<io::StreamFactory> stream_factory_;
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 232002d..1374205 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -42,7 +42,7 @@ FlowConfiguration::~FlowConfiguration() {
}
}
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier &uuid) {
auto processor = minifi::processors::ProcessorUtils::createProcessor(name, name, uuid, stream_factory_);
if (nullptr == processor) {
logger_->log_error("No Processor defined for %s", name);
@@ -51,7 +51,7 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string
return processor;
}
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(const std::string &name, const std::string &fullname, utils::Identifier & uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(const std::string &name, const std::string &fullname, utils::Identifier &uuid) {
auto processor = minifi::processors::ProcessorUtils::createProcessor(name, fullname, uuid, stream_factory_);
if (nullptr == processor) {
logger_->log_error("No Processor defined for %s", fullname);
@@ -112,12 +112,16 @@ bool FlowConfiguration::persist(const std::string &configuration) {
return filesystem_->write(*config_path_, configuration);
}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, utils::Identifier & uuid, int version) {
- return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(const std::string &name, const utils::Identifier &uuid, int version) {
+ return utils::make_unique<core::ProcessGroup>(core::ROOT_PROCESS_GROUP, name, uuid, version);
}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, utils::Identifier & uuid) {
- return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version) {
+ return utils::make_unique<core::ProcessGroup>(core::SIMPLE_PROCESS_GROUP, name, uuid, version);
+}
+
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid) {
+ return utils::make_unique<core::ProcessGroup>(core::REMOTE_PROCESS_GROUP, name, uuid);
}
std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, const utils::Identifier& uuid) const {
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index add7865..8183d0b 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -87,17 +87,17 @@ ProcessGroup::~ProcessGroup() {
for (auto&& connection : connections_) {
connection->drain(false);
}
-
- for (ProcessGroup* childGroup : child_process_groups_) {
- delete childGroup;
- }
}
bool ProcessGroup::isRootProcessGroup() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
return (type_ == ROOT_PROCESS_GROUP);
}
+bool ProcessGroup::isRemoteProcessGroup() {
+ return (type_ == REMOTE_PROCESS_GROUP);
+}
+
+
void ProcessGroup::addProcessor(const std::shared_ptr<Processor>& processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -118,23 +118,13 @@ void ProcessGroup::removeProcessor(const std::shared_ptr<Processor>& processor)
}
}
-void ProcessGroup::addProcessGroup(ProcessGroup *child) {
+// Takes ownership of `child` and stores it as a child group of this group (under mutex_).
+// child->getName() is called unconditionally, so callers must pass a non-null pointer.
+// If the set already contains an equivalent entry, `child` is not stored and is destroyed when this function returns.
+// NOTE(review): for a freshly moved-in unique_ptr the find() below can presumably never match (no two unique_ptrs
+// may own the same object); verify the comparator of child_process_groups_ before relying on this check.
+void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
- child_process_groups_.insert(child);
logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_);
- }
-}
-
-void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
-
- if (child_process_groups_.find(child) != child_process_groups_.end()) {
- // We do have the same child process group in this process group yet
- child_process_groups_.erase(child);
- logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_);
+ // log before the move below: `child` is empty afterwards
+ child_process_groups_.emplace(std::move(child));
}
}
@@ -201,7 +191,7 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg
startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
// Start processing the group
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->startProcessing(timeScheduler, eventScheduler, cronScheduler);
}
} catch (std::exception &exception) {
@@ -243,7 +233,7 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
}
}
- for (ProcessGroup* childGroup : child_process_groups_) {
+ for (auto& childGroup : child_process_groups_) {
childGroup->stopProcessing(timeScheduler, eventScheduler, cronScheduler, filter);
}
} catch (std::exception &exception) {
@@ -255,21 +245,21 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge
}
}
-std::shared_ptr<Processor> ProcessGroup::findProcessorById(const utils::Identifier& uuid, Traverse traverse) const {
+// Looks up a processor whose UUID is set and equal to `uuid`, delegating the search to findProcessor.
+// `traverse` selects whether child groups are searched too (addConnection passes Traverse::ExcludeChildren).
+// NOTE(review): the not-found result and match order come from findProcessor, which is not visible here — confirm.
+std::shared_ptr<Processor> ProcessGroup::findProcessorById(const utils::Identifier& uuid, Traverse traverse) const {
const auto id_matches = [&] (const std::shared_ptr<Processor>& processor) {
- logger_->log_debug("Current processor is %s", processor->getName());
+ logger_->log_trace("Searching for processor by id, checking processor %s", processor->getName());
utils::Identifier processorUUID = processor->getUUID();
+ // a processor with an unset UUID never matches
return processorUUID && uuid == processorUUID;
};
- return findProcessor(id_matches);
+ return findProcessor(id_matches, traverse);
}
-std::shared_ptr<Processor> ProcessGroup::findProcessorByName(const std::string &processorName) const {
+// Looks up a processor whose name equals `processorName` (exact comparison), delegating to findProcessor.
+// `traverse` selects whether child groups are searched too.
+std::shared_ptr<Processor> ProcessGroup::findProcessorByName(const std::string &processorName, Traverse traverse) const {
const auto name_matches = [&] (const std::shared_ptr<Processor>& processor) {
- logger_->log_debug("Current processor is %s", processor->getName());
+ logger_->log_trace("Searching for processor by name, checking processor %s", processor->getName());
return processor->getName() == processorName;
};
- return findProcessor(name_matches);
+ return findProcessor(name_matches, traverse);
}
void ProcessGroup::addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node) {
@@ -287,59 +277,58 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
+// Appends this group's processors, then (recursively) those of every child group, to processor_vec.
void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
- std::shared_ptr<Processor> ret = NULL;
- for (auto processor : processors_) {
- logger_->log_debug("Current processor is %s", processor->getName());
+ for (auto& processor : processors_) {
+ logger_->log_trace("Collecting all processors, current processor is %s", processor->getName());
processor_vec.push_back(processor);
}
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->getAllProcessors(processor_vec);
}
}
+// Sets propertyName = propertyValue on every processor named processorName, in this group and all descendant groups.
void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
- for (auto processor : processors_) {
+ for (auto& processor : processors_) {
if (processor->getName() == processorName) {
processor->setProperty(propertyName, propertyValue);
}
}
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
}
+// Registers every connection of this group and its descendants under both its UUID string and its name.
+// A later entry with the same key overwrites an earlier one.
+// NOTE(review): unlike the two functions above, this does not take mutex_ — confirm callers serialize access.
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
- for (auto connection : connections_) {
+ for (auto& connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
+// Same as the overload above, but stores the connections as Connectable pointers.
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) {
- for (auto connection : connections_) {
+ for (auto& connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
+// Collects everything that can hold FlowFiles across this group and its descendants:
+// connections keyed by UUID string and by name, processors keyed by UUID string only.
void ProcessGroup::getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const {
- for (auto connection : connections_) {
+ for (auto& connection : connections_) {
containers[connection->getUUIDStr()] = connection;
containers[connection->getName()] = connection;
}
- for (auto processor : processors_) {
+ for (auto& processor : processors_) {
// processors can also own FlowFiles
containers[processor->getUUIDStr()] = processor;
}
- for (auto processGroup : child_process_groups_) {
+ for (auto& processGroup : child_process_groups_) {
processGroup->getFlowFileContainers(containers);
}
}
@@ -351,11 +340,19 @@ void ProcessGroup::addConnection(const std::shared_ptr<Connection>& connection)
// We do not have the same connection in this process group yet
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
- std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID());
+ // only allow connections between processors of the same process group
+ std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID(), Traverse::ExcludeChildren);
if (source) {
source->addConnection(connection);
+ } else {
+ logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
+ connection->getSourceUUID().to_string(), connection->getName(), connection->getUUIDStr());
+ }
+ std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID(), Traverse::ExcludeChildren);
+ if (!destination) {
+ logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+ connection->getDestinationUUID().to_string(), connection->getName(), connection->getUUIDStr());
}
- std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID());
if (destination && destination != source) {
destination->addConnection(connection);
}
@@ -383,18 +380,18 @@ void ProcessGroup::drainConnections() {
connection->drain(false);
}
- for (ProcessGroup* childGroup : child_process_groups_) {
+ for (auto& childGroup : child_process_groups_) {
childGroup->drainConnections();
}
}
std::size_t ProcessGroup::getTotalFlowFileCount() const {
std::size_t sum = 0;
- for (auto& conn : connections_) {
+ for (const auto& conn : connections_) {
sum += gsl::narrow<std::size_t>(conn->getQueueSize());
}
- for (const ProcessGroup* childGroup : child_process_groups_) {
+ for (const auto& childGroup : child_process_groups_) {
sum += childGroup->getTotalFlowFileCount();
}
return sum;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 67d094b..38afd40 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -48,36 +48,46 @@ YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& re
stream_factory_(stream_factory),
logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
- utils::Identifier uuid;
- int version = 0;
-
- yaml::checkRequiredField(&rootFlowNode, "name", logger_,
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- std::string flowName = rootFlowNode["name"].as<std::string>();
-
- auto class_loader_functions = rootFlowNode["Class Loader Functions"];
+// Parses the whole YAML document as the root process group.
+// Any "Class Loader Functions" listed under the Flow Controller section are registered first. The root group's
+// name/id/version are then taken from the Flow Controller section, and its components (Processors, Connections,
+// Remote Process Groups, nested Process Groups) from the top level of the document. The root group's name is
+// recorded in name_.
+// NOTE(review): rootFlowNode is const, so a missing Flow Controller key yields an invalid yaml-cpp node, and the
+// non-const operator[] on the next line may throw a yaml-cpp exception before the friendlier checkRequiredField
+// error in createProcessGroup is reached — confirm.
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
+ auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+ auto class_loader_functions = flowControllerNode["Class Loader Functions"];
if (class_loader_functions && class_loader_functions.IsSequence()) {
for (auto function : class_loader_functions) {
registerResource(function.as<std::string>());
}
}
- std::string id = getOrGenerateId(&rootFlowNode);
- uuid = id;
+ auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
+ this->name_ = rootGroup->getName();
+ return rootGroup;
+}
- if (rootFlowNode["version"]) {
- version = rootFlowNode["version"].as<int>();
- }
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
+ int version = 0;
- logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
- std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+ yaml::checkRequiredField(&yamlNode, "name", logger_,
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ std::string flowName = yamlNode["name"].as<std::string>();
- this->name_ = flowName;
+ utils::Identifier uuid;
+ // assignment throws on invalid uuid
+ uuid = getOrGenerateId(yamlNode);
- if (rootFlowNode["onschedule retry interval"]) {
+ if (yamlNode["version"]) {
+ version = yamlNode["version"].as<int>();
+ }
+
+ logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
+ std::unique_ptr<core::ProcessGroup> group;
+ if (is_root) {
+ group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
+ } else {
+ group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
+ }
+
+ if (yamlNode["onschedule retry interval"]) {
int64_t onScheduleRetryPeriodValue = -1;
- std::string onScheduleRetryPeriod = rootFlowNode["onschedule retry interval"].as<std::string>();
+ std::string onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
core::TimeUnit unit;
@@ -93,27 +103,43 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml
return group;
}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
- YAML::Node rootYaml = *rootYamlNode;
- YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
- YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = rootYaml[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
- YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
- YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-
- if (!remoteProcessingGroupsNode) {
- remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+// Builds one process group (root or nested) from YAML.
+// headerNode supplies the group's name/id/version (via createProcessGroup). yamlNode supplies its Processors,
+// Remote Process Groups, Connections and nested "Process Groups"; nested groups are parsed recursively and
+// linked back to this group as their parent.
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
+ auto group = createProcessGroup(headerNode, is_root);
+ YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
+ YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+ YAML::Node remoteProcessingGroupsNode = [&] {
+ // assignment is not supported on invalid Yaml nodes
+ YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+ if (candidate) {
+ return candidate;
}
+ return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+ }();
+ YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
+
+ parseProcessorNodeYaml(processorsNode, group.get());
+ parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
+ // parse connections last to give feedback if the source and/or destination
+ // is not in the same process group
+ parseConnectionYaml(connectionsNode, group.get());
+
+ if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
+ for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
+ YAML::Node childProcessGroupNode = it->as<YAML::Node>();
+ auto childGroup = parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode);
+ // link the child back to its parent, the same way remote process groups are linked in parseRemoteProcessGroupYaml
+ childGroup->setParent(group.get());
+ group->addProcessGroup(std::move(childGroup));
+ }
+ }
+ return group;
+}
- YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
+ YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+ YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
- parseControllerServices(&controllerServiceNode);
+ parseControllerServices(controllerServiceNode);
// Create the root process group
- std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(flowControllerNode);
- parseProcessorNodeYaml(processorsNode, root.get());
- parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root.get());
- parseConnectionYaml(&connectionsNode, root.get());
- parseProvenanceReportingYaml(&provenanceReportNode, root.get());
+ std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
+ parseProvenanceReportingYaml(provenanceReportNode, root.get());
// set the controller services into the root group.
for (const auto& controller_service : controller_services_->getAllControllerServices()) {
@@ -124,7 +150,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *r
return root;
}
-void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
int64_t yieldPeriod = -1;
@@ -137,170 +163,169 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
return;
}
- if (processorsNode) {
- if (processorsNode.IsSequence()) {
- // Evaluate sequence of processors
- for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
- core::ProcessorConfig procCfg;
- YAML::Node procNode = iter->as<YAML::Node>();
-
- yaml::checkRequiredField(&procNode, "name", logger_,
- CONFIG_YAML_PROCESSORS_KEY);
- procCfg.name = procNode["name"].as<std::string>();
- procCfg.id = getOrGenerateId(&procNode);
-
- auto lib_location = procNode["Library Location"];
- auto lib_function = procNode["Library Function"];
- if (lib_location && lib_function) {
- auto lib_location_str = lib_location.as<std::string>();
- auto lib_function_str = lib_function.as<std::string>();
- registerResource(lib_location_str, lib_function_str);
- }
-
- uuid = procCfg.id.c_str();
- logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
- yaml::checkRequiredField(&procNode, "class", logger_, CONFIG_YAML_PROCESSORS_KEY);
- procCfg.javaClass = procNode["class"].as<std::string>();
- logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
-
- // Determine the processor name only from the Java class
- auto lastOfIdx = procCfg.javaClass.find_last_of(".");
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- std::string processorName = procCfg.javaClass.substr(lastOfIdx);
- processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
- } else {
- // Allow unqualified class names for core processors
- processor = this->createProcessor(procCfg.javaClass, uuid);
- }
+ if (!processorsNode) {
+ throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ }
+ if (!processorsNode.IsSequence()) {
+ throw std::invalid_argument(
+ "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ }
+ // Evaluate sequence of processors
+ for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+ core::ProcessorConfig procCfg;
+ YAML::Node procNode = iter->as<YAML::Node>();
+
+ yaml::checkRequiredField(&procNode, "name", logger_,
+ CONFIG_YAML_PROCESSORS_KEY);
+ procCfg.name = procNode["name"].as<std::string>();
+ procCfg.id = getOrGenerateId(procNode);
+
+ auto lib_location = procNode["Library Location"];
+ auto lib_function = procNode["Library Function"];
+ if (lib_location && lib_function) {
+ auto lib_location_str = lib_location.as<std::string>();
+ auto lib_function_str = lib_function.as<std::string>();
+ registerResource(lib_location_str, lib_function_str);
+ }
- if (!processor) {
- logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
- throw std::invalid_argument("Could not create processor " + procCfg.name);
- }
+ uuid = procCfg.id.c_str();
+ logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
+ yaml::checkRequiredField(&procNode, "class", logger_, CONFIG_YAML_PROCESSORS_KEY);
+ procCfg.javaClass = procNode["class"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
+
+ // Determine the processor name only from the Java class
+ auto lastOfIdx = procCfg.javaClass.find_last_of(".");
+ if (lastOfIdx != std::string::npos) {
+ lastOfIdx++; // if a value is found, increment to move beyond the .
+ std::string processorName = procCfg.javaClass.substr(lastOfIdx);
+ processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
+ } else {
+ // Allow unqualified class names for core processors
+ processor = this->createProcessor(procCfg.javaClass, uuid);
+ }
- processor->setName(procCfg.name);
+ if (!processor) {
+ logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
+ throw std::invalid_argument("Could not create processor " + procCfg.name);
+ }
- processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
+ processor->setName(procCfg.name);
- auto strategyNode = getOptionalField(&procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
- CONFIG_YAML_PROCESSORS_KEY);
- procCfg.schedulingStrategy = strategyNode.as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
+ processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
- auto periodNode = getOptionalField(&procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
- CONFIG_YAML_PROCESSORS_KEY);
+ auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
+ CONFIG_YAML_PROCESSORS_KEY);
+ procCfg.schedulingStrategy = strategyNode.as<std::string>();
+ logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
- procCfg.schedulingPeriod = periodNode.as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
+ auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
+ CONFIG_YAML_PROCESSORS_KEY);
- if (procNode["max concurrent tasks"]) {
- procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
- logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
- }
+ procCfg.schedulingPeriod = periodNode.as<std::string>();
+ logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
- if (procNode["penalization period"]) {
- procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
- }
+ if (procNode["max concurrent tasks"]) {
+ procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
+ }
- if (procNode["yield period"]) {
- procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
- }
+ if (procNode["penalization period"]) {
+ procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
+ }
- if (procNode["run duration nanos"]) {
- procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
- logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
- }
+ if (procNode["yield period"]) {
+ procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
+ }
- // handle auto-terminated relationships
- if (procNode["auto-terminated relationships list"]) {
- YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
- std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
- std::string autoTerminatedRel = relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
- }
- }
- procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
- }
+ if (procNode["run duration nanos"]) {
+ procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
+ }
- // handle processor properties
- if (procNode["Properties"]) {
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+ // handle auto-terminated relationships
+ if (procNode["auto-terminated relationships list"]) {
+ YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+ std::vector<std::string> rawAutoTerminatedRelationshipValues;
+ if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
+ std::string autoTerminatedRel = relIter->as<std::string>();
+ rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
}
+ }
+ procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
+ }
- // Take care of scheduling
+ // handle processor properties
+ if (procNode["Properties"]) {
+ YAML::Node propertiesNode = procNode["Properties"];
+ parsePropertiesNodeYaml(propertiesNode, processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+ }
- core::TimeUnit unit;
+ // Take care of scheduling
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
- }
- } else {
- processor->setCronPeriod(procCfg.schedulingPeriod);
- }
+ core::TimeUnit unit;
- if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
- processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
- }
+ if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+ if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", schedulingPeriod);
+ processor->setSchedulingPeriodNano(schedulingPeriod);
+ }
+ } else {
+ processor->setCronPeriod(procCfg.schedulingPeriod);
+ }
- if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yieldPeriod);
- processor->setYieldPeriodMsec(yieldPeriod);
- }
+ if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
+ processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
+ }
- // Default to running
- processor->setScheduledState(core::RUNNING);
-
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
- processor->setSchedulingStrategy(core::TIMER_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- processor->setSchedulingStrategy(core::EVENT_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- } else {
- processor->setSchedulingStrategy(core::CRON_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- }
+ if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yieldPeriod);
+ processor->setYieldPeriodMsec(yieldPeriod);
+ }
- int32_t maxConcurrentTasks;
- if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
- }
+ // Default to running
+ processor->setScheduledState(core::RUNNING);
- if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
- logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
- processor->setRunDurationNano((uint64_t) runDurationNanos);
- }
+ if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
+ processor->setSchedulingStrategy(core::TIMER_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+ } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
+ processor->setSchedulingStrategy(core::EVENT_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+ } else {
+ processor->setSchedulingStrategy(core::CRON_DRIVEN);
+ logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
+ }
- std::set<core::Relationship> autoTerminatedRelationships;
- for (auto &&relString : procCfg.autoTerminatedRelationships) {
- core::Relationship relationship(relString, "");
- logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString);
- autoTerminatedRelationships.insert(relationship);
- }
+ int32_t maxConcurrentTasks;
+ if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+ logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
+ }
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+ if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+ logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
+ processor->setRunDurationNano((uint64_t) runDurationNanos);
+ }
- parentGroup->addProcessor(processor);
- }
- } else {
- throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+ std::set<core::Relationship> autoTerminatedRelationships;
+ for (auto &&relString : procCfg.autoTerminatedRelationships) {
+ core::Relationship relationship(relString, "");
+ logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString);
+ autoTerminatedRelationships.insert(relationship);
}
- } else {
- throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+
+ processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+ parentGroup->addProcessor(processor);
}
}
-void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
utils::Identifier uuid;
std::string id;
@@ -309,125 +334,123 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
return;
}
- if (rpgNode) {
- if (rpgNode->IsSequence()) {
- for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
- YAML::Node currRpgNode = iter->as<YAML::Node>();
+ if (!rpgNode || !rpgNode.IsSequence()) {
+ return;
+ }
+ for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
+ YAML::Node currRpgNode = iter->as<YAML::Node>();
- yaml::checkRequiredField(&currRpgNode, "name", logger_,
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto name = currRpgNode["name"].as<std::string>();
- id = getOrGenerateId(&currRpgNode);
+ yaml::checkRequiredField(&currRpgNode, "name", logger_,
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ auto name = currRpgNode["name"].as<std::string>();
+ id = getOrGenerateId(currRpgNode);
- logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+ logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
- auto urlNode = getOptionalField(&currRpgNode, "url", YAML::Node(""),
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- std::string url = urlNode.as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
+ std::string url = urlNode.as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
- core::ProcessGroup *group = NULL;
- core::TimeUnit unit;
- int64_t timeoutValue = -1;
- int64_t yieldPeriodValue = -1;
- uuid = id;
- group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
- group->setParent(parentGroup);
- parentGroup->addProcessGroup(group);
+ core::TimeUnit unit;
+ int64_t timeoutValue = -1;
+ int64_t yieldPeriodValue = -1;
+ uuid = id;
+ auto group = this->createRemoteProcessGroup(name, uuid);
+ group->setParent(parentGroup);
+
+ if (currRpgNode["yield period"]) {
+ std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+
+ if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yieldPeriodValue);
+ group->setYieldPeriodMsec(yieldPeriodValue);
+ }
+ }
- if (currRpgNode["yield period"]) {
- std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+ if (currRpgNode["timeout"]) {
+ std::string timeout = currRpgNode["timeout"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
- if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yieldPeriodValue);
- group->setYieldPeriodMsec(yieldPeriodValue);
- }
- }
+ if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeoutValue);
+ group->setTimeOut(timeoutValue);
+ }
+ }
- if (currRpgNode["timeout"]) {
- std::string timeout = currRpgNode["timeout"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+ if (currRpgNode["local network interface"]) {
+ std::string interface = currRpgNode["local network interface"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
+ group->setInterface(interface);
+ }
- if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeoutValue);
- group->setTimeOut(timeoutValue);
+ if (currRpgNode["transport protocol"]) {
+ std::string transport_protocol = currRpgNode["transport protocol"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
+ if (transport_protocol == "HTTP") {
+ group->setTransportProtocol(transport_protocol);
+ if (currRpgNode["proxy host"]) {
+ std::string http_proxy_host = currRpgNode["proxy host"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
+ group->setHttpProxyHost(http_proxy_host);
+ if (currRpgNode["proxy user"]) {
+ std::string http_proxy_username = currRpgNode["proxy user"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
+ group->setHttpProxyUserName(http_proxy_username);
}
- }
-
- if (currRpgNode["local network interface"]) {
- std::string interface = currRpgNode["local network interface"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
- group->setInterface(interface);
- }
-
- if (currRpgNode["transport protocol"]) {
- std::string transport_protocol = currRpgNode["transport protocol"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
- if (transport_protocol == "HTTP") {
- group->setTransportProtocol(transport_protocol);
- if (currRpgNode["proxy host"]) {
- std::string http_proxy_host = currRpgNode["proxy host"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
- group->setHttpProxyHost(http_proxy_host);
- if (currRpgNode["proxy user"]) {
- std::string http_proxy_username = currRpgNode["proxy user"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
- group->setHttpProxyUserName(http_proxy_username);
- }
- if (currRpgNode["proxy password"]) {
- std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
- group->setHttpProxyPassWord(http_proxy_password);
- }
- if (currRpgNode["proxy port"]) {
- std::string http_proxy_port = currRpgNode["proxy port"].as<std::string>();
- int32_t port;
- if (core::Property::StringToInt(http_proxy_port, port)) {
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
- group->setHttpProxyPort(port);
- }
- }
+ if (currRpgNode["proxy password"]) {
+ std::string http_proxy_password = currRpgNode["proxy password"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
+ group->setHttpProxyPassWord(http_proxy_password);
+ }
+ if (currRpgNode["proxy port"]) {
+ std::string http_proxy_port = currRpgNode["proxy port"].as<std::string>();
+ int32_t port;
+ if (core::Property::StringToInt(http_proxy_port, port)) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
+ group->setHttpProxyPort(port);
}
- } else if (transport_protocol == "RAW") {
- group->setTransportProtocol(transport_protocol);
- } else {
- std::stringstream stream;
- stream << "Invalid transport protocol " << transport_protocol;
- throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
}
}
+ } else if (transport_protocol == "RAW") {
+ group->setTransportProtocol(transport_protocol);
+ } else {
+ std::stringstream stream;
+ stream << "Invalid transport protocol " << transport_protocol;
+ throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
+ }
+ }
- group->setTransmitting(true);
- group->setURL(url);
+ group->setTransmitting(true);
+ group->setURL(url);
- yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
- if (inputPorts && inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- YAML::Node currPort = portIter->as<YAML::Node>();
+ yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
+ CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
+ if (inputPorts && inputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+ YAML::Node currPort = portIter->as<YAML::Node>();
- this->parsePortYaml(&currPort, group, sitetosite::SEND);
- } // for node
- }
- YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
- if (outputPorts && outputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
- logger_->log_debug("Got a current port, iterating...");
+ this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
+ } // for node
+ }
+ YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
+ if (outputPorts && outputPorts.IsSequence()) {
+ for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
+ logger_->log_debug("Got a current port, iterating...");
- YAML::Node currPort = portIter->as<YAML::Node>();
+ YAML::Node currPort = portIter->as<YAML::Node>();
- this->parsePortYaml(&currPort, group, sitetosite::RECEIVE);
- } // for node
- }
- }
+ this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
+ } // for node
}
+ parentGroup->addProcessGroup(std::move(group));
}
}
-void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup) {
+void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
utils::Identifier port_uuid;
int64_t schedulingPeriod = -1;
@@ -436,7 +459,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
return;
}
- if (!reportNode || !reportNode->IsDefined() || reportNode->IsNull()) {
+ if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
logger_->log_debug("no provenance reporting task specified");
return;
}
@@ -445,7 +468,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
processor = createProvenanceReportTask();
std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor);
- YAML::Node node = reportNode->as<YAML::Node>();
+ YAML::Node node = reportNode.as<YAML::Node>();
yaml::checkRequiredField(&node, "scheduling strategy", logger_,
CONFIG_YAML_PROVENANCE_REPORT_KEY);
@@ -506,78 +529,77 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
processor->setScheduledState(core::RUNNING);
}
-void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) {
- if (!IsNullOrEmpty(controllerServicesNode)) {
- if (controllerServicesNode->IsSequence()) {
- for (auto iter : *controllerServicesNode) {
- YAML::Node controllerServiceNode = iter.as<YAML::Node>();
- try {
- yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- std::string type = "";
-
- try {
- yaml::checkRequiredField(&controllerServiceNode, "class", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- type = controllerServiceNode["class"].as<std::string>();
- } catch (const std::invalid_argument &) {
- yaml::checkRequiredField(&controllerServiceNode, "type", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- type = controllerServiceNode["type"].as<std::string>();
- logger_->log_debug("Using type %s for controller service node", type);
- }
- std::string fullType = type;
- auto lastOfIdx = type.find_last_of(".");
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- type = type.substr(lastOfIdx);
- }
+void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
+ if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
+ return;
+ }
+ for (const auto& iter : controllerServicesNode) {
+ YAML::Node controllerServiceNode = iter.as<YAML::Node>();
+ try {
+ yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ std::string type = "";
- auto name = controllerServiceNode["name"].as<std::string>();
- auto id = controllerServiceNode["id"].as<std::string>();
-
- utils::Identifier uuid;
- uuid = id;
- auto controller_service_node = createControllerService(type, fullType, name, uuid);
- if (nullptr != controller_service_node) {
- logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
- controller_service_node->initialize();
- YAML::Node propertiesNode = controllerServiceNode["Properties"];
- // we should propogate properties to the node and to the implementation
- parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node), name,
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- if (controller_service_node->getControllerServiceImplementation() != nullptr) {
- parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()), name,
- CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- }
- } else {
- logger_->log_debug("Could not locate %s", type);
- }
- controller_services_->put(id, controller_service_node);
- controller_services_->put(name, controller_service_node);
- } catch (YAML::InvalidNode &) {
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
+ try {
+ yaml::checkRequiredField(&controllerServiceNode, "class", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ type = controllerServiceNode["class"].as<std::string>();
+ } catch (const std::invalid_argument &) {
+ yaml::checkRequiredField(&controllerServiceNode, "type", logger_, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ type = controllerServiceNode["type"].as<std::string>();
+ logger_->log_debug("Using type %s for controller service node", type);
+ }
+ std::string fullType = type;
+ auto lastOfIdx = type.find_last_of(".");
+ if (lastOfIdx != std::string::npos) {
+ lastOfIdx++; // if a value is found, increment to move beyond the .
+ type = type.substr(lastOfIdx);
+ }
+
+ auto name = controllerServiceNode["name"].as<std::string>();
+ auto id = controllerServiceNode["id"].as<std::string>();
+
+ utils::Identifier uuid;
+ uuid = id;
+ auto controller_service_node = createControllerService(type, fullType, name, uuid);
+ if (nullptr != controller_service_node) {
+ logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
+ controller_service_node->initialize();
+ YAML::Node propertiesNode = controllerServiceNode["Properties"];
+ // we should propagate properties to the node and to the implementation
+ parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node), name,
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ if (controller_service_node->getControllerServiceImplementation() != nullptr) {
+ parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation()), name,
+ CONFIG_YAML_CONTROLLER_SERVICES_KEY);
}
+ } else {
+ logger_->log_debug("Could not locate %s", type);
}
+ controller_services_->put(id, controller_service_node);
+ controller_services_->put(name, controller_service_node);
+ } catch (YAML::InvalidNode &) {
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
}
}
}
-void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, core::ProcessGroup* parent) {
+void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
if (!parent) {
logger_->log_error("parseProcessNode: no parent group was provided");
return;
}
- if (!connectionsNode || !connectionsNode->IsSequence()) {
+ if (!connectionsNode || !connectionsNode.IsSequence()) {
return;
}
- for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+ for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
YAML::Node connectionNode = iter->as<YAML::Node>();
std::shared_ptr<minifi::Connection> connection = nullptr;
// Configure basic connection
- std::string id = getOrGenerateId(&connectionNode);
+ std::string id = getOrGenerateId(connectionNode);
// Default name to be same as ID
// If name is specified in configuration, use the value
@@ -604,7 +626,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, core::P
}
}
-void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction) {
+void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
utils::Identifier uuid;
std::shared_ptr<core::Processor> processor = NULL;
std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
@@ -614,7 +636,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
return;
}
- YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+ YAML::Node inputPortsObj = portNode.as<YAML::Node>();
// Check for required fields
yaml::checkRequiredField(&inputPortsObj, "name", logger_,
@@ -648,9 +670,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
// else defaults to RAW
// handle port properties
- YAML::Node nodeVal = portNode->as<YAML::Node>();
+ YAML::Node nodeVal = portNode.as<YAML::Node>();
YAML::Node propertiesNode = nodeVal["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor), nameStr,
+ parsePropertiesNodeYaml(propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor), nameStr,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
// add processor to parent
@@ -669,7 +691,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
}
void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, std::shared_ptr<core::ConfigurableComponent> processor) {
- for (auto iter : propertyValueNode) {
+ for (const auto& iter : propertyValueNode) {
if (iter.IsDefined()) {
YAML::Node nodeVal = iter.as<YAML::Node>();
YAML::Node propertiesNode = nodeVal["value"];
@@ -773,11 +795,11 @@ void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName
}
}
-void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string &component_name,
- const std::string &yaml_section) {
+void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor, const std::string& component_name,
+ const std::string& yaml_section) {
// Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
logger_->log_trace("Entered %s", component_name);
- for (const auto propertyElem : *propertiesNode) {
+ for (const auto& propertyElem : propertiesNode) {
const std::string propertyName = propertyElem.first.as<std::string>();
const YAML::Node propertyValueNode = propertyElem.second;
parsePropertyNodeElement(propertyName, propertyValueNode, processor);
@@ -869,9 +891,9 @@ void YamlConfiguration::raiseComponentError(const std::string &component_name, c
throw std::invalid_argument(err_msg);
}
-std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::string &idField) {
+std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
std::string id;
- YAML::Node node = yamlNode->as<YAML::Node>();
+ YAML::Node node = yamlNode.as<YAML::Node>();
if (node[idField]) {
if (YAML::NodeType::Scalar == node[idField].Type()) {
@@ -887,16 +909,16 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::
return id;
}
-YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode, const std::string &fieldName, const YAML::Node &defaultValue, const std::string &yamlSection,
- const std::string &providedInfoMessage) {
+YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
+ const std::string& providedInfoMessage) {
std::string infoMessage = providedInfoMessage;
- auto result = yamlNode->as<YAML::Node>()[fieldName];
+ auto result = yamlNode.as<YAML::Node>()[fieldName];
if (!result) {
if (infoMessage.empty()) {
// Build a helpful info message for the user to inform them that a default is being used
infoMessage =
- yamlNode->as<YAML::Node>()["name"] ?
- "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "'" :
+ yamlNode.as<YAML::Node>()["name"] ?
+ "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
"Using default value for optional field '" + fieldName + "' ";
if (!yamlSection.empty()) {
infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index cd26117..2d2b265 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -93,14 +93,14 @@ utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
- if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+ if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the source name is a remote port id, so use that as the source id
logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
return srcUUID.value();
}
// lastly, look the processor up by name
- auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
- if (nullptr != srcProcessor) {
+ auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName, ProcessGroup::Traverse::ExcludeChildren);
+ if (srcProcessor) {
logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
return srcProcessor->getUUID();
}
@@ -126,14 +126,14 @@ utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
- if (destUUID && parent_->findProcessorById(destUUID.value())) {
+ if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the destination name is a remote port id, so use that as the dest id
logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
return destUUID.value();
}
// look the processor up by name
- auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
- if (NULL != destProcessor) {
+ auto destProcessor = parent_->findProcessorByName(connectionDestProcName, ProcessGroup::Traverse::ExcludeChildren);
+ if (destProcessor) {
logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
return destProcessor->getUUID();
}