You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/24 15:54:21 UTC
nifi-minifi-cpp git commit: MINIFI-207: Use recursive mutex to
avoid thread safety concerns
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master d26d65af4 -> cbc12903f
MINIFI-207: Use recursive mutex to avoid thread safety concerns
This closes #102.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/cbc12903
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/cbc12903
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/cbc12903
Branch: refs/heads/master
Commit: cbc12903ffb81bf16808f61a42e772d7c4676c06
Parents: d26d65a
Author: Marc Parisi <ph...@apache.org>
Authored: Wed May 17 16:09:07 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed May 24 11:53:31 2017 -0400
----------------------------------------------------------------------
libminifi/include/core/Core.h | 4 ++--
libminifi/include/core/ProcessGroup.h | 6 +++---
libminifi/src/core/ProcessGroup.cpp | 22 ++++++++++++----------
3 files changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 335f306..3864882 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -146,8 +146,8 @@ class CoreComponent {
const std::string & getUUIDStr() {
return uuidStr_;
}
-
- void loadComponent(){
+
+ void loadComponent() {
}
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index f2f9a63..ccf744e 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -120,12 +120,12 @@ class ProcessGroup {
bool isRootProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent(void) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
// Add processor
@@ -197,7 +197,7 @@ class ProcessGroup {
private:
// Mutex for protection
- std::mutex mutex_;
+ std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 1b8ec3a..01d3dbf 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -67,12 +67,12 @@ ProcessGroup::~ProcessGroup() {
}
bool ProcessGroup::isRootProcessGroup() {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
return (type_ == ROOT_PROCESS_GROUP);
}
void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
@@ -83,7 +83,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
}
void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) != processors_.end()) {
// We already have this processor in this process group
@@ -94,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
}
void ProcessGroup::addProcessGroup(ProcessGroup *child) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
@@ -105,7 +105,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
}
void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We already have this child process group in this process group
@@ -117,7 +117,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
EventDrivenSchedulingAgent *eventScheduler) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Start all the processor node, input and output ports
@@ -148,7 +148,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
EventDrivenSchedulingAgent *eventScheduler) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Stop all the processor node, input and output ports
@@ -176,6 +176,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_info("find processor %s", processor->getName().c_str());
@@ -220,6 +221,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr
std::shared_ptr<Processor> ProcessGroup::findProcessor(
const std::string &processorName) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName().c_str());
@@ -238,7 +240,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(
void ProcessGroup::updatePropertyValue(std::string processorName,
std::string propertyName,
std::string propertyValue) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto processor : processors_) {
if (processor->getName() == processorName) {
processor->setProperty(propertyName, propertyValue);
@@ -262,7 +264,7 @@ void ProcessGroup::getConnections(
}
void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
@@ -285,7 +287,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
}
void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) != connections_.end()) {
// We do have this connection in this process group