You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/11/18 13:47:15 UTC
[nifi-minifi-cpp] 03/03: MINIFICPP-1989 - Discard old controller serivice provider from FlowController
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 8d20c448215a2e7bb0e7d4e49422ec172774b8aa
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Nov 16 21:45:24 2022 +0100
MINIFICPP-1989 - Discard old controller serivice provider from FlowController
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1453
---
libminifi/include/SchedulingAgent.h | 3 ++
libminifi/include/core/Processor.h | 2 ++
libminifi/src/FlowController.cpp | 6 +++-
libminifi/src/core/Processor.cpp | 4 +++
libminifi/test/flow-tests/FlowControllerTests.cpp | 36 ++++++++++++++++++++++
libminifi/test/flow-tests/TestControllerWithFlow.h | 7 +++--
6 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index f02f4773a..3ec3ad892 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -78,6 +78,8 @@ class SchedulingAgent {
watchDogTimer_.reset(new utils::CallBackTimer(std::chrono::milliseconds(SCHEDULING_WATCHDOG_CHECK_PERIOD_MS), f));
watchDogTimer_->start();
}
+
+ logger_->log_trace("Creating scheduling agent");
}
virtual ~SchedulingAgent() {
@@ -85,6 +87,7 @@ class SchedulingAgent {
// The destructor of the timer also stops is, but the stop should happen first!
// Otherwise the callback might access already destructed members.
watchDogTimer_.reset();
+ logger_->log_trace("Destroying scheduling agent");
}
// onTrigger, return whether the yield is need
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index bf3164016..81706134f 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -84,6 +84,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
bool isRunning() override;
+ ~Processor() override;
+
void setScheduledState(ScheduledState state);
ScheduledState getScheduledState() const {
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index a6ca3a3dc..6ef93b35b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -100,6 +100,7 @@ FlowController::~FlowController() {
protocol_ = nullptr;
flow_file_repo_ = nullptr;
provenance_repo_ = nullptr;
+ logger_->log_trace("Destroying FlowController");
}
bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) {
@@ -129,7 +130,10 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
stop();
unload();
- controller_map_->clear();
+
+ // prepare to accept the new controller service provider from flow_configuration_
+ clearControllerServices();
+
clearResponseNodes();
if (metrics_publisher_) {
metrics_publisher_->clearMetricNodes();
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 26adfe1ef..022941b24 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -79,6 +79,10 @@ Processor::Processor(std::string name, const utils::Identifier& uuid, std::share
logger_->log_debug("Processor %s created with uuid %s", name_, getUUIDStr());
}
+Processor::~Processor() {
+ logger_->log_debug("Destroying processor %s with uuid %s", name_, getUUIDStr());
+}
+
bool Processor::isRunning() {
return (state_ == RUNNING && active_tasks_ > 0);
}
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 2000132c7..8087da65c 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -35,6 +35,8 @@
#include "YamlConfiguration.h"
#include "CustomProcessors.h"
#include "TestControllerWithFlow.h"
+#include "EmptyFlow.h"
+#include "utils/IntegrationTestUtils.h"
using namespace std::literals::chrono_literals;
@@ -263,3 +265,37 @@ TEST_CASE("Extend the waiting period during shutdown", "[TestFlow4]") {
REQUIRE(sourceProc->trigger_count.load() >= 1);
REQUIRE(sinkProc->trigger_count.load() >= 3);
}
+
+TEST_CASE("FlowController destructor releases resources", "[TestFlow5]") {
+ TestControllerWithFlow controller(R"(
+Flow Controller:
+ name: Banana Bread
+Processors:
+- name: GenFF
+ id: 00000000-0000-0000-0000-000000000001
+ class: GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 10 sec
+Connections: []
+Remote Processing Groups: []
+Controller Services: []
+)");
+
+ controller.startFlow();
+
+ REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
+ LogTestController::getInstance().clear();
+
+ bool update_successful = controller.controller_->applyConfiguration("/flows/1", empty_flow);
+ REQUIRE(update_successful);
+
+ REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
+ REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
+ LogTestController::getInstance().clear();
+
+ // manually destroy the controller
+ controller.controller_.reset();
+
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(0s, "Destroying FlowController"));
+ REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
+}
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b311b15c3..b72a188a9 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -39,6 +39,7 @@ class TestControllerWithFlow: public TestController{
LogTestController::getInstance().setTrace<core::Processor>();
LogTestController::getInstance().setTrace<minifi::TimerDrivenSchedulingAgent>();
LogTestController::getInstance().setTrace<minifi::EventDrivenSchedulingAgent>();
+ LogTestController::getInstance().setTrace<minifi::FlowController>();
home_ = createTempDirectory();
@@ -78,8 +79,10 @@ class TestControllerWithFlow: public TestController{
}
~TestControllerWithFlow() {
- controller_->stop();
- controller_->unload();
+ if (controller_) {
+ controller_->stop();
+ controller_->unload();
+ }
LogTestController::getInstance().reset();
}