You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/03/25 12:58:24 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash
This is an automated email from the ASF dual-hosted git repository.
martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1c0e86a MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash
1c0e86a is described below
commit 1c0e86a40a67e32241ae776339dbf3f861607568
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Fri Mar 25 13:08:28 2022 +0100
MINIFICPP-1776 - Parallel heartbeat and flow update can cause crash
Closes #1284
Signed-off-by: Martin Zink <ma...@apache.org>
---
.../tests/C2ClearCoreComponentStateTest.cpp | 13 ++--
.../tests/C2DescribeCoreComponentStateTest.cpp | 13 ++--
extensions/http-curl/tests/VerifyInvokeHTTP.h | 19 +++---
.../tests/integration/TailFileTest.cpp | 6 +-
libminifi/include/FlowController.h | 17 +++--
libminifi/include/c2/C2Client.h | 1 +
libminifi/include/core/state/UpdateController.h | 15 ++---
.../include/core/state/nodes/AgentInformation.h | 12 ++--
.../include/core/state/nodes/FlowInformation.h | 22 ++++---
libminifi/src/FlowController.cpp | 74 +++++++++++++---------
libminifi/src/c2/C2Agent.cpp | 31 +++++----
libminifi/src/c2/C2Client.cpp | 21 +++++-
libminifi/src/c2/ControllerSocketProtocol.cpp | 28 ++++----
.../src/core/state/nodes/SupportedOperations.cpp | 6 +-
.../integration/OnScheduleErrorHandlingTests.cpp | 24 +++++--
.../integration/StateTransactionalityTests.cpp | 29 ++++++---
libminifi/test/pcap-tests/PcapTest.cpp | 7 +-
libminifi/test/unit/ControllerTests.cpp | 11 ++--
18 files changed, 208 insertions(+), 141 deletions(-)
diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index 37719e8..b85f5da 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -51,10 +51,15 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
protected:
void updateProperties(minifi::FlowController& flow_controller) override {
- dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile1")[0])
- ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
- dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile2")[0])
- ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+ auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
+ auto* processor = dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+ processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+ };
+
+ flow_controller.executeOnComponent("TailFile1",
+ [&](minifi::state::StateController& component) {setFileName(test_file_1_, component);});
+ flow_controller.executeOnComponent("TailFile2",
+ [&](minifi::state::StateController& component) {setFileName(test_file_2_, component);});
}
TestController testController;
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
index 77a3ffe..cf2f619 100644
--- a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -43,10 +43,15 @@ class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {
protected:
void updateProperties(minifi::FlowController& flow_controller) override {
- dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile1")[0])
- ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
- dynamic_cast<minifi::state::ProcessorController*>(flow_controller.getComponents("TailFile2")[0])
- ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+ auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
+ auto* processor = dynamic_cast<minifi::state::ProcessorController&>(component).getProcessor();
+ processor->setProperty(minifi::processors::TailFile::FileName, fileName);
+ };
+
+ flow_controller.executeOnComponent("TailFile1",
+ [&](minifi::state::StateController& component) {setFileName(test_file_1_, component);});
+ flow_controller.executeOnComponent("TailFile2",
+ [&](minifi::state::StateController& component) {setFileName(test_file_2_, component);});
}
TestController testController;
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index c4552f9..92687ea 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -64,15 +64,16 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
}
void setProperty(const std::string& property, const std::string& value) {
- const auto components = flowController_->getComponents("InvokeHTTP");
- assert(!components.empty());
-
- const auto stateController = components[0];
- assert(stateController);
- const auto processorController = dynamic_cast<minifi::state::ProcessorController*>(stateController);
- assert(processorController);
- auto proc = processorController->getProcessor();
- proc->setProperty(property, value);
+ bool executed = false;
+ flowController_->executeOnComponent("InvokeHTTP", [&](minifi::state::StateController& component) {
+ const auto processorController = dynamic_cast<minifi::state::ProcessorController*>(&component);
+ assert(processorController);
+ auto proc = processorController->getProcessor();
+ proc->setProperty(property, value);
+ executed = true;
+ });
+
+ assert(executed);
}
virtual void setupFlow(const std::optional<std::string>& flow_yml_path) {
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index d99e117..42e1a7f 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -71,13 +71,13 @@ class TailFileTestHarness : public IntegrationBase {
protected:
void updateProperties(minifi::FlowController& fc) override {
- for (auto comp : fc.getComponents("tf")) {
- auto proc = dynamic_cast<minifi::state::ProcessorController*>(comp);
+ fc.executeOnComponent("tf", [this] (minifi::state::StateController& component) {
+ auto proc = dynamic_cast<minifi::state::ProcessorController*>(&component);
if (nullptr != proc) {
proc->getProcessor()->setProperty(minifi::processors::TailFile::FileName, ss.str());
proc->getProcessor()->setProperty(minifi::processors::TailFile::StateFile, statefile);
}
- }
+ });
}
std::string statefile;
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 2a3b10c..0d5f6d7 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -114,9 +114,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
return -1;
}
- std::vector<state::StateController*> getComponents(const std::string &name) override;
-
- std::vector<state::StateController*> getAllComponents() override;
+ void executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) override;
+ void executeOnAllComponents(std::function<void(state::StateController&)> func) override;
int16_t clearConnection(const std::string &connection) override;
@@ -234,11 +233,15 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
std::chrono::steady_clock::time_point start_time_;
private:
- void getProcessorController(const std::string& name, std::vector<state::StateController*>& controllerVec,
- const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
+ std::vector<state::StateController*> getAllComponents();
+
+ state::StateController* getComponent(const std::string &name);
+
+ state::StateController* getProcessorController(const std::string& name,
+ const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
- void getAllProcessorControllers(std::vector<state::StateController*>& controllerVec,
- const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
+ std::vector<state::StateController*> getAllProcessorControllers(
+ const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory);
std::unique_ptr<state::ProcessorController> createController(core::Processor& processor);
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 068ec30..14d4dcd 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -61,6 +61,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter {
protected:
bool isC2Enabled() const;
std::optional<std::string> fetchFlow(const std::string& uri) const;
+ void updateResponseNodeConnections();
private:
void initializeComponentMetrics();
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 8364140..a4c9ad5 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -139,18 +139,13 @@ class StateController : public Pausable {
*/
class StateMonitor : public StateController {
public:
- virtual ~StateMonitor() = default;
+ ~StateMonitor() override = default;
- std::atomic<bool> &isStateMonitorRunning() {
- return controller_running_;
- }
-
- virtual std::vector<StateController*> getComponents(const std::string &name) = 0;
+ // Execute callback func on the named component. Thread safe, locking mutex_, preventing concurrent flow update
+ virtual void executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) = 0;
- virtual std::vector<StateController*> getAllComponents() = 0;
- /**
- * Operational controllers
- */
+ // Execute callback func on the all components. Thread safe, locking mutex_, preventing concurrent flow update
+ virtual void executeOnAllComponents(std::function<void(state::StateController&)> func) = 0;
/**
* Drain repositories
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 49c3129..904ada1 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -502,24 +502,22 @@ class AgentStatus : public StateMonitorNode {
SerializedResponseNode components_node(false);
components_node.name = "components";
if (monitor_ != nullptr) {
- auto components = monitor_->getAllComponents();
-
- for (const auto& component : components) {
+ monitor_->executeOnAllComponents([&components_node](StateController& component){
SerializedResponseNode component_node(false);
- component_node.name = component->getComponentName();
+ component_node.name = component.getComponentName();
SerializedResponseNode uuid_node;
uuid_node.name = "uuid";
- uuid_node.value = std::string{component->getComponentUUID().to_string()};
+ uuid_node.value = std::string{component.getComponentUUID().to_string()};
SerializedResponseNode component_status_node;
component_status_node.name = "running";
- component_status_node.value = component->isRunning();
+ component_status_node.value = component.isRunning();
component_node.children.push_back(component_status_node);
component_node.children.push_back(uuid_node);
components_node.children.push_back(component_node);
- }
+ });
}
return components_node;
}
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index 13bc5f8..b10e504 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -141,15 +141,20 @@ class FlowMonitor : public StateMonitorNode {
: StateMonitorNode(name) {
}
- void addConnection(minifi::Connection* connection) {
+ void updateConnection(minifi::Connection* connection) {
if (nullptr != connection) {
- connections_.insert(std::make_pair(connection->getUUIDStr(), connection));
+ connections_[connection->getUUIDStr()] = connection;
}
}
+ void clearConnections() {
+ connections_.clear();
+ }
+
void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
- flow_version_ = flow_version;
+ flow_version_ = std::move(flow_version);
}
+
protected:
std::shared_ptr<state::response::FlowVersion> flow_version_;
std::map<std::string, minifi::Connection*> connections_;
@@ -228,26 +233,25 @@ class FlowInformation : public FlowMonitor {
}
if (nullptr != monitor_) {
- auto components = monitor_->getAllComponents();
SerializedResponseNode componentsNode(false);
componentsNode.name = "components";
- for (auto component : components) {
+ monitor_->executeOnAllComponents([&componentsNode](StateController& component){
SerializedResponseNode componentNode(false);
- componentNode.name = component->getComponentName();
+ componentNode.name = component.getComponentName();
SerializedResponseNode uuidNode;
uuidNode.name = "uuid";
- uuidNode.value = std::string{component->getComponentUUID().to_string()};
+ uuidNode.value = std::string{component.getComponentUUID().to_string()};
SerializedResponseNode componentStatusNode;
componentStatusNode.name = "running";
- componentStatusNode.value = component->isRunning();
+ componentStatusNode.value = component.isRunning();
componentNode.children.push_back(componentStatusNode);
componentNode.children.push_back(uuidNode);
componentsNode.children.push_back(componentNode);
- }
+ });
serialized.push_back(componentsNode);
}
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 516ff8f..f518572 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -17,7 +17,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <time.h>
#include <vector>
#include <map>
#include <chrono>
@@ -29,18 +28,12 @@
#include "FlowController.h"
#include "core/state/nodes/AgentInformation.h"
-#include "core/state/nodes/BuildInformation.h"
-#include "core/state/nodes/DeviceInformation.h"
#include "core/state/nodes/FlowInformation.h"
-#include "core/state/nodes/ProcessMetrics.h"
-#include "core/state/nodes/QueueMetrics.h"
#include "core/state/nodes/RepositoryMetrics.h"
-#include "core/state/nodes/SystemMetrics.h"
#include "core/state/ProcessorController.h"
#include "c2/C2Agent.h"
#include "core/ProcessGroup.h"
#include "core/Core.h"
-#include "core/ClassLoader.h"
#include "SchedulingAgent.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ForwardingControllerServiceProvider.h"
@@ -50,7 +43,6 @@
#include "utils/file/FileSystem.h"
#include "utils/HTTPClient.h"
#include "io/NetworkPrioritizer.h"
-#include "io/validation.h"
#include "io/FileStream.h"
namespace org {
@@ -134,6 +126,8 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
controller_map_->clear();
auto prevRoot = std::move(this->root_);
this->root_ = std::move(newRoot);
+ processor_to_controller_.clear();
+ updateResponseNodeConnections();
initialized_ = false;
bool started = false;
try {
@@ -191,7 +185,7 @@ int16_t FlowController::stop() {
thread_pool_.shutdown();
/* STOP! Before you change it, consider the following:
* -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
- * -They only guarantee that the processors are not scheduled any more
+ * -They only guarantee that the processors are not scheduled anymore
* -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */
if (this->root_) {
this->root_->drainConnections();
@@ -284,6 +278,8 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
if (root) {
logger_->log_info("Load Flow Controller from provided root");
this->root_ = std::move(root);
+ processor_to_controller_.clear();
+ updateResponseNodeConnections();
} else {
logger_->log_info("Instantiating new flow");
this->root_ = loadInitialFlow();
@@ -441,30 +437,44 @@ std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest(
return agentInfo;
}
+void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ for (auto* component: getAllComponents()) {
+ func(*component);
+ }
+}
+
+void FlowController::executeOnComponent(const std::string &name, std::function<void(state::StateController&)> func) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ if (auto* component = getComponent(name); component != nullptr) {
+ func(*component);
+ } else {
+ logger_->log_error("Could not get execute requested callback for component \"%s\", because component was not found", name);
+ }
+}
+
std::vector<state::StateController*> FlowController::getAllComponents() {
- std::vector<state::StateController*> vec{this};
if (root_) {
auto controllerFactory = [this] (core::Processor& p) {
return createController(p);
};
- getAllProcessorControllers(vec, controllerFactory);
+ return getAllProcessorControllers(controllerFactory);
}
- return vec;
-}
-std::vector<state::StateController*> FlowController::getComponents(const std::string& name) {
- std::vector<state::StateController*> vec;
+ return {this};
+}
+state::StateController* FlowController::getComponent(const std::string& name) {
if (name == "FlowController") {
- vec.push_back(this);
+ return this;
} else if (root_) {
auto controllerFactory = [this] (core::Processor& p) {
return createController(p);
};
- getProcessorController(name, vec, controllerFactory);
+ return getProcessorController(name, controllerFactory);
}
- return vec;
+ return nullptr;
}
std::unique_ptr<state::ProcessorController> FlowController::createController(core::Processor& processor) {
@@ -509,31 +519,37 @@ std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebug
return debug_info;
}
-void FlowController::getAllProcessorControllers(std::vector<state::StateController*>& controllerVec,
- const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
-
+std::vector<state::StateController*> FlowController::getAllProcessorControllers(
+ const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
+ std::vector<state::StateController*> controllerVec{this};
std::vector<core::Processor*> processorVec;
root_->getAllProcessors(processorVec);
for (const auto& processor : processorVec) {
- // find controller for processor, if it doesn't exist, create one
+ // reference to the existing or newly created controller
auto& controller = processor_to_controller_[processor->getUUID()];
if (!controller) {
controller = controllerFactory(*processor);
}
controllerVec.push_back(controller.get());
}
+
+ return controllerVec;
}
-void FlowController::getProcessorController(const std::string& name, std::vector<state::StateController*>& controllerVec,
- const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
+state::StateController* FlowController::getProcessorController(const std::string& name, const std::function<std::unique_ptr<state::ProcessorController>(core::Processor&)>& controllerFactory) {
auto* processor = root_->findProcessorByName(name);
- auto& controller = processor_to_controller_[processor->getUUID()];
- if (!controller) {
- controller = controllerFactory(*processor);
+ if (processor == nullptr) {
+ logger_->log_error("Could not get processor controller for requested name \"%s\", because processor was not found either", name);
+ return nullptr;
+ }
+
+ // reference to the existing or newly created controller
+ auto& foundController = processor_to_controller_[processor->getUUID()];
+ if (!foundController) {
+ foundController = controllerFactory(*processor);
}
- controllerVec.push_back(controller.get());
+ return foundController.get();
}
} // namespace minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index c39bebc..b7cd0e4 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -358,15 +358,15 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
raise(SIGTERM);
}
- const auto components = update_sink_->getComponents(resp.name);
// stop all referenced components.
- for (auto &component : components) {
- logger_->log_debug("Stopping component %s", component->getComponentName());
- if (resp.op == Operation::STOP)
- component->stop();
- else
- component->start();
- }
+ update_sink_->executeOnComponent(resp.name, [this, &resp] (state::StateController& component) {
+ logger_->log_debug("Stopping component %s", component.getComponentName());
+ if (resp.op == Operation::STOP) {
+ component.stop();
+ } else {
+ component.start();
+ }
+ });
if (resp.ident.length() > 0) {
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
@@ -452,21 +452,20 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
}
case ClearOperand::CORECOMPONENTSTATE: {
for (const auto& corecomponent : resp.operation_arguments) {
- std::vector<state::StateController*> components = update_sink_->getComponents(corecomponent.second.to_string());
auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
if (state_manager_provider != nullptr) {
- for (auto* component : components) {
- logger_->log_debug("Clearing state for component %s", component->getComponentName());
- auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
+ update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_manager_provider] (state::StateController& component) {
+ logger_->log_debug("Clearing state for component %s", component.getComponentName());
+ auto state_manager = state_manager_provider->getCoreComponentStateManager(component.getComponentUUID());
if (state_manager != nullptr) {
- component->stop();
+ component.stop();
state_manager->clear();
state_manager->persist();
- component->start();
+ component.start();
} else {
- logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string());
+ logger_->log_warn("Failed to get StateManager for component %s", component.getComponentUUID().to_string());
}
- }
+ });
} else {
logger_->log_error("Failed to get StateManagerProvider");
}
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index a4d51fc..3a195ef 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -122,7 +122,7 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
if (flowMonitor != nullptr) {
for (auto &con : connections) {
- flowMonitor->addConnection(con.second);
+ flowMonitor->updateConnection(con.second);
}
flowMonitor->setStateMonitor(update_sink);
flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
@@ -325,6 +325,7 @@ std::vector<std::shared_ptr<state::response::ResponseNode>> C2Client::getHeartbe
const bool include = include_manifest || fullHb == "true";
std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+ nodes.reserve(root_response_nodes_.size());
std::lock_guard<std::mutex> lock(metrics_mutex_);
for (const auto &entry : root_response_nodes_) {
auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(entry.second);
@@ -336,6 +337,24 @@ std::vector<std::shared_ptr<state::response::ResponseNode>> C2Client::getHeartbe
return nodes;
}
+void C2Client::updateResponseNodeConnections() {
+ std::map<std::string, Connection*> connections;
+ if (root_ != nullptr) {
+ root_->getConnections(connections);
+ }
+
+ std::lock_guard<std::mutex> lock(metrics_mutex_);
+ for (auto& [_, responseNode] : root_response_nodes_) {
+ auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(responseNode.get());
+ if (flowMonitor != nullptr) {
+ flowMonitor->clearConnections();
+ for (const auto &con: connections) {
+ flowMonitor->updateConnection(con.second);
+ }
+ }
+ }
+}
+
} // namespace c2
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 0afa8e0..4949893 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -97,10 +97,9 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
std::string componentStr;
const auto size = stream->read(componentStr);
if (!io::isError(size)) {
- auto components = update_sink_->getComponents(componentStr);
- for (const auto& component : components) {
- component->start();
- }
+ update_sink_->executeOnComponent(componentStr, [](state::StateController& component) {
+ component.start();
+ });
} else {
logger_->log_debug("Connection broke");
}
@@ -111,10 +110,9 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
std::string componentStr;
const auto size = stream->read(componentStr);
if (!io::isError(size)) {
- auto components = update_sink_->getComponents(componentStr);
- for (const auto& component : components) {
- component->stop();
- }
+ update_sink_->executeOnComponent(componentStr, [](state::StateController& component) {
+ component.stop();
+ });
} else {
logger_->log_debug("Connection broke");
}
@@ -180,14 +178,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
resp.write(response.str());
stream->write(resp.getBuffer());
} else if (what == "components") {
+ std::vector<std::pair<std::string, bool>> components;
+ update_sink_->executeOnAllComponents([&components](state::StateController& component){
+ components.emplace_back(component.getComponentName(), component.isRunning());
+ });
io::BufferStream resp;
resp.write(&head, 1);
- const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
- resp.write(size_);
- for (const auto &component : update_sink_->getAllComponents()) {
- resp.write(component->getComponentName());
- resp.write(component->isRunning() ? "true" : "false");
+ resp.write(gsl::narrow<uint16_t>(components.size()));
+ for (const auto& [name, isRunning] : components) {
+ resp.write(name);
+ resp.write(isRunning ? "true" : "false");
}
+
stream->write(resp.getBuffer());
} else if (what == "jstack") {
io::BufferStream resp;
diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp
index 8c893c7..e7ff1f6 100644
--- a/libminifi/src/core/state/nodes/SupportedOperations.cpp
+++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp
@@ -107,9 +107,9 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min
case minifi::c2::Operation::STOP: {
addProperty(properties, "c2");
if (monitor_) {
- for (const auto& component: monitor_->getAllComponents()) {
- addProperty(properties, component->getComponentName());
- }
+ monitor_->executeOnAllComponents([&properties](StateController& component){
+ addProperty(properties, component.getComponentName());
+ });
}
break;
}
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 575cf71..3b79fce 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -76,21 +76,31 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
class EventDriverScheduleErrorHandlingTests: public IntegrationBase {
public:
void updateProperties(minifi::FlowController& fc) override {
- auto controller_vec = fc.getAllComponents();
/* This tests depends on a configuration that contains only one KamikazeProcessor named kamikaze
* (See testOnScheduleRetry.yml)
* In this case there are two components in the flowcontroller: first is the controller itself,
* second is the processor that the test uses.
* Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
* that most probably means a breaking change. */
- assert(controller_vec.size() == 2);
- assert(controller_vec[0]->getComponentName() == "FlowController");
- assert(controller_vec[1]->getComponentName() == "kamikaze");
+ size_t controllerVecIdx = 0;
- auto process_controller = dynamic_cast<org::apache::nifi::minifi::state::ProcessorController*>(controller_vec[1]);
- assert(process_controller != nullptr);
+ fc.executeOnAllComponents([&controllerVecIdx](org::apache::nifi::minifi::state::StateController& component){
+ if (controllerVecIdx == 0) {
+ assert(component.getComponentName() == "FlowController");
+ } else if (controllerVecIdx == 1) {
+ assert(component.getComponentName() == "kamikaze");
- process_controller->getProcessor()->setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
+ auto process_controller = dynamic_cast<org::apache::nifi::minifi::state::ProcessorController*>(&component);
+ assert(process_controller != nullptr);
+
+ process_controller->getProcessor()->setSchedulingStrategy(org::apache::nifi::minifi::core::SchedulingStrategy::EVENT_DRIVEN);
+ }
+
+ ++controllerVecIdx;
+ });
+
+ // check controller vector size
+ assert(controllerVecIdx == 2);
}
void runAssertions() override {
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp b/libminifi/test/integration/StateTransactionalityTests.cpp
index 66671dd..18bba7c 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -56,23 +56,32 @@ class StatefulIntegrationTest : public IntegrationBase {
}
void updateProperties(minifi::FlowController& fc) override {
- const auto controllerVec = fc.getAllComponents();
/* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
* (See TestStateTransactionality.yml)
* In this case there are two components in the flowcontroller: first is the controller itself,
* second is the processor that the test uses.
* Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
* that most probably means a breaking change. */
- assert(controllerVec.size() == 2);
- assert(controllerVec[0]->getComponentName() == "FlowController");
- assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+ size_t controllerVecIdx = 0;
- // set hooks
- const auto processController = dynamic_cast<ProcessorController*>(controllerVec[1]);
- assert(processController != nullptr);
- stateful_processor_ = dynamic_cast<StatefulProcessor*>(processController->getProcessor());
- assert(stateful_processor_ != nullptr);
- stateful_processor_->setHooks(on_schedule_hook_, on_trigger_hooks_);
+ fc.executeOnAllComponents([this, &controllerVecIdx](org::apache::nifi::minifi::state::StateController& component){
+ if (controllerVecIdx == 0) {
+ assert(component.getComponentName() == "FlowController");
+ } else if (controllerVecIdx == 1) {
+ assert(component.getComponentName() == "statefulProcessor");
+ // set hooks
+ const auto processController = dynamic_cast<ProcessorController*>(&component);
+ assert(processController != nullptr);
+ stateful_processor_ = dynamic_cast<StatefulProcessor*>(processController->getProcessor());
+ assert(stateful_processor_ != nullptr);
+ stateful_processor_->setHooks(on_schedule_hook_, on_trigger_hooks_);
+ }
+
+ ++controllerVecIdx;
+ });
+
+ // check controller vector size
+ assert(controllerVecIdx == 2);
}
void runAssertions() override {
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp b/libminifi/test/pcap-tests/PcapTest.cpp
index 236e0eb..4e408f4 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -78,15 +78,14 @@ class PcapTestHarness : public IntegrationBase {
}
void updateProperties(minifi::FlowController& fc) override {
- auto components = fc.getComponents("pcap");
- for (const auto& component : components) {
- auto proccontroller = dynamic_cast<minifi::state::ProcessorController*>(component);
+ fc.executeOnComponent("pcap", [this] (minifi::state::StateController& component) {
+ auto proccontroller = dynamic_cast<minifi::state::ProcessorController*>(&component);
if (proccontroller) {
auto processor = proccontroller->getProcessor();
processor->setProperty(minifi::processors::CapturePacket::BaseDir.getName(), dir);
processor->setProperty(minifi::processors::CapturePacket::NetworkControllers.getName(), ".*");
}
- }
+ });
}
protected:
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index bd7cf9b..0ab8877 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -79,15 +79,16 @@ class TestUpdateSink : public minifi::state::StateMonitor {
explicit TestUpdateSink(std::shared_ptr<StateController> controller)
: is_running(true),
clear_calls(0),
- controller(controller),
+ controller(std::move(controller)),
update_calls(0) {
}
- std::vector<StateController*> getComponents(const std::string& /*name*/) override {
- return std::vector<StateController*>{ controller.get() };
+
+ void executeOnComponent(const std::string&, std::function<void(minifi::state::StateController&)> func) override {
+ func(*controller);
}
- std::vector<StateController*> getAllComponents() override {
- return std::vector<StateController*>{ controller.get() };
+ void executeOnAllComponents(std::function<void(minifi::state::StateController&)> func) override {
+ func(*controller);
}
std::string getComponentName() const override {