You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/05/14 15:05:36 UTC

[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #776: MINIFICPP-1202 - Handle C2 requests/responses using MinifiConcurrentQueue

hunyadi-dev commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r425209344



##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,54 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { payload_batch.emplace_back(std::move(payload)); };
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; ++attempt_num) {
+        if (!requests.consumeWaitFor(getRequestPayload, std::chrono::seconds(1))) {
+          break;
         }
       }
-      try {
-        performHeartBeat();
-      }
-      catch(const std::exception &e) {
-        logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
-      }
-      catch(...) {
-        logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
-      }
+      std::for_each(
+        std::make_move_iterator(payload_batch.begin()),
+        std::make_move_iterator(payload_batch.end()),
+        [&] (C2Payload&& payload) {
+          try {
+            C2Payload && response = protocol_.load()->consumePayload(std::move(payload));
+            enqueue_c2_server_response(std::move(response));
+          }
+          catch(const std::exception &e) {
+            logger_->log_error("Exception occurred while consuming payload. error: %s", e.what());
+          }
+          catch(...) {
+            logger_->log_error("Unknonwn exception occurred while consuming payload.");
+          }
+        });
 
-      checkTriggers();
+        try {
+          performHeartBeat();
+        }
+        catch (const std::exception &e) {
+          logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
+        }
+        catch (...) {
+          logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
+        }
+    }
+
+    checkTriggers();
+
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+  };
 
-      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
-    };
   functions_.push_back(c2_producer_);
 
-  c2_consumer_ = [&]() {
-    if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
-      C2Payload payload(Operation::HEARTBEAT);
-      {
-        std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
-        if (responses.empty()) {
-          return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
-        }
-        payload = std::move(responses.back());
-        responses.pop_back();
+  c2_consumer_ = [&] {
+    if (responses.size()) {
+      if (!responses.consumeWaitFor([this](C2Payload&& e) { extractPayload(std::move(e)); }, std::chrono::seconds(1))) {

Review comment:
      This behaves the same as the code did before this change, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org