You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/09/14 10:42:36 UTC

[nifi-minifi-cpp] 01/07: MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch MINIFICPP-1348-RC1
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 61a46d1aa934acdfbe6aa21bd429c6315a7a8e7a
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Sep 3 18:36:14 2020 +0200

    MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #894
---
 extensions/librdkafka/PublishKafka.cpp | 1 +
 extensions/librdkafka/PublishKafka.h   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 1b92edc..5b007a3 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -542,6 +542,7 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
 void PublishKafka::notifyStop() {
   logger_->log_debug("notifyStop called");
   interrupted_ = true;
+  std::lock_guard<std::mutex> conn_lock(connection_mutex_);
   std::lock_guard<std::mutex> lock(messages_mutex_);
   for (auto& messages : messages_set_) {
     messages->interrupt();
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 7cc9101..44e2634 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -125,7 +125,7 @@ class PublishKafka : public core::Processor {
   utils::Regex attributeNameRegex_;
 
   std::atomic<bool> interrupted_{false};
-  std::mutex messages_mutex_;
+  std::mutex messages_mutex_;  // If both connection_mutex_ and messages_mutex_ are needed, always take connection_mutex_ first to avoid deadlock
   std::set<std::shared_ptr<Messages>> messages_set_;
 };