You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/18 14:23:18 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1711 Stabilize C2 tests under load

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6b46fd397389845f54fe00fff208fc19934ad567
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 18 15:49:14 2022 +0200

    MINIFICPP-1711 Stabilize C2 tests under load
    
    Closes #1318
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../tests/C2ClearCoreComponentStateTest.cpp        | 51 ++++++++++-----
 .../http-curl/tests/C2MultipleCommandsTest.cpp     |  4 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   | 10 ++-
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    | 10 +--
 extensions/http-curl/tests/C2RequestClassTest.cpp  |  2 +-
 .../http-curl/tests/C2VerifyHeartbeatAndStop.cpp   |  2 +-
 .../tests/C2VerifyLightweightHeartbeatAndStop.cpp  | 11 +++-
 extensions/http-curl/tests/HTTPHandlers.h          | 73 +++++++++++++---------
 libminifi/test/integration/IntegrationBase.h       |  2 +-
 9 files changed, 102 insertions(+), 63 deletions(-)

diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
index fa9fadfc9..c39d80b3e 100644
--- a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -48,7 +48,7 @@ class VerifyC2ClearCoreComponentState : public VerifyC2Base {
 
   void runAssertions() override {
     using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-    assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }));
+    assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }, 1s));
   }
 
   [[nodiscard]] std::string getFile1Location() const {
@@ -93,21 +93,14 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
         sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
         flow_state_ = FlowState::FIRST_DESCRIBE_SENT;
         break;
-      case FlowState::FIRST_DESCRIBE_SENT: {
+      case FlowState::FIRST_DESCRIBE_ACK:
+      case FlowState::CLEAR_SENT: {
         sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", "TailFile1"} });
         flow_state_ = FlowState::CLEAR_SENT;
         break;
       }
-      case FlowState::CLEAR_SENT: {
-        using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-        auto tail_file_ran_again_checker = [this] {
-          const auto log_contents = LogTestController::getInstance().log_output.str();
-          const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
-          const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
-          const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
-          return utils::StringUtils::matchesSequence(log_contents, patterns);
-        };
-        assert(verifyEventHappenedInPollTime(10s, tail_file_ran_again_checker));
+      case FlowState::CLEAR_SENT_ACK:
+      case FlowState::SECOND_DESCRIBE_SENT: {
         sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn);
         flow_state_ = FlowState::SECOND_DESCRIBE_SENT;
         break;
@@ -137,24 +130,46 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
 
         last_read_time_1_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString());
         last_read_time_2_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString());
+        assert(!last_read_time_1_.empty());
+        assert(!last_read_time_2_.empty());
+        flow_state_ = FlowState::FIRST_DESCRIBE_ACK;
         break;
       }
-      case FlowState::CLEAR_SENT:
+      case FlowState::CLEAR_SENT: {
+        auto tail_file_ran_again_checker = [this] {
+          const auto log_contents = LogTestController::getInstance().log_output.str();
+          const std::string tailing_file_pattern = "[debug] Tailing file " + file_1_location_;
+          const std::string tail_file_committed_pattern = "[trace] ProcessSession committed for TailFile1";
+          const std::vector<std::string> patterns = {tailing_file_pattern, tailing_file_pattern, tail_file_committed_pattern};
+          return utils::StringUtils::matchesSequence(log_contents, patterns);
+        };
+        if (tail_file_ran_again_checker()) {
+          flow_state_ = FlowState::CLEAR_SENT_ACK;
+        }
         break;
+      }
       case FlowState::SECOND_DESCRIBE_SENT: {
+        if (!root.HasMember("corecomponentstate") ||
+            !root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") ||
+            !root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994")) {
+          break;
+        }
+
+        auto file2_state_time = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString());
+        auto file1_state_time = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString());
         const bool clearedStateFound =
             root.HasMember("corecomponentstate") &&
             root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") &&
             root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") &&
-            std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ &&
-            std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_;
+            file2_state_time == last_read_time_2_ &&
+            file1_state_time != last_read_time_1_;
+
         if (clearedStateFound) {
           component_cleared_successfully_ = clearedStateFound;
         }
         break;
       }
-      default:
-        throw std::runtime_error("Invalid flow state state when handling acknowledge message!");
+      default: {}
     }
   }
 
@@ -162,7 +177,9 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler {
   enum class FlowState {
     STARTED,
     FIRST_DESCRIBE_SENT,
+    FIRST_DESCRIBE_ACK,
     CLEAR_SENT,
+    CLEAR_SENT_ACK,
     SECOND_DESCRIBE_SENT
   };
 
diff --git a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
index 06697c3d2..beb6e53f4 100644
--- a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
+++ b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
@@ -111,8 +111,8 @@ class VerifyC2MultipleCommands : public VerifyC2Base {
   }
 
   void runAssertions() override {
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889345");}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889346");}));
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_auditor_.isAcknowledged("889345");}));
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_auditor_.isAcknowledged("889346");}));
   }
 
  private:
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index bfb69fd18..0721bd38e 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -34,7 +34,13 @@
 
 class VerifyC2PauseResume : public VerifyC2Base {
  public:
-  explicit VerifyC2PauseResume(const std::atomic_bool& flow_resumed_successfully) : VerifyC2Base(), flow_resumed_successfully_(flow_resumed_successfully) {}
+  explicit VerifyC2PauseResume(const std::atomic_bool& flow_resumed_successfully) : VerifyC2Base(), flow_resumed_successfully_(flow_resumed_successfully) {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessContext>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+  }
 
   void configureC2() override {
     VerifyC2Base::configureC2();
@@ -75,7 +81,7 @@ class PauseResumeHandler: public HeartbeatHandler {
       pause_start_time_ = std::chrono::system_clock::now();
       flow_state_ = FlowState::PAUSED;
       operation = "pause";
-    } else if (get_invoke_count_ == INITIAL_GET_INVOKE_COUNT && flow_state_ == FlowState::STARTED) {
+    } else if (get_invoke_count_ >= INITIAL_GET_INVOKE_COUNT && flow_state_ == FlowState::STARTED) {
       flow_state_ = FlowState::PAUSE_INITIATED;
       operation = "pause";
     } else if (flow_state_ == FlowState::PAUSED) {
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 870604bcc..761475fb8 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -162,13 +162,13 @@ int main() {
   // On msvc, the passed lambda can't capture a reference to the object under construction, so we need to late-init harness.
   VerifyPropertyUpdate harness;
   harness = VerifyPropertyUpdate([&] {
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_handler.isAcknowledged("79");}));
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {
       return ack_handler.getApplyCount("FULLY_APPLIED") == 1;
     }));
 
     // Updating the same property will result in a no operation response
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {
       return ack_handler.getApplyCount("NO_OPERATION") > 0;
     }));
 
@@ -176,12 +176,12 @@ int main() {
     hb_handler.setProperties({{minifi::Configuration::nifi_c2_rest_heartbeat_minimize_updates, "banana", true}, {minifi::Configuration::minifi_disk_space_watchdog_enable, "true", true}});
 
     // Due to 1 invalid value the result will be partially applied
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {
       return ack_handler.getApplyCount("PARTIALLY_APPLIED") == 1;
     }));
 
     // Repeating the previous update request results in 1 no operation and 1 failure which results in not applied response
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+    assert(utils::verifyEventHappenedInPollTime(10s, [&] {
       return ack_handler.getApplyCount("NOT_APPLIED") > 0
         && harness.getRestartRequestedCount() == 2;
     }));
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp b/extensions/http-curl/tests/C2RequestClassTest.cpp
index ed97fa032..8e87d3768 100644
--- a/extensions/http-curl/tests/C2RequestClassTest.cpp
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -83,7 +83,7 @@ class VerifyC2ClassRequest : public VerifyC2Base {
   }
 
   void runAssertions() override {
-    assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(3), verify_));
+    assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(10), verify_));
   }
 
  private:
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 64e071a6f..deaf1438d 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -55,7 +55,7 @@ int main(int argc, char **argv) {
   const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
   VerifyC2Heartbeat harness;
   harness.setKeyDir(args.key_dir);
-  HeartbeatHandler responder(harness.getConfiguration());
+  StoppingHeartbeatHandler responder(harness.getConfiguration());
   harness.setUrl(args.url, &responder);
   harness.run(args.test_file);
 
diff --git a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
index 4b6c313da..f0060538a 100644
--- a/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyLightweightHeartbeatAndStop.cpp
@@ -17,6 +17,8 @@
  */
 
 #undef NDEBUG
+#include <mutex>
+
 #include "TestBase.h"
 #include "Catch.h"
 #include "c2/C2Agent.h"
@@ -28,13 +30,14 @@
 #include "utils/IntegrationTestUtils.h"
 #include "properties/Configuration.h"
 
-class LightWeightC2Handler : public HeartbeatHandler {
+class LightWeightC2Handler : public StoppingHeartbeatHandler {
  public:
   explicit LightWeightC2Handler(std::shared_ptr<minifi::Configure> configuration)
-    : HeartbeatHandler(std::move(configuration)) {
+    : StoppingHeartbeatHandler(std::move(configuration)) {
   }
 
   void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) override {
+    std::lock_guard<std::mutex> lock(call_mutex_);
     if (calls_ == 0) {
       verifyJsonHasAgentManifest(root);
     } else {
@@ -43,8 +46,10 @@ class LightWeightC2Handler : public HeartbeatHandler {
     }
     calls_++;
   }
+
  private:
-  std::atomic<size_t> calls_{0};
+  std::mutex call_mutex_;
+  std::size_t calls_{0};
 };
 
 class VerifyLightWeightC2Heartbeat : public VerifyC2Base {
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index cdba06fbe..0174eb197 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -399,7 +399,6 @@ class HeartbeatHandler : public ServerAwareHandler {
 
   bool handlePost(CivetServer *, struct mg_connection *conn) override {
     verify(conn);
-    sendStopOperation(conn);
     return true;
   }
 
@@ -490,18 +489,31 @@ class HeartbeatHandler : public ServerAwareHandler {
     verifySupportedOperations(root, verify_components, disallowed_properties);
   }
 
+  void verify(struct mg_connection *conn) {
+    auto post_data = readPayload(conn);
+    if (!isServerRunning()) {
+      return;
+    }
+    if (!IsNullOrEmpty(post_data)) {
+      rapidjson::Document root;
+      rapidjson::ParseResult result = root.Parse(post_data.data(), post_data.size());
+      if (!result) {
+        throw std::runtime_error(fmt::format("JSON parse error: {0}\n JSON data: {1}", std::string(rapidjson::GetParseError_En(result.Code())), post_data));
+      }
+      std::string operation = root["operation"].GetString();
+      if (operation == "heartbeat") {
+        handleHeartbeat(root, conn);
+      } else if (operation == "acknowledge") {
+        handleAcknowledge(root);
+      } else {
+        throw std::runtime_error("operation not supported " + operation);
+      }
+    }
+  }
+
  private:
   using Metadata = std::unordered_map<std::string, std::vector<std::unordered_map<std::string, std::string>>>;
 
-  static void sendStopOperation(struct mg_connection *conn) {
-    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\"  } ]}";
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-              resp.length());
-    mg_printf(conn, "%s", resp.c_str());
-  }
-
   static std::set<std::string> getOperandsOfProperties(const rapidjson::Value& operation_node) {
     std::set<std::string> operands;
     assert(operation_node.HasMember("properties"));
@@ -613,29 +625,28 @@ class HeartbeatHandler : public ServerAwareHandler {
     assert(operations == minifi::c2::Operation::values());
   }
 
-  void verify(struct mg_connection *conn) {
-    auto post_data = readPayload(conn);
-    if (!isServerRunning()) {
-      return;
-    }
-    if (!IsNullOrEmpty(post_data)) {
-      rapidjson::Document root;
-      rapidjson::ParseResult result = root.Parse(post_data.data(), post_data.size());
-      if (!result) {
-        throw std::runtime_error(fmt::format("JSON parse error: {0}\n JSON data: {1}", std::string(rapidjson::GetParseError_En(result.Code())), post_data));
-      }
-      std::string operation = root["operation"].GetString();
-      if (operation == "heartbeat") {
-        handleHeartbeat(root, conn);
-      } else if (operation == "acknowledge") {
-        handleAcknowledge(root);
-      } else {
-        throw std::runtime_error("operation not supported " + operation);
-      }
-    }
+  std::shared_ptr<minifi::Configure> configuration_;
+};
+
+class StoppingHeartbeatHandler : public HeartbeatHandler {
+ public:
+  explicit StoppingHeartbeatHandler(std::shared_ptr<minifi::Configure> configuration) : HeartbeatHandler(std::move(configuration)) {}
+
+  bool handlePost(CivetServer *, struct mg_connection *conn) override {
+    verify(conn);
+    sendStopOperation(conn);
+    return true;
   }
 
-  std::shared_ptr<minifi::Configure> configuration_;
+ private:
+  static void sendStopOperation(struct mg_connection *conn) {
+    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\"  } ]}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              resp.length());
+    mg_printf(conn, "%s", resp.c_str());
+  }
 };
 
 class C2FlowProvider : public ServerAwareHandler {
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 1c43656db..3c58d67cc 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -17,7 +17,7 @@
  */
 #pragma once
 
-#define DEFAULT_WAITTIME_MSECS 3000
+#define DEFAULT_WAITTIME_MSECS 10000
 
 #include <future>
 #include <memory>