You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/05/06 13:31:15 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 107f33955 MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI
107f33955 is described below

commit 107f339553ad0a8dee64730b636d9c442ce5b2b2
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Thu Apr 14 16:57:25 2022 +0200

    MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1309
---
 .../tests/C2ClearCoreComponentStateTest.cpp        | 42 +++++++++++++++++-----
 .../standard-processors/processors/GetTCP.cpp      | 10 +++---
 .../standard-processors/processors/TailFile.h      |  2 +-
 libminifi/include/utils/MinifiConcurrentQueue.h    |  9 +++++
 libminifi/include/utils/StringUtils.h              |  9 +++++
 libminifi/include/utils/ThreadPool.h               | 29 +++++++--------
 libminifi/src/utils/StringUtils.cpp                | 14 ++++++++
 libminifi/src/utils/ThreadPool.cpp                 | 40 +++++++++++++++++----
 libminifi/test/unit/BackTraceTests.cpp             |  4 +--
 libminifi/test/unit/SocketTests.cpp                |  2 +-
 libminifi/test/unit/StringUtilsTests.cpp           | 10 ++++++
 libminifi/test/unit/ThreadPoolTests.cpp            |  4 +--
 nanofi/include/cxx/Instance.h                      |  8 ++---
 13 files changed, 137 insertions(+), 46 deletions(-)

diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index b85f5dac8..fa9fadfc9 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -41,6 +41,8 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
     LogTestController::getInstance().setDebug<minifi::core::ProcessContext>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
     VerifyC2Base::testSetup();
   }
 
@@ -49,6 +51,10 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
     assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }));
   }
 
+  [[nodiscard]] std::string getFile1Location() const {  // returns a copy of test_file_1_; main() hands it to the heartbeat handler
+    return test_file_1_;
+  }
+
  protected:
   void updateProperties(minifi::FlowController& flow_controller) override {
     auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
@@ -70,14 +76,20 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
 
 class ClearCoreComponentStateHandler: public HeartbeatHandler {
  public:
-  explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully, std::shared_ptr<minifi::Configure> configuration)
+  explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully,
+                                          std::shared_ptr<minifi::Configure> configuration,
+                                          std::string file1Location)
     : HeartbeatHandler(std::move(configuration)),
-      component_cleared_successfully_(component_cleared_successfully) {
+      component_cleared_successfully_(component_cleared_successfully),
+      file_1_location_(std::move(file1Location)) {
   }
 
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
     switch (flow_state_) {
       case FlowState::STARTED:
+        assert(verifyLogLinePresenceInPollTime(10s, "ProcessSession committed for TailFile1"));
+        assert(verifyLogLinePresenceInPollTime(10s, "ProcessSession committed for TailFile2"));
         sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
         flow_state_ = FlowState::FIRST_DESCRIBE_SENT;
         break;
@@ -86,9 +98,21 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
         flow_state_ = FlowState::CLEAR_SENT;
         break;
       }
-      default:
+      case FlowState::CLEAR_SENT: {
+        using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+        auto tail_file_ran_again_checker = [this] {
+          const auto log_contents = LogTestController::getInstance().log_output.str();
+          const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
+          const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
+          const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
+          return utils::StringUtils::matchesSequence(log_contents, patterns);
+        };
+        assert(verifyEventHappenedInPollTime(10s, tail_file_ran_again_checker));
         sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn);
         flow_state_ = FlowState::SECOND_DESCRIBE_SENT;
+        break;
+      }
+      default: {}
     }
   }
 
@@ -118,14 +142,15 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
       case FlowState::CLEAR_SENT:
         break;
       case FlowState::SECOND_DESCRIBE_SENT: {
-        auto clearedStateFound = [this, &root]() {
-          return root.HasMember("corecomponentstate") &&
+        const bool clearedStateFound =
+            root.HasMember("corecomponentstate") &&
             root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") &&
             root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") &&
             std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ &&
             std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_;
-        };
-        component_cleared_successfully_ = clearedStateFound();
+        if (clearedStateFound) {
+          component_cleared_successfully_ = clearedStateFound;
+        }
         break;
       }
       default:
@@ -145,6 +170,7 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
   std::atomic_bool& component_cleared_successfully_;
   std::string last_read_time_1_;
   std::string last_read_time_2_;
+  std::string file_1_location_;
 };
 
 int main(int argc, char **argv) {
@@ -152,7 +178,7 @@ int main(int argc, char **argv) {
   const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
   VerifyC2ClearCoreComponentState harness(component_cleared_successfully);
   harness.setKeyDir(args.key_dir);
-  ClearCoreComponentStateHandler handler(component_cleared_successfully, harness.getConfiguration());
+  ClearCoreComponentStateHandler handler(component_cleared_successfully, harness.getConfiguration(), harness.getFile1Location());
   harness.setUrl(args.url, &handler);
   harness.run(args.test_file);
   return 0;
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index f3aa31314..5fc236130 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -287,18 +287,16 @@ void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
       auto* future = new std::future<int>();
       std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
       utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
-      if (client_thread_pool_.execute(std::move(functor), *future)) {
-        live_clients_[endpoint] = future;
-      }
+      client_thread_pool_.execute(std::move(functor), *future);
+      live_clients_[endpoint] = future;
     } else {
       if (!endPointFuture->second->valid()) {
         delete endPointFuture->second;
         auto* future = new std::future<int>();
         std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
         utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
-        if (client_thread_pool_.execute(std::move(functor), *future)) {
-          live_clients_[endpoint] = future;
-        }
+        client_thread_pool_.execute(std::move(functor), *future);
+        live_clients_[endpoint] = future;
       } else {
         logger_->log_debug("Thread still running for %s", endPointFuture->first);
         // we have a thread corresponding to this.
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 10bb94bb6..290df29bf 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -179,7 +179,7 @@ class TailFile : public core::Processor {
   static const int BUFFER_SIZE = 512;
 
   std::string delimiter_;  // Delimiter for the data incoming from the tailed file.
-  core::CoreComponentStateManager* state_manager_;
+  core::CoreComponentStateManager* state_manager_ = nullptr;
   std::map<std::string, TailState> tail_states_;
   Mode tail_mode_ = Mode::UNDEFINED;
   std::string file_to_tail_;
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h
index 559164a20..5c229fac8 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -18,6 +18,7 @@
 #define LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
 
 
+#include <algorithm>
 #include <chrono>
 #include <deque>
 #include <mutex>
@@ -81,6 +82,12 @@ class ConcurrentQueue {
     queue_.clear();
   }
 
+  template<typename Functor>
+  void remove(Functor fun) {  // erases every queued element for which fun(element) returns true
+    std::lock_guard<std::mutex> guard(mtx_);  // mtx_ is held for the whole erase
+    queue_.erase(std::remove_if(queue_.begin(), queue_.end(), fun), queue_.end());  // erase-remove idiom
+  }
+
   template <typename... Args>
   void enqueue(Args&&... args) {
     std::lock_guard<std::mutex> guard(mtx_);
@@ -223,6 +230,8 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
     return running_;  // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
   }
 
+  using ConcurrentQueue<T>::remove;
+
  private:
   bool running_;
   std::condition_variable cv_;
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index cf8d8c691..647bca043 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -487,6 +487,15 @@ class StringUtils {
 
   static std::string escapeUnprintableBytes(gsl::span<const std::byte> data);
 
+  /**
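+   * Returns whether every pattern occurs in the given string in the listed order, each match starting at or after the end of the previous one.
+   * Patterns are matched as literal substrings, not as regular expressions.
+   * @param str string to search in
+   * @param patterns sequence of literal patterns to search for, in the required order
+   * @return true if all patterns were found in sequence, false otherwise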
+   */
+  static bool matchesSequence(std::string_view str, const std::vector<std::string>& patterns);
+
  private:
 };
 
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 161d33025..fc4b64d91 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -18,21 +18,21 @@
 #define LIBMINIFI_INCLUDE_UTILS_THREADPOOL_H_
 
 #include <algorithm>
-#include <memory>
-#include <string>
-#include <utility>
+#include <atomic>
 #include <chrono>
-#include <sstream>
+#include <functional>
+#include <future>
 #include <iostream>
-#include <atomic>
-#include <mutex>
 #include <map>
-#include <unordered_map>
-#include <vector>
+#include <memory>
+#include <mutex>
 #include <queue>
-#include <future>
+#include <sstream>
+#include <string>
 #include <thread>
-#include <functional>
+#include <unordered_map>
+#include <utility>
+#include <vector>
 
 #include "BackTrace.h"
 #include "MinifiConcurrentQueue.h"
@@ -177,7 +177,6 @@ class ThreadPool {
         controller_service_provider_(controller_service_provider),
         name_(std::move(name)) {
     current_workers_ = 0;
-    task_count_ = 0;
     thread_manager_ = nullptr;
   }
 
@@ -196,9 +195,8 @@ class ThreadPool {
    * @param task this thread pool will subsume ownership of
    * the worker task
    * @param future future to move new promise to
-   * @return true if future can be created and thread pool is in a running state.
    */
-  bool execute(Worker<T> &&task, std::future<T> &future);
+  void execute(Worker<T> &&task, std::future<T> &future);
 
   /**
    * attempts to stop tasks with the provided identifier.
@@ -310,7 +308,6 @@ class ThreadPool {
   int max_worker_threads_;
 // current worker tasks.
   std::atomic<int> current_workers_;
-  std::atomic<int> task_count_;
 // thread queue
   std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
 // manager thread
@@ -340,6 +337,10 @@ class ThreadPool {
   std::recursive_mutex manager_mutex_;
   // thread pool name
   std::string name_;
+  // count of running tasks by ID
+  std::unordered_map<TaskId, uint32_t> running_task_count_by_id_;
+  // variable to signal task running completion
+  std::condition_variable task_run_complete_;
 
   /**
    * Call for the manager to start worker threads
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index 8913963fb..7c5dae6bd 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -500,4 +500,18 @@ std::string StringUtils::escapeUnprintableBytes(gsl::span<const std::byte> data)
   return result;
 }
 
+bool StringUtils::matchesSequence(std::string_view str, const std::vector<std::string>& patterns) {  // true iff all patterns occur in str in the given order
+  size_t pos = 0;  // offset in str where the search for the next pattern begins
+
+  for (const auto& pattern : patterns) {
+    pos = str.find(pattern, pos);  // literal substring search that never looks before pos
+    if (pos == std::string_view::npos) {
+      return false;  // this pattern does not occur after the previous match
+    }
+    pos += pattern.size();  // continue after this match, so consecutive matches cannot overlap
+  }
+
+  return true;  // every pattern was found in order (also reached when patterns is empty)
+}
+
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index a74313d2b..ade82017b 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -48,8 +48,20 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
           worker_queue_.enqueue(std::move(task));
           continue;
         }
+        ++running_task_count_by_id_[task.getIdentifier()];
       }
-      if (task.run()) {
+      const bool taskRunResult = task.run();
+      {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        auto& count = running_task_count_by_id_[task.getIdentifier()];
+        if (count == 1) {
+          running_task_count_by_id_.erase(task.getIdentifier());
+        } else {
+          --count;
+        }
+      }
+      task_run_complete_.notify_all();
+      if (taskRunResult) {
         if (task.getNextExecutionTime() <= std::chrono::steady_clock::now()) {
           // it can be rescheduled again as soon as there is a worker available
           worker_queue_.enqueue(std::move(task));
@@ -101,17 +113,13 @@ void ThreadPool<T>::manage_delayed_queue() {
 }
 
 template<typename T>
-bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+void ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
   {
     std::unique_lock<std::mutex> lock(worker_queue_mutex_);
     task_status_[task.getIdentifier()] = true;
   }
   future = std::move(task.getPromise()->get_future());
   worker_queue_.enqueue(std::move(task));
-
-  task_count_++;
-
-  return true;
 }
 
 template<typename T>
@@ -200,6 +208,26 @@ template<typename T>
 void ThreadPool<T>::stopTasks(const TaskId &identifier) {
   std::unique_lock<std::mutex> lock(worker_queue_mutex_);
   task_status_[identifier] = false;
+
+  // remove tasks belonging to identifier from worker_queue_
+  worker_queue_.remove([&] (const Worker<T>& worker) { return worker.getIdentifier() == identifier; });
+
+  // also remove from delayed_worker_queue_
+  decltype(delayed_worker_queue_) new_delayed_worker_queue;  // collects the delayed tasks that belong to other identifiers
+  while (!delayed_worker_queue_.empty()) {
+    Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));  // presumably top() is const (priority queue), hence the const_cast to move out
+    delayed_worker_queue_.pop();
+    if (task.getIdentifier() != identifier) {
+      new_delayed_worker_queue.push(std::move(task));
+    }
+  }
+  delayed_worker_queue_ = std::move(new_delayed_worker_queue);
+
+  // if tasks are in progress, wait for their completion
+  task_run_complete_.wait(lock, [&] () {  // wait() releases lock while blocked and re-acquires it before evaluating the predicate
+    auto iter = running_task_count_by_id_.find(identifier);
+    return iter == running_task_count_by_id_.end() || iter->second == 0;  // no entry or a zero count means nothing with this identifier is running
+  });  // NOTE(review): looks like this never returns if called from inside a running task with the same identifier - confirm no caller does that
 }
 
 template<typename T>
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 05fbea77e..e470206ac 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -93,7 +93,7 @@ TEST_CASE("BT2", "[TPT2]") {
     utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
 
     std::future<int> fut;
-    REQUIRE(true == pool.execute(std::move(functor), fut));
+    pool.execute(std::move(functor), fut);
   }
 
   std::function<int()> f_ex = counterFunction;
@@ -101,7 +101,7 @@ TEST_CASE("BT2", "[TPT2]") {
   utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
 
   std::future<int> fut;
-  REQUIRE(true == pool.execute(std::move(functor), fut));
+  pool.execute(std::move(functor), fut);
 
   std::vector<BackTrace> traces = pool.getTraces();
   for (const auto &trace : traces) {
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 7e3d835a2..6653d1313 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -190,7 +190,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") {
     std::function<bool()> f_ex = createSocket;
     utils::Worker<bool> functor(f_ex, "id");
     std::future<bool> fut;
-    REQUIRE(true == pool.execute(std::move(functor), fut));
+    pool.execute(std::move(functor), fut);
     futures.push_back(std::move(fut));
   }
   pool.start();
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 46c4b1573..fe4ea427b 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -532,3 +532,13 @@ TEST_CASE("StringUtils::escapeUnprintableBytes", "[escapeUnprintableBytes]") {
   REQUIRE(StringUtils::escapeUnprintableBytes(from_cstring("ab\n\r\t\v\fde")) == "ab\\n\\r\\t\\v\\fde");
   REQUIRE(StringUtils::escapeUnprintableBytes(from_cstring("ab\x00""c\x01""d")) == "ab\\x00c\\x01d");
 }
+
+TEST_CASE("StringUtils::matchesSequence works correctly", "[matchesSequence]") {
+  REQUIRE(StringUtils::matchesSequence("abcdef", {"abc", "def"}));  // matches directly adjacent to each other
+  REQUIRE(!StringUtils::matchesSequence("abcef", {"abc", "def"}));  // second pattern is missing
+  REQUIRE(StringUtils::matchesSequence("xxxabcxxxdefxxx", {"abc", "def"}));  // unrelated text around the matches
+  REQUIRE(!StringUtils::matchesSequence("defabc", {"abc", "def"}));  // both patterns present but in the wrong order
+  REQUIRE(StringUtils::matchesSequence("xxxabcxxxabcxxxdefxxx", {"abc", "def"}));  // an extra occurrence of a pattern is allowed
+  REQUIRE(StringUtils::matchesSequence("xxxabcxxxabcxxxdefxxx", {"abc", "abc", "def"}));  // a repeated pattern needs two separate occurrences
+  REQUIRE(!StringUtils::matchesSequence("xxxabcxxxdefxxx", {"abc", "abc", "def"}));  // a repeated pattern with only one occurrence fails
+}
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index bd2eab980..c4e613cad 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -72,7 +72,7 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   utils::Worker<bool> functor(f_ex, "id");
   pool.start();
   std::future<bool> fut;
-  REQUIRE(true == pool.execute(std::move(functor), fut));
+  pool.execute(std::move(functor), fut);
   fut.wait();
   REQUIRE(true == fut.get());
 }
@@ -91,7 +91,7 @@ TEST_CASE("ThreadPoolTest2", "[TPT2]") {
   utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
   pool.start();
   std::future<int> fut;
-  REQUIRE(true == pool.execute(std::move(functor), fut));
+  pool.execute(std::move(functor), fut);
   fut.wait();
   REQUIRE(20 == fut.get());
 }
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 89b9a301a..dac38ca76 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -147,19 +147,15 @@ class Instance {
 
  protected:
 
-  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t& /*delay*/) {
+  void registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t& /*delay*/) {
     auto functions = updateController->getFunctions();
     // run all functions independently
 
     for (auto function : functions) {
       utils::Worker<utils::TaskRescheduleInfo> functor(function, "listeners");
       std::future<utils::TaskRescheduleInfo> future;
-      if (!listener_thread_pool_.execute(std::move(functor), future)) {
-        // denote failure
-        return false;
-      }
+      listener_thread_pool_.execute(std::move(functor), future);
     }
-    return true;
   }
 
   std::shared_ptr<c2::C2CallbackAgent> agent_;