You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/05/06 13:31:15 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 107f33955 MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI
107f33955 is described below
commit 107f339553ad0a8dee64730b636d9c442ce5b2b2
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Thu Apr 14 16:57:25 2022 +0200
MINIFICPP-1795 - C2ClearCoreComponentStateTest fails sometimes on CI
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1309
---
.../tests/C2ClearCoreComponentStateTest.cpp | 42 +++++++++++++++++-----
.../standard-processors/processors/GetTCP.cpp | 10 +++---
.../standard-processors/processors/TailFile.h | 2 +-
libminifi/include/utils/MinifiConcurrentQueue.h | 9 +++++
libminifi/include/utils/StringUtils.h | 9 +++++
libminifi/include/utils/ThreadPool.h | 29 +++++++--------
libminifi/src/utils/StringUtils.cpp | 14 ++++++++
libminifi/src/utils/ThreadPool.cpp | 40 +++++++++++++++++----
libminifi/test/unit/BackTraceTests.cpp | 4 +--
libminifi/test/unit/SocketTests.cpp | 2 +-
libminifi/test/unit/StringUtilsTests.cpp | 10 ++++++
libminifi/test/unit/ThreadPoolTests.cpp | 4 +--
nanofi/include/cxx/Instance.h | 8 ++---
13 files changed, 137 insertions(+), 46 deletions(-)
diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index b85f5dac8..fa9fadfc9 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -41,6 +41,8 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::core::ProcessContext>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
VerifyC2Base::testSetup();
}
@@ -49,6 +51,10 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }));
}
+ [[nodiscard]] std::string getFile1Location() const {
+ return test_file_1_;
+ }
+
protected:
void updateProperties(minifi::FlowController& flow_controller) override {
auto setFileName = [] (const std::string& fileName, minifi::state::StateController& component){
@@ -70,14 +76,20 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
class ClearCoreComponentStateHandler: public HeartbeatHandler {
public:
- explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully, std::shared_ptr<minifi::Configure> configuration)
+ explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully,
+ std::shared_ptr<minifi::Configure> configuration,
+ std::string file1Location)
: HeartbeatHandler(std::move(configuration)),
- component_cleared_successfully_(component_cleared_successfully) {
+ component_cleared_successfully_(component_cleared_successfully),
+ file_1_location_(std::move(file1Location)) {
}
void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
switch (flow_state_) {
case FlowState::STARTED:
+ assert(verifyLogLinePresenceInPollTime(10s, "ProcessSession committed for TailFile1"));
+ assert(verifyLogLinePresenceInPollTime(10s, "ProcessSession committed for TailFile2"));
sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
flow_state_ = FlowState::FIRST_DESCRIBE_SENT;
break;
@@ -86,9 +98,21 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
flow_state_ = FlowState::CLEAR_SENT;
break;
}
- default:
+ case FlowState::CLEAR_SENT: {
+ using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+ auto tail_file_ran_again_checker = [this] {
+ const auto log_contents = LogTestController::getInstance().log_output.str();
+ const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
+ const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
+ const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
+ return utils::StringUtils::matchesSequence(log_contents, patterns);
+ };
+ assert(verifyEventHappenedInPollTime(10s, tail_file_ran_again_checker));
sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn);
flow_state_ = FlowState::SECOND_DESCRIBE_SENT;
+ break;
+ }
+ default: {}
}
}
@@ -118,14 +142,15 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
case FlowState::CLEAR_SENT:
break;
case FlowState::SECOND_DESCRIBE_SENT: {
- auto clearedStateFound = [this, &root]() {
- return root.HasMember("corecomponentstate") &&
+ const bool clearedStateFound =
+ root.HasMember("corecomponentstate") &&
root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") &&
root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") &&
std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ &&
std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_;
- };
- component_cleared_successfully_ = clearedStateFound();
+ if (clearedStateFound) {
+ component_cleared_successfully_ = clearedStateFound;
+ }
break;
}
default:
@@ -145,6 +170,7 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
std::atomic_bool& component_cleared_successfully_;
std::string last_read_time_1_;
std::string last_read_time_2_;
+ std::string file_1_location_;
};
int main(int argc, char **argv) {
@@ -152,7 +178,7 @@ int main(int argc, char **argv) {
const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
VerifyC2ClearCoreComponentState harness(component_cleared_successfully);
harness.setKeyDir(args.key_dir);
- ClearCoreComponentStateHandler handler(component_cleared_successfully, harness.getConfiguration());
+ ClearCoreComponentStateHandler handler(component_cleared_successfully, harness.getConfiguration(), harness.getFile1Location());
harness.setUrl(args.url, &handler);
harness.run(args.test_file);
return 0;
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index f3aa31314..5fc236130 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -287,18 +287,16 @@ void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
auto* future = new std::future<int>();
std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
- if (client_thread_pool_.execute(std::move(functor), *future)) {
- live_clients_[endpoint] = future;
- }
+ client_thread_pool_.execute(std::move(functor), *future);
+ live_clients_[endpoint] = future;
} else {
if (!endPointFuture->second->valid()) {
delete endPointFuture->second;
auto* future = new std::future<int>();
std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
- if (client_thread_pool_.execute(std::move(functor), *future)) {
- live_clients_[endpoint] = future;
- }
+ client_thread_pool_.execute(std::move(functor), *future);
+ live_clients_[endpoint] = future;
} else {
logger_->log_debug("Thread still running for %s", endPointFuture->first);
// we have a thread corresponding to this.
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 10bb94bb6..290df29bf 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -179,7 +179,7 @@ class TailFile : public core::Processor {
static const int BUFFER_SIZE = 512;
std::string delimiter_; // Delimiter for the data incoming from the tailed file.
- core::CoreComponentStateManager* state_manager_;
+ core::CoreComponentStateManager* state_manager_ = nullptr;
std::map<std::string, TailState> tail_states_;
Mode tail_mode_ = Mode::UNDEFINED;
std::string file_to_tail_;
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h
index 559164a20..5c229fac8 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -18,6 +18,7 @@
#define LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
+#include <algorithm>
#include <chrono>
#include <deque>
#include <mutex>
@@ -81,6 +82,12 @@ class ConcurrentQueue {
queue_.clear();
}
+ template<typename Functor>  // Functor: unary predicate invoked on the queued elements
+ void remove(Functor fun) {  // removes every queued element for which fun returns true
+ std::lock_guard<std::mutex> guard(mtx_);  // hold the queue mutex for the whole removal
+ queue_.erase(std::remove_if(queue_.begin(), queue_.end(), fun), queue_.end());  // erase-remove idiom: compact the kept elements, then drop the tail
+ }
+
template <typename... Args>
void enqueue(Args&&... args) {
std::lock_guard<std::mutex> guard(mtx_);
@@ -223,6 +230,8 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
return running_; // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
}
+ using ConcurrentQueue<T>::remove;
+
private:
bool running_;
std::condition_variable cv_;
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index cf8d8c691..647bca043 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -487,6 +487,15 @@ class StringUtils {
static std::string escapeUnprintableBytes(gsl::span<const std::byte> data);
+ /**
+ * Returns whether all of the given patterns occur in the string, in the given order and without overlapping.
+ * Patterns are matched as literal substrings, not as regular expressions.
+ * @param str string to search in
+ * @param patterns sequence of patterns to search for
+ * @return true if every pattern was found in order, false otherwise
+ */
+ static bool matchesSequence(std::string_view str, const std::vector<std::string>& patterns);
+
private:
};
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 161d33025..fc4b64d91 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -18,21 +18,21 @@
#define LIBMINIFI_INCLUDE_UTILS_THREADPOOL_H_
#include <algorithm>
-#include <memory>
-#include <string>
-#include <utility>
+#include <atomic>
#include <chrono>
-#include <sstream>
+#include <functional>
+#include <future>
#include <iostream>
-#include <atomic>
-#include <mutex>
#include <map>
-#include <unordered_map>
-#include <vector>
+#include <memory>
+#include <mutex>
#include <queue>
-#include <future>
+#include <sstream>
+#include <string>
#include <thread>
-#include <functional>
+#include <unordered_map>
+#include <utility>
+#include <vector>
#include "BackTrace.h"
#include "MinifiConcurrentQueue.h"
@@ -177,7 +177,6 @@ class ThreadPool {
controller_service_provider_(controller_service_provider),
name_(std::move(name)) {
current_workers_ = 0;
- task_count_ = 0;
thread_manager_ = nullptr;
}
@@ -196,9 +195,8 @@ class ThreadPool {
* @param task this thread pool will subsume ownership of
* the worker task
* @param future future to move new promise to
- * @return true if future can be created and thread pool is in a running state.
*/
- bool execute(Worker<T> &&task, std::future<T> &future);
+ void execute(Worker<T> &&task, std::future<T> &future);
/**
* attempts to stop tasks with the provided identifier.
@@ -310,7 +308,6 @@ class ThreadPool {
int max_worker_threads_;
// current worker tasks.
std::atomic<int> current_workers_;
- std::atomic<int> task_count_;
// thread queue
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
@@ -340,6 +337,10 @@ class ThreadPool {
std::recursive_mutex manager_mutex_;
// thread pool name
std::string name_;
+ // number of tasks currently being executed, keyed by task ID
+ std::unordered_map<TaskId, uint32_t> running_task_count_by_id_;
+ // condition variable notified whenever a task run finishes; stopTasks() waits on it for in-progress tasks
+ std::condition_variable task_run_complete_;
/**
* Call for the manager to start worker threads
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index 8913963fb..7c5dae6bd 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -500,4 +500,18 @@ std::string StringUtils::escapeUnprintableBytes(gsl::span<const std::byte> data)
return result;
}
+bool StringUtils::matchesSequence(std::string_view str, const std::vector<std::string>& patterns) {  // literal substring search; patterns must occur in the given order
+ size_t pos = 0;  // offset in str at which the search for the next pattern starts
+
+ for (const auto& pattern : patterns) {
+ pos = str.find(pattern, pos);
+ if (pos == std::string_view::npos) {
+ return false;  // this pattern does not occur after the previous match
+ }
+ pos += pattern.size();  // resume after this match, so consecutive patterns cannot overlap
+ }
+
+ return true;  // every pattern was found in order (also the result for an empty pattern list)
+}
+
} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index a74313d2b..ade82017b 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -48,8 +48,20 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
worker_queue_.enqueue(std::move(task));
continue;
}
+ ++running_task_count_by_id_[task.getIdentifier()];
}
- if (task.run()) {
+ const bool taskRunResult = task.run();
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ auto& count = running_task_count_by_id_[task.getIdentifier()];
+ if (count == 1) {
+ running_task_count_by_id_.erase(task.getIdentifier());
+ } else {
+ --count;
+ }
+ }
+ task_run_complete_.notify_all();
+ if (taskRunResult) {
if (task.getNextExecutionTime() <= std::chrono::steady_clock::now()) {
// it can be rescheduled again as soon as there is a worker available
worker_queue_.enqueue(std::move(task));
@@ -101,17 +113,13 @@ void ThreadPool<T>::manage_delayed_queue() {
}
template<typename T>
-bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+void ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[task.getIdentifier()] = true;
}
future = std::move(task.getPromise()->get_future());
worker_queue_.enqueue(std::move(task));
-
- task_count_++;
-
- return true;
}
template<typename T>
@@ -200,6 +208,26 @@ template<typename T>
void ThreadPool<T>::stopTasks(const TaskId &identifier) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[identifier] = false;
+
+ // remove tasks belonging to identifier from worker_queue_
+ worker_queue_.remove([&] (const Worker<T>& worker) { return worker.getIdentifier() == identifier; });
+
+ // also remove from delayed_worker_queue_
+ decltype(delayed_worker_queue_) new_delayed_worker_queue;
+ while (!delayed_worker_queue_.empty()) {
+ Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+ delayed_worker_queue_.pop();
+ if (task.getIdentifier() != identifier) {
+ new_delayed_worker_queue.push(std::move(task));
+ }
+ }
+ delayed_worker_queue_ = std::move(new_delayed_worker_queue);
+
+ // if tasks are in progress, wait for their completion
+ task_run_complete_.wait(lock, [&] () {
+ auto iter = running_task_count_by_id_.find(identifier);
+ return iter == running_task_count_by_id_.end() || iter->second == 0;
+ });
}
template<typename T>
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 05fbea77e..e470206ac 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -93,7 +93,7 @@ TEST_CASE("BT2", "[TPT2]") {
utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
std::future<int> fut;
- REQUIRE(true == pool.execute(std::move(functor), fut));
+ pool.execute(std::move(functor), fut);
}
std::function<int()> f_ex = counterFunction;
@@ -101,7 +101,7 @@ TEST_CASE("BT2", "[TPT2]") {
utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
std::future<int> fut;
- REQUIRE(true == pool.execute(std::move(functor), fut));
+ pool.execute(std::move(functor), fut);
std::vector<BackTrace> traces = pool.getTraces();
for (const auto &trace : traces) {
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 7e3d835a2..6653d1313 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -190,7 +190,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") {
std::function<bool()> f_ex = createSocket;
utils::Worker<bool> functor(f_ex, "id");
std::future<bool> fut;
- REQUIRE(true == pool.execute(std::move(functor), fut));
+ pool.execute(std::move(functor), fut);
futures.push_back(std::move(fut));
}
pool.start();
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 46c4b1573..fe4ea427b 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -532,3 +532,13 @@ TEST_CASE("StringUtils::escapeUnprintableBytes", "[escapeUnprintableBytes]") {
REQUIRE(StringUtils::escapeUnprintableBytes(from_cstring("ab\n\r\t\v\fde")) == "ab\\n\\r\\t\\v\\fde");
REQUIRE(StringUtils::escapeUnprintableBytes(from_cstring("ab\x00""c\x01""d")) == "ab\\x00c\\x01d");
}
+
+TEST_CASE("StringUtils::matchesSequence works correctly", "[matchesSequence]") {
+ REQUIRE(StringUtils::matchesSequence("abcdef", {"abc", "def"}));
+ REQUIRE(!StringUtils::matchesSequence("abcef", {"abc", "def"}));
+ REQUIRE(StringUtils::matchesSequence("xxxabcxxxdefxxx", {"abc", "def"}));
+ REQUIRE(!StringUtils::matchesSequence("defabc", {"abc", "def"}));
+ REQUIRE(StringUtils::matchesSequence("xxxabcxxxabcxxxdefxxx", {"abc", "def"}));
+ REQUIRE(StringUtils::matchesSequence("xxxabcxxxabcxxxdefxxx", {"abc", "abc", "def"}));
+ REQUIRE(!StringUtils::matchesSequence("xxxabcxxxdefxxx", {"abc", "abc", "def"}));
+}
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index bd2eab980..c4e613cad 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -72,7 +72,7 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") {
utils::Worker<bool> functor(f_ex, "id");
pool.start();
std::future<bool> fut;
- REQUIRE(true == pool.execute(std::move(functor), fut));
+ pool.execute(std::move(functor), fut);
fut.wait();
REQUIRE(true == fut.get());
}
@@ -91,7 +91,7 @@ TEST_CASE("ThreadPoolTest2", "[TPT2]") {
utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
pool.start();
std::future<int> fut;
- REQUIRE(true == pool.execute(std::move(functor), fut));
+ pool.execute(std::move(functor), fut);
fut.wait();
REQUIRE(20 == fut.get());
}
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 89b9a301a..dac38ca76 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -147,19 +147,15 @@ class Instance {
protected:
- bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t& /*delay*/) {
+ void registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t& /*delay*/) {
auto functions = updateController->getFunctions();
// run all functions independently
for (auto function : functions) {
utils::Worker<utils::TaskRescheduleInfo> functor(function, "listeners");
std::future<utils::TaskRescheduleInfo> future;
- if (!listener_thread_pool_.execute(std::move(functor), future)) {
- // denote failure
- return false;
- }
+ listener_thread_pool_.execute(std::move(functor), future);
}
- return true;
}
std::shared_ptr<c2::C2CallbackAgent> agent_;