You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/18 14:23:18 UTC
[nifi-minifi-cpp] 02/02: MINIFICPP-1711 Stabilize C2 tests under load
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6b46fd397389845f54fe00fff208fc19934ad567
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 18 15:49:14 2022 +0200
MINIFICPP-1711 Stabilize C2 tests under load
Closes #1318
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../tests/C2ClearCoreComponentStateTest.cpp | 51 ++++++++++-----
.../http-curl/tests/C2MultipleCommandsTest.cpp | 4 +-
extensions/http-curl/tests/C2PauseResumeTest.cpp | 10 ++-
.../http-curl/tests/C2PropertiesUpdateTests.cpp | 10 +--
extensions/http-curl/tests/C2RequestClassTest.cpp | 2 +-
.../http-curl/tests/C2VerifyHeartbeatAndStop.cpp | 2 +-
.../tests/C2VerifyLightweightHeartbeatAndStop.cpp | 11 +++-
extensions/http-curl/tests/HTTPHandlers.h | 73 +++++++++++++---------
libminifi/test/integration/IntegrationBase.h | 2 +-
9 files changed, 102 insertions(+), 63 deletions(-)
diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index fa9fadfc9..c39d80b3e 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -48,7 +48,7 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
void runAssertions() override {
using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
- assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }));
+ assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }, 1s));
}
[[nodiscard]] std::string getFile1Location() const {
@@ -93,21 +93,14 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
flow_state_ = FlowState::FIRST_DESCRIBE_SENT;
break;
- case FlowState::FIRST_DESCRIBE_SENT: {
+ case FlowState::FIRST_DESCRIBE_ACK:
+ case FlowState::CLEAR_SENT: {
sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", "TailFile1"} });
flow_state_ = FlowState::CLEAR_SENT;
break;
}
- case FlowState::CLEAR_SENT: {
- using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
- auto tail_file_ran_again_checker = [this] {
- const auto log_contents = LogTestController::getInstance().log_output.str();
- const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
- const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
- const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
- return utils::StringUtils::matchesSequence(log_contents, patterns);
- };
- assert(verifyEventHappenedInPollTime(10s, tail_file_ran_again_checker));
+ case FlowState::CLEAR_SENT_ACK:
+ case FlowState::SECOND_DESCRIBE_SENT: {
sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn);
flow_state_ = FlowState::SECOND_DESCRIBE_SENT;
break;
@@ -137,24 +130,46 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
last_read_time_1_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString());
last_read_time_2_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString());
+ assert(!last_read_time_1_.empty());
+ assert(!last_read_time_2_.empty());
+ flow_state_ = FlowState::FIRST_DESCRIBE_ACK;
break;
}
- case FlowState::CLEAR_SENT:
+ case FlowState::CLEAR_SENT: {
+ auto tail_file_ran_again_checker = [this] {
+ const auto log_contents = LogTestController::getInstance().log_output.str();
+ const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
+ const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
+ const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
+ return utils::StringUtils::matchesSequence(log_contents, patterns);
+ };
+ if (tail_file_ran_again_checker()) {
+ flow_state_ = FlowState::CLEAR_SENT_ACK;
+ }
break;
+ }
case FlowState::SECOND_DESCRIBE_SENT: {
+ if (!root.HasMember("corecomponentstate") ||
+ !root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") ||
+ !root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994")) {
+ break;
+ }
+
+ auto file2_state_time = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString());
+ auto file1_state_time = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString());
const bool clearedStateFound =
root.HasMember("corecomponentstate") &&
root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") &&
root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") &&
- std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ &&
- std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_;
+ file2_state_time == last_read_time_2_ &&
+ file1_state_time != last_read_time_1_;
+
if (clearedStateFound) {
component_cleared_successfully_ = clearedStateFound;
}
break;
}
- default:
- throw std::runtime_error("Invalid flow state state when handling acknowledge message!");
+ default: {}
}
}
@@ -162,7 +177,9 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
enum class FlowState {
STARTED,
FIRST_DESCRIBE_SENT,
+ FIRST_DESCRIBE_ACK,
CLEAR_SENT,
+ CLEAR_SENT_ACK,
SECOND_DESCRIBE_SENT
};
diff --git a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
index 06697c3d2..beb6e53f4 100644
--- a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
+++ b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
@@ -111,8 +111,8 @@ class VerifyC2MultipleCommands : public VerifyC2Base {
}
void runAssertions() override {
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889345");}));
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889346");}));
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_auditor_.isAcknowledged("889345");}));
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_auditor_.isAcknowledged("889346");}));
}
private:
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index bfb69fd18..0721bd38e 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -34,7 +34,13 @@
class VerifyC2PauseResume : public VerifyC2Base {
public:
- explicit VerifyC2PauseResume(const std::atomic_bool& flow_resumed_successfully) : VerifyC2Base(), flow_resumed_successfully_(flow_resumed_successfully) {}
+ explicit VerifyC2PauseResume(const std::atomic_bool& flow_resumed_successfully) : VerifyC2Base(), flow_resumed_successfully_(flow_resumed_successfully) {
+ LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setDebug<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::core::ProcessContext>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ }
void configureC2() override {
VerifyC2Base::configureC2();
@@ -75,7 +81,7 @@ class PauseResumeHandler: public HeartbeatHandler {
pause_start_time_ = std::chrono::system_clock::now();
flow_state_ = FlowState::PAUSED;
operation = "pause";
- } else if (get_invoke_count_ == INITIAL_GET_INVOKE_COUNT && flow_state_ == FlowState::STARTED) {
+ } else if (get_invoke_count_ >= INITIAL_GET_INVOKE_COUNT && flow_state_ == FlowState::STARTED) {
flow_state_ = FlowState::PAUSE_INITIATED;
operation = "pause";
} else if (flow_state_ == FlowState::PAUSED) {
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 870604bcc..761475fb8 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -162,13 +162,13 @@ int main() {
// On msvc, the passed lambda can't capture a reference to the object under construction, so we need to late-init harness.
VerifyPropertyUpdate harness;
harness = VerifyPropertyUpdate([&] {
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");}));
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_handler.isAcknowledged("79");}));
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("FULLY_APPLIED") == 1;
}));
// Updating the same property will result in a no operation response
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("NO_OPERATION") > 0;
}));
@@ -176,12 +176,12 @@ int main() {
hb_handler.setProperties({{minifi::Configuration::nifi_c2_rest_heartbeat_minimize_updates, "banana", true}, {minifi::Configuration::minifi_disk_space_watchdog_enable, "true", true}});
// Due to 1 invalid value the result will be partially applied
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("PARTIALLY_APPLIED") == 1;
}));
// Repeating the previous update request results in 1 no operation and 1 failure which results in not applied response
- assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+ assert(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("NOT_APPLIED") > 0
&& harness.getRestartRequestedCount() == 2;
}));
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp b/extensions/http-curl/tests/C2RequestClassTest.cpp
index ed97fa032..8e87d3768 100644
--- a/extensions/http-curl/tests/C2RequestClassTest.cpp
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -83,7 +83,7 @@ class VerifyC2ClassRequest : public VerifyC2Base {
}
void runAssertions() override {
- assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(3), verify_));
+ assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(10), verify_));
}
private:
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 64e071a6f..deaf1438d 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -55,7 +55,7 @@ int main(int argc, char **argv) {
const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
VerifyC2Heartbeat harness;
harness.setKeyDir(args.key_dir);
- HeartbeatHandler responder(harness.getConfiguration());
+ StoppingHeartbeatHandler responder(harness.getConfiguration());
harness.setUrl(args.url, &responder);
harness.run(args.test_file);
diff --git a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
index 4b6c313da..f0060538a 100644
--- a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
@@ -17,6 +17,8 @@
*/
#undef NDEBUG
+#include <mutex>
+
#include "TestBase.h"
#include "Catch.h"
#include "c2/C2Agent.h"
@@ -28,13 +30,14 @@
#include "utils/IntegrationTestUtils.h"
#include "properties/Configuration.h"
-class LightWeightC2Handler : public HeartbeatHandler {
+class LightWeightC2Handler : public StoppingHeartbeatHandler {
public:
explicit LightWeightC2Handler(std::shared_ptr<minifi::Configure> configuration)
- : HeartbeatHandler(std::move(configuration)) {
+ : StoppingHeartbeatHandler(std::move(configuration)) {
}
void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) override {
+ std::lock_guard<std::mutex> lock(call_mutex_);
if (calls_ == 0) {
verifyJsonHasAgentManifest(root);
} else {
@@ -43,8 +46,10 @@ class LightWeightC2Handler : public HeartbeatHandler {
}
calls_++;
}
+
private:
- std::atomic<size_t> calls_{0};
+ std::mutex call_mutex_;
+ std::size_t calls_{0};
};
class VerifyLightWeightC2Heartbeat : public VerifyC2Base {
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index cdba06fbe..0174eb197 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -399,7 +399,6 @@ class HeartbeatHandler : public ServerAwareHandler {
bool handlePost(CivetServer *, struct mg_connection *conn) override {
verify(conn);
- sendStopOperation(conn);
return true;
}
@@ -490,18 +489,31 @@ class HeartbeatHandler : public ServerAwareHandler {
verifySupportedOperations(root, verify_components, disallowed_properties);
}
+ void verify(struct mg_connection *conn) {
+ auto post_data = readPayload(conn);
+ if (!isServerRunning()) {
+ return;
+ }
+ if (!IsNullOrEmpty(post_data)) {
+ rapidjson::Document root;
+ rapidjson::ParseResult result = root.Parse(post_data.data(), post_data.size());
+ if (!result) {
+ throw std::runtime_error(fmt::format("JSON parse error: {0}\n JSON data: {1}", std::string(rapidjson::GetParseError_En(result.Code())), post_data));
+ }
+ std::string operation = root["operation"].GetString();
+ if (operation == "heartbeat") {
+ handleHeartbeat(root, conn);
+ } else if (operation == "acknowledge") {
+ handleAcknowledge(root);
+ } else {
+ throw std::runtime_error("operation not supported " + operation);
+ }
+ }
+ }
+
private:
using Metadata = std::unordered_map<std::string, std::vector<std::unordered_map<std::string, std::string>>>;
- static void sendStopOperation(struct mg_connection *conn) {
- std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\" }, "
- "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\" } ]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- resp.length());
- mg_printf(conn, "%s", resp.c_str());
- }
-
static std::set<std::string> getOperandsOfProperties(const rapidjson::Value& operation_node) {
std::set<std::string> operands;
assert(operation_node.HasMember("properties"));
@@ -613,29 +625,28 @@ class HeartbeatHandler : public ServerAwareHandler {
assert(operations == minifi::c2::Operation::values());
}
- void verify(struct mg_connection *conn) {
- auto post_data = readPayload(conn);
- if (!isServerRunning()) {
- return;
- }
- if (!IsNullOrEmpty(post_data)) {
- rapidjson::Document root;
- rapidjson::ParseResult result = root.Parse(post_data.data(), post_data.size());
- if (!result) {
- throw std::runtime_error(fmt::format("JSON parse error: {0}\n JSON data: {1}", std::string(rapidjson::GetParseError_En(result.Code())), post_data));
- }
- std::string operation = root["operation"].GetString();
- if (operation == "heartbeat") {
- handleHeartbeat(root, conn);
- } else if (operation == "acknowledge") {
- handleAcknowledge(root);
- } else {
- throw std::runtime_error("operation not supported " + operation);
- }
- }
+ std::shared_ptr<minifi::Configure> configuration_;
+};
+
+class StoppingHeartbeatHandler : public HeartbeatHandler {
+ public:
+ explicit StoppingHeartbeatHandler(std::shared_ptr<minifi::Configure> configuration) : HeartbeatHandler(std::move(configuration)) {}
+
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
+ verify(conn);
+ sendStopOperation(conn);
+ return true;
}
- std::shared_ptr<minifi::Configure> configuration_;
+ private:
+ static void sendStopOperation(struct mg_connection *conn) {
+ std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\" }, "
+ "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\" } ]}";
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ resp.length());
+ mg_printf(conn, "%s", resp.c_str());
+ }
};
class C2FlowProvider : public ServerAwareHandler {
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 1c43656db..3c58d67cc 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -17,7 +17,7 @@
*/
#pragma once
-#define DEFAULT_WAITTIME_MSECS 3000
+#define DEFAULT_WAITTIME_MSECS 10000
#include <future>
#include <memory>