Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/05 07:16:27 UTC

[GitHub] [nifi-minifi-cpp] dam4rus opened a new pull request #992: MINIFICPP-1467 Remove mutex from Connection

dam4rus opened a new pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579196749



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -135,6 +143,22 @@ class ConcurrentQueue {
   std::deque<T> queue_;
 };
 
+// Enables batched operations on a ConcurrentQueue by holding a reference to the queue and locking it's mutex
+// until it leaves the scope. Can help performance by avoiding multiple locking in loops, etc.
+template <typename T>
+class LockedConcurrentQueue {
+ public:
+  explicit LockedConcurrentQueue(ConcurrentQueue<T>& concurrentQueue)
+      : queue_(std::ref(concurrentQueue.queue_))
+      , lock_(concurrentQueue.mtx_) {
+  }
+
+  std::deque<T>* operator->() const { return &queue_.get(); }

Review comment:
       This exposes the underlying queue object. This is a big no-no.







[GitHub] [nifi-minifi-cpp] fgerlits closed pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits closed pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992


   





[GitHub] [nifi-minifi-cpp] dam4rus commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-780677873


   TBH, since I'm not super familiar with the code base, I don't really see how `Connection::poll()` could be refactored to use `ConcurrentQueue` without performance degradation, since it has a more complex structure. Using a lambda would be weird since the loop has a `break` in it. Maybe something like `ConcurrentQueue::consume_while(function(item)->bool)` could be implemented, where a `false` return value would indicate the `break` and that the item isn't popped out of the queue. My biggest issue with this approach is that I generally don't think that having a side effect in a consumer lambda is good practice.
   
   I went with this design because it required the least modification and the performance is the same as before. I kinda agree that maybe leaving it as it was is fine.
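
The `consume_while` idea mentioned above could look roughly like the following sketch. `SketchQueue` is a hypothetical stand-in written for illustration, not MiNiFi's `ConcurrentQueue`, and the exact signature is an assumption. The consumer returns `false` to stop the loop and leave the current item in the queue.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for ConcurrentQueue; not the MiNiFi class.
template <typename T>
class SketchQueue {
 public:
  void push(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  // Calls consumer(front) while it returns true, popping each accepted item.
  // A false return stops the loop and leaves that item at the front.
  // The mutex is held for the whole call, so it is acquired only once.
  template <typename Consumer>
  std::size_t consume_while(Consumer consumer) {
    std::lock_guard<std::mutex> guard(mtx_);
    std::size_t consumed = 0;
    while (!queue_.empty() && consumer(queue_.front())) {
      queue_.pop_front();
      ++consumed;
    }
    return consumed;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;
  std::deque<T> queue_;
};
```

In this sketch the consumer runs while the mutex is held, so it must not call back into the same queue, and any side effects it has (the concern raised above) happen under the lock.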





[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590496104



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       I have made some changes to this part of the code in #1013, which also removed some of the code duplication here.  Please rebase your changes on top of the current main branch.







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590524458



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       Yes, you're right, the two changes were at cross purposes -- this was not intentional, sorry.










[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590511563



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       @fgerlits Your PR also changes the underlying queue in Connection, which makes rebasing almost impossible. I wonder whether this PR still has any merit after your changes. At this point I think abandoning this ticket would be the best option.







[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590408649



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       Damn. You are right. Changing behavior wasn't intentional. I'll revert this while still removing the code duplication, if that's okay.







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579201725



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -87,6 +91,10 @@ class ConcurrentQueue {
     queue_.emplace_back(std::forward<Args>(args)...);
   }
 
+  LockedConcurrentQueue<T> lock() {

Review comment:
       Why would you do this? If you really want to extend the functionality of the class, instead of adding friend accessors, make your queue inherit from the `ConcurrentQueue`.







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579212715



##########
File path: libminifi/src/Connection.cpp
##########
@@ -157,15 +149,15 @@ void Connection::put(const std::shared_ptr<core::FlowFile>& flow) {
 
 void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
   {
-    std::lock_guard<std::mutex> lock(mutex_);
+    auto lockedQueue = queue_.lock();

Review comment:
       Instead of this, I would create an interface on the queue that performs batch insertion. This piece of API could be better tested as well.
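
A batch-insertion interface of the kind suggested here might be sketched as follows. `BatchQueue`, `push_all` and `try_pop` are assumed names for illustration and are not taken from the MiNiFi code base. The point is that the whole batch is appended under one lock acquisition and the method can be unit tested on its own.

```cpp
#include <cstddef>
#include <deque>
#include <iterator>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for ConcurrentQueue; not the MiNiFi class.
template <typename T>
class BatchQueue {
 public:
  // Appends every element of `items` while holding the mutex once.
  void push_all(std::vector<T> items) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.insert(queue_.end(),
                  std::make_move_iterator(items.begin()),
                  std::make_move_iterator(items.end()));
  }

  // Pops the front element into `out`; returns false if the queue is empty.
  bool try_pop(T& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;
  std::deque<T> queue_;
};
```

Any per-item bookkeeping that `Connection::multiPut` does is left out of this sketch.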







[GitHub] [nifi-minifi-cpp] fgerlits commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-780655750


   Yes, using (a modified or extended version of) ConcurrentQueue [rather than exposing the internal mutex and deque and using those] would require some refactoring of the Connection class.  That is not necessarily a bad thing, as `Connection::poll()` contains some duplicated code.
   
   I also can't see a way to avoid some performance degradation, as you mentioned.  But I don't think that is a big problem, as the difference in performance would be small.
   
   To be honest, at this point I would vote for keeping the old code as it was, but let's see what other people say.





[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579215333



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -87,6 +91,10 @@ class ConcurrentQueue {
     queue_.emplace_back(std::forward<Args>(args)...);
   }
 
+  LockedConcurrentQueue<T> lock() {

Review comment:
       Because `LockedConcurrentQueue` **is not a** `ConcurrentQueue`. It's just a utility class that holds a lock on a `ConcurrentQueue`'s mutex and provides access to the underlying queue until it goes out of scope. So instead of locking the mutex for every operation, you can do multiple operations while locking only once. I'm open to a better name for the class.
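
For illustration, a scoped handle like the one described here can also be written so that it exposes a few named operations instead of the underlying `std::deque`. The sketch below is an assumption about how that might look. `GuardedQueue` and its nested `Locked` class are hypothetical names, not the code from this PR.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for ConcurrentQueue; not the MiNiFi class.
template <typename T>
class GuardedQueue {
 public:
  // Holds the queue's mutex for its whole lifetime and forwards a narrow
  // set of operations, so callers never see the container type.
  class Locked {
   public:
    explicit Locked(GuardedQueue& owner) : owner_(owner), lock_(owner.mtx_) {}
    bool empty() const { return owner_.queue_.empty(); }
    T& front() { return owner_.queue_.front(); }
    void pop() { owner_.queue_.pop_front(); }
    void push(T value) { owner_.queue_.push_back(std::move(value)); }

   private:
    GuardedQueue& owner_;
    std::unique_lock<std::mutex> lock_;
  };

  // The mutex is released when the returned handle goes out of scope.
  Locked lock() { return Locked(*this); }

  void push(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;
  std::deque<T> queue_;
};
```

With a non-recursive mutex as used here, calling `push()` or `size()` on the queue while a `Locked` handle is alive on the same thread would deadlock, which is one cost of this design.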







[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579225873



##########
File path: libminifi/src/Connection.cpp
##########
@@ -224,11 +216,11 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
 }
 
 void Connection::drain(bool delete_permanently) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  auto lockedQueue = queue_.lock();

Review comment:
       You mean only lock it before calling empty and unlock after pop_front? That's not an equivalent refactor. I don't know how costly the operations after popping the element are, but locking in a loop can definitely degrade performance.







[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
szaszm commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-781478180


   Good point @adamdebreceni, that would indeed be a very welcome change. Client code being dependent on the internal std::deque could prevent us from moving to a better implementation in the future.





[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579212323



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -135,6 +143,22 @@ class ConcurrentQueue {
   std::deque<T> queue_;
 };
 
+// Enables batched operations on a ConcurrentQueue by holding a reference to the queue and locking it's mutex
+// until it leaves the scope. Can help performance by avoiding multiple locking in loops, etc.
+template <typename T>
+class LockedConcurrentQueue {
+ public:
+  explicit LockedConcurrentQueue(ConcurrentQueue<T>& concurrentQueue)
+      : queue_(std::ref(concurrentQueue.queue_))
+      , lock_(concurrentQueue.mtx_) {
+  }
+
+  std::deque<T>* operator->() const { return &queue_.get(); }

Review comment:
       It has already been discussed and if everyone agrees with this approach I'm totally fine with providing a public API instead of overloaded operators.







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579209742



##########
File path: libminifi/src/Connection.cpp
##########
@@ -224,11 +216,11 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
 }
 
 void Connection::drain(bool delete_permanently) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  auto lockedQueue = queue_.lock();

Review comment:
       A better pattern would be just to lock the queue until you move the data out of it.
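
One way to read this suggestion is to swap the contents out under the lock and process them afterwards. The sketch below assumes that reading. `DrainableQueue` and `pop_all` are hypothetical names for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for ConcurrentQueue; not the MiNiFi class.
template <typename T>
class DrainableQueue {
 public:
  void push(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  // Moves all queued items out. The mutex is held only for the swap,
  // so the caller processes the returned items without blocking producers.
  std::deque<T> pop_all() {
    std::deque<T> drained;
    {
      std::lock_guard<std::mutex> guard(mtx_);
      drained.swap(queue_);
    }
    return drained;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;
  std::deque<T> queue_;
};
```

Unlike holding the lock for the whole drain, items pushed by other threads during processing stay in the queue, so the two approaches are not strictly equivalent.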







[GitHub] [nifi-minifi-cpp] dam4rus commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579208057



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -135,6 +143,22 @@ class ConcurrentQueue {
   std::deque<T> queue_;
 };
 
+// Enables batched operations on a ConcurrentQueue by holding a reference to the queue and locking it's mutex

Review comment:
       Good point.







[GitHub] [nifi-minifi-cpp] fgerlits commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-853754217


   Closing this, as we ended up going with a different approach, see PR #1027. 





[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r590393358



##########
File path: libminifi/src/Connection.cpp
##########
@@ -179,56 +171,44 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 }
 
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
-    queued_data_size_ -= item->getSize();
+  auto lockedQueue = queue_.lock();
+
+  while (!lockedQueue.empty()) {
+    std::shared_ptr<core::FlowFile> item = lockedQueue.front();
+
+    // We need to check for flow expiration
+    if (expired_duration_ > 0 && utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      // Flow record expired
+      lockedQueue.pop();
+      queued_data_size_ -= item->getSize();
+      expiredFlowRecords.insert(item);
+      logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
+      continue;
+    }
 
-    if (expired_duration_ > 0) {
-      // We need to check for flow expiration
-      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
-        // Flow record expired
-        expiredFlowRecords.insert(item);
-        logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-      } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
-        std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-        item->setConnection(connectable);
-        logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-        return item;
-      }
-    } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
-      std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
-      item->setConnection(connectable);
-      logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
-      return item;
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized
+      break;

Review comment:
       This looks like a change in behavior. Previously, a penalized flow file at the head of the queue was popped and pushed again, which moved it to the back of the queue; now it remains at the front. I think the original behavior was intentional, but I can't say so with 100% certainty. Unless we can be sure that this change doesn't affect behavior, I would opt for reverting to the old behavior of reinserting flow files.
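
To make the ordering difference concrete, here is a minimal sketch that assumes a plain `std::deque` and a hypothetical `Item` struct standing in for `core::FlowFile`. Expiration handling, locking and size accounting are left out on purpose, and `pollOld`, `pollNew` and `demoPenalizedOrder` are illustrative names rather than anything from the PR.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for core::FlowFile; only what the sketch needs.
struct Item {
  std::string id;
  bool penalized;
};

// Old shape: pop the head first; a penalized item is pushed again,
// so it ends up at the tail of the queue.
inline bool pollOld(std::deque<Item>& queue, Item& out) {
  if (queue.empty()) return false;
  Item item = queue.front();
  queue.pop_front();
  if (item.penalized) {
    queue.push_back(item);
    return false;
  }
  out = item;
  return true;
}

// New shape: peek at the head; a penalized item is left where it is.
inline bool pollNew(std::deque<Item>& queue, Item& out) {
  if (queue.empty() || queue.front().penalized) return false;
  out = queue.front();
  queue.pop_front();
  return true;
}

// Usage example: the same starting queue ends up in a different order.
inline void demoPenalizedOrder() {
  std::deque<Item> old_queue{{"A", true}, {"B", false}};
  std::deque<Item> new_queue = old_queue;
  Item out{"", false};
  const bool old_result = pollOld(old_queue, out);
  const bool new_result = pollNew(new_queue, out);
  assert(!old_result);                  // neither call hands out an item
  assert(!new_result);
  assert(old_queue.front().id == "B");  // old: A was rotated to the tail
  assert(old_queue.back().id == "A");
  assert(new_queue.front().id == "A");  // new: A is still at the head
}
```

In this sketch, the next `pollOld` call would reach the unpenalized `B`, while the next `pollNew` call would see the penalized `A` again for as long as it stays penalized.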




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] dam4rus edited a comment on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus edited a comment on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-782216241


   I'm pretty sure that locking multiple times in a loop can [degrade performance](https://quick-bench.com/q/6Tb3pu8sUC80EtpJtxht7uxHlLE) significantly and [that's not what you want](https://www.techiedelight.com/why-vector-class-java-obsolete/#:~:text=This%20is%20because%20Vector%20synchronizes,synchronized%20and%20not%20individual%20operations.) most of the time. `ConcurrentQueue::pop_all` and `ConcurrentQueue::push_all` can solve some cases, but for more complex cases like `Connection::poll` it's kinda hard to write a generic solution. I can probably make it work somehow, but I'm not sure that it would be easier to reason about than this solution.
   
   I've created this PR mainly as a PoC and I agree that a public API would be better than exposing the underlying queue. I don't mind if you guys would rather extend the ConcurrentQueue class or leave it as it is.
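
As a rough illustration of the point about locking inside a loop, the sketch below counts mutex acquisitions for per-operation access versus a single batched call. `TinyQueue`, `drain` and `lockCount` are hypothetical names for this example only and are not the real `ConcurrentQueue` API. The counter only shows how often the mutex is taken; it makes no claim about actual timings.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical minimal queue, not the real utils::ConcurrentQueue.
class TinyQueue {
 public:
  void push(int value) {
    std::lock_guard<std::mutex> guard(mutex_);
    ++lock_count_;
    queue_.push_back(value);
  }

  // Per-operation locking: every call takes the mutex once.
  bool tryPop(int& out) {
    std::lock_guard<std::mutex> guard(mutex_);
    ++lock_count_;
    if (queue_.empty()) return false;
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

  // Batched access: one acquisition covers the whole transfer.
  std::vector<int> drain() {
    std::lock_guard<std::mutex> guard(mutex_);
    ++lock_count_;
    std::vector<int> out(queue_.begin(), queue_.end());
    queue_.clear();
    return out;
  }

  // Number of acquisitions made by push, tryPop and drain so far.
  std::size_t lockCount() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return lock_count_;
  }

 private:
  mutable std::mutex mutex_;
  std::deque<int> queue_;
  std::size_t lock_count_ = 0;
};

// Usage example: same elements, different number of acquisitions.
inline void demoLockCounts() {
  TinyQueue queue;
  for (int i = 0; i < 3; ++i) queue.push(i);  // 3 acquisitions
  int value = 0;
  std::vector<int> one_by_one;
  while (queue.tryPop(value)) one_by_one.push_back(value);  // 3 hits + 1 miss
  assert(queue.lockCount() == 7);
  for (int i = 0; i < 3; ++i) queue.push(i);       // 3 more, 10 in total
  const std::vector<int> batched = queue.drain();  // 1 more, 11 in total
  assert(queue.lockCount() == 11);
  assert(batched == one_by_one);
}
```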





[GitHub] [nifi-minifi-cpp] dam4rus commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-780465445


   I agree that this implementation doesn't fit the current design, but to avoid needless locking that could tank performance, I chose this over `push_all` and `pop_all`.
   
   For example:
   At https://github.com/apache/nifi-minifi-cpp/pull/992/files#diff-edfe02f672dad6c32d860c97ea73e5f4b95889387942c8b372554ba43fda14cdL159 `flows` are filtered, so I would need to create a new container. It's not a big issue since `shared_ptr` is fairly cheap to copy, but it nonetheless felt unnecessary. Also, what would be the implementation of `push_all`? Should it accept `std::vector`, `std::queue` or an iterator pair?
   
   But the biggest issue is here https://github.com/apache/nifi-minifi-cpp/pull/992/files#diff-edfe02f672dad6c32d860c97ea73e5f4b95889387942c8b372554ba43fda14cdL181 , since there's both a pop and a push in the same loop: if the flow file is penalised, the loop breaks and puts the flow file back into the queue. Maybe `pop`-ing should be moved to the end of the loop to avoid an unnecessary `pop` and `push` if the flow file is penalised? Either way, it's not really a `pop_all`. So without exposing the underlying queue somehow, this would require some refactoring and/or hurt performance by locking the mutex on every call to `ConcurrentQueue`'s methods.
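
On the question of what `push_all` could accept, one possible answer is an iterator pair, which works for arrays and for any standard container without committing to a single container type. The sketch below is only an assumption about how that might look: `BatchQueue`, `pushAll`, `tryPop` and `size` are hypothetical names, not the existing `ConcurrentQueue` interface, and filtering the input under the lock is not addressed.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical queue for illustration; not the real utils::ConcurrentQueue.
template <typename T>
class BatchQueue {
 public:
  // Appends the range [first, last) under a single lock acquisition.
  template <typename InputIt>
  void pushAll(InputIt first, InputIt last) {
    std::lock_guard<std::mutex> guard(mutex_);
    queue_.insert(queue_.end(), first, last);
  }

  // Moves the head into `out`; returns false if the queue was empty.
  bool tryPop(T& out) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (queue_.empty()) return false;
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return queue_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::deque<T> queue_;
};

// Usage example: the same member function accepts different sources.
inline void demoPushAll() {
  BatchQueue<int> queue;
  const int raw[] = {1, 2, 3};
  queue.pushAll(raw, raw + 3);              // plain array
  const std::deque<int> more{4, 5};
  queue.pushAll(more.begin(), more.end());  // standard container
  assert(queue.size() == 5);
  int head = 0;
  const bool popped = queue.tryPop(head);
  assert(popped);
  assert(head == 1);
}
```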





[GitHub] [nifi-minifi-cpp] dam4rus commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
dam4rus commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-782216241


   I'm pretty sure that locking multiple times in a loop can [degrade performance](https://quick-bench.com/q/6Tb3pu8sUC80EtpJtxht7uxHlLE) significantly and [that's not what you want](https://www.techiedelight.com/why-vector-class-java-obsolete/#:~:text=This%20is%20because%20Vector%20synchronizes,synchronized%20and%20not%20individual%20operations.) most of the time. `ConcurrentQueue::pop_all` and `ConcurrentQueue::push_all` can solve some cases, but for more complex cases like `Connection::poll` it's kinda hard to write a generic solution. I can probably make it work somehow, but I'm not sure that it would be easier to reason about than this solution.
   
   I've created this PR mainly as a PoC and I agree that a public API would be better than exposing the underlying queue. I don't mind if you guys would rather extend the ConcurrentQueue class or leave it as it is.





[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#discussion_r579191698



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -135,6 +143,22 @@ class ConcurrentQueue {
   std::deque<T> queue_;
 };
 
+// Enables batched operations on a ConcurrentQueue by holding a reference to the queue and locking it's mutex

Review comment:
       Very minor, but you probably meant "its" instead of it's.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-781140434


   @szaszm I think @fgerlits was referring to `std::deque<T>* LockedConcurrentQueue::operator->() const { return &queue_.get(); }` providing direct access to the underlying queue. If `LockedConcurrentQueue` implemented the necessary methods itself (`push_back`, `pop_front`, `front`, `empty` by the looks of it), this could be a solid solution IMO.
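
A minimal sketch of that idea follows, assuming the locked handle forwards only those four operations and offers no `operator->`. It lives in a hypothetical `sketch` namespace and is not the code from the PR; the constructor arguments, the `friend` wiring and the `demo` function are assumptions made for the example.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <utility>

namespace sketch {

template <typename T>
class ConcurrentQueue;

// Holds the queue's mutex for its whole lifetime and forwards only a
// small, fixed set of operations instead of exposing the std::deque.
template <typename T>
class LockedConcurrentQueue {
 public:
  bool empty() const { return queue_.empty(); }
  T& front() { return queue_.front(); }
  void pop_front() { queue_.pop_front(); }
  void push_back(T value) { queue_.push_back(std::move(value)); }

 private:
  friend class ConcurrentQueue<T>;
  // Only ConcurrentQueue can create a handle; the lock is taken here.
  LockedConcurrentQueue(std::mutex& mutex, std::deque<T>& queue)
      : lock_(mutex), queue_(queue) {}

  std::unique_lock<std::mutex> lock_;
  std::deque<T>& queue_;
};

template <typename T>
class ConcurrentQueue {
 public:
  // Returns a handle that keeps the mutex locked until it is destroyed.
  LockedConcurrentQueue<T> lock() {
    return LockedConcurrentQueue<T>(mutex_, queue_);
  }

 private:
  std::mutex mutex_;
  std::deque<T> queue_;
};

// Usage example: several operations under one lock acquisition each.
inline void demo() {
  ConcurrentQueue<int> queue;
  {
    auto locked = queue.lock();
    locked.push_back(1);
    locked.push_back(2);
  }  // handle destroyed, mutex released
  auto locked = queue.lock();
  assert(locked.front() == 1);
  locked.pop_front();
  assert(locked.front() == 2);
  locked.pop_front();
  assert(locked.empty());
}

}  // namespace sketch
```

One caveat with this shape: calling `lock()` again on the same thread while a handle is still alive would block forever on a non-recursive `std::mutex`, so handles are best kept in a narrow scope.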





[GitHub] [nifi-minifi-cpp] fgerlits commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
fgerlits commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-853754217


   Closing this, as we ended up going with a different approach, see PR #1027. 





[GitHub] [nifi-minifi-cpp] szaszm edited a comment on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
szaszm edited a comment on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-780928820


   About exposing internals: I don't think it's happening here. I view `LockedConcurrentQueue` as an implementation detail of `ConcurrentQueue`, so they are one unit, even though they are syntactically implemented as two classes. To make this clear, maybe it could be hidden in a `detail` namespace so that client code is not allowed to name it.
   
   I like the design, but on the flip side, it introduces more complexity than it removes. Overall, I support this change. If we happen to reuse this change at least once more, then the introduced complexity would pay off.
   
   Let's wait for more opinions!





[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #992: MINIFICPP-1467 Remove mutex from Connection

Posted by GitBox <gi...@apache.org>.
szaszm commented on pull request #992:
URL: https://github.com/apache/nifi-minifi-cpp/pull/992#issuecomment-780928820


   About exposing internals: I don't think it's happening here. I view `LockedConcurrentQueue` as an implementation detail of `ConcurrentQueue`, so they are one unit, even though they are syntactically implemented as two classes. `friend` is not evil IMO.
   
   I like the design, but on the flip side, it introduces more complexity than it removes. Overall, I support this change. If we happen to reuse this change at least once more, then the introduced complexity would pay off.
   
   Let's wait for more opinions!

