You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/09/14 10:42:36 UTC
[nifi-minifi-cpp] 01/07: MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch MINIFICPP-1348-RC1
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 61a46d1aa934acdfbe6aa21bd429c6315a7a8e7a
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Sep 3 18:36:14 2020 +0200
MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #894
---
extensions/librdkafka/PublishKafka.cpp | 1 +
extensions/librdkafka/PublishKafka.h | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 1b92edc..5b007a3 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -542,6 +542,7 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
void PublishKafka::notifyStop() {
logger_->log_debug("notifyStop called");
interrupted_ = true;
+ std::lock_guard<std::mutex> conn_lock(connection_mutex_);
std::lock_guard<std::mutex> lock(messages_mutex_);
for (auto& messages : messages_set_) {
messages->interrupt();
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 7cc9101..44e2634 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -125,7 +125,7 @@ class PublishKafka : public core::Processor {
utils::Regex attributeNameRegex_;
std::atomic<bool> interrupted_{false};
- std::mutex messages_mutex_;
+ std::mutex messages_mutex_; // If both connection_mutex_ and messages_mutex_ are needed, always take connection_mutex_ first to avoid deadlock
std::set<std::shared_ptr<Messages>> messages_set_;
};