You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/04/01 10:58:27 UTC

[nifi-minifi-cpp] 01/05: MINIFICPP-1779 Verify multiple C2 commands in HB response

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit bc46482985b5202479f83f47dc98e274cf434b6d
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Fri Apr 1 12:51:22 2022 +0200

    MINIFICPP-1779 Verify multiple C2 commands in HB response
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1285
---
 .../http-curl/tests/C2MultipleCommandsTest.cpp     | 130 +++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          |  63 ++++++----
 3 files changed, 174 insertions(+), 20 deletions(-)

diff --git a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
new file mode 100644
index 0000000..06697c3
--- /dev/null
+++ b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include <vector>
+#include <functional>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+
+class AckAuditor {  // Test helper: tracks acknowledged C2 operation ids and an ordered list of ack verifiers, each guarded by its own mutex.
+ public:
+  void addAck(const std::string& ack) {  // Records `ack` as acknowledged; inserting into a set makes repeats harmless.
+    std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_);
+    acknowledged_operations_.insert(ack);
+  }
+
+  bool isAcknowledged(const std::string& operation_id) const {  // Returns true once addAck() has stored `operation_id`.
+    std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_);
+    return acknowledged_operations_.count(operation_id) > 0;
+  }
+
+  void addVerifier(std::function<void(const rapidjson::Document&)> verifier) {  // Appends `verifier` to the end of the verifier sequence.
+    std::lock_guard<std::mutex> guard(verify_ack_mutex_);
+    ack_verifiers_.push_back(std::move(verifier));
+  }
+
+  void verifyAck(const rapidjson::Document& root) {  // Runs the next verifier in sequence on `root`, then advances the cursor.
+    std::lock_guard<std::mutex> guard(verify_ack_mutex_);
+    if (ack_verifiers_.empty()) {
+      assert(false);  // an ack arrived before any verifier was registered
+    }
+
+    ack_verifiers_[next_verifier_index_](root);
+    ++next_verifier_index_;
+    if (next_verifier_index_ >= ack_verifiers_.size()) {
+      next_verifier_index_ = 0;  // wrap around so the verifiers are reused cyclically
+    }
+  }
+
+ private:
+  mutable std::mutex acknowledged_operations_mutex_;  // guards acknowledged_operations_; mutable so const isAcknowledged() can lock
+  mutable std::mutex verify_ack_mutex_;  // guards ack_verifiers_ and next_verifier_index_
+  std::unordered_set<std::string> acknowledged_operations_;  // ids passed to addAck()
+  std::vector<std::function<void(const rapidjson::Document&)>> ack_verifiers_;  // verifiers in registration order
+  uint32_t next_verifier_index_ = 0;  // index of the verifier the next verifyAck() call will run
+};
+
+class MultipleC2CommandHandler: public HeartbeatHandler {  // Heartbeat handler that requests two C2 operations per heartbeat and audits their acks.
+ public:
+  explicit MultipleC2CommandHandler(AckAuditor& ack_auditor, std::shared_ptr<minifi::Configure> configuration)
+    : HeartbeatHandler(std::move(configuration)),
+      ack_auditor_(ack_auditor) {  // only the reference is stored, so `ack_auditor` must outlive this handler
+  }
+
+  void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {  // Ignores the heartbeat body; always replies with the same two DESCRIBE operations.
+    std::vector<C2Operation> operations{{"DESCRIBE", "manifest", "889345", {}}, {"DESCRIBE", "corecomponentstate", "889346", {}}};
+    ack_auditor_.addVerifier([this](const rapidjson::Document& root) {  // registered first, matching the first operation ("manifest")
+      verifyJsonHasAgentManifest(root);
+    });
+    ack_auditor_.addVerifier([](const rapidjson::Document& root) {  // registered second, matching the second operation
+      assert(root.HasMember("corecomponentstate"));
+    });
+    sendHeartbeatResponse(operations, conn);  // NOTE(review): a verifier pair is appended on every heartbeat; assumes acks arrive in operation order - confirm
+  }
+
+  void handleAcknowledge(const rapidjson::Document& root) override {  // Runs the next queued verifier on the ack, then records its operationId when present.
+    ack_auditor_.verifyAck(root);  // runs before the IsObject() check below, so the verifier sees every ack payload
+    if (root.IsObject() && root.HasMember("operationId")) {
+      ack_auditor_.addAck(root["operationId"].GetString());
+    }
+  }
+
+ private:
+  AckAuditor& ack_auditor_;  // non-owning reference to the auditor supplied at construction
+};
+
+class VerifyC2MultipleCommands : public VerifyC2Base {  // Harness whose assertions pass once both operation ids sent by the handler are acknowledged.
+ public:
+  explicit VerifyC2MultipleCommands(AckAuditor& auditor)
+    : ack_auditor_(auditor) {
+  }
+
+  void testSetup() override {  // Sets log levels for the C2 classes below, then runs the base-class setup.
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    VerifyC2Base::testSetup();
+  }
+
+  void configureFullHeartbeat() override {  // Sets the nifi_c2_full_heartbeat property to "false".
+    configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_full_heartbeat, "false");
+  }
+
+  void runAssertions() override {  // Each id gets its own 3s window (presumably polled) to appear in the auditor.
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889345");}));  // id of the "manifest" DESCRIBE
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889346");}));  // id of the "corecomponentstate" DESCRIBE
+  }
+
+ private:
+  AckAuditor& ack_auditor_;  // non-owning reference to the auditor supplied at construction
+};
+
+int main(int argc, char **argv) {  // Wires one AckAuditor into both the HTTP responder and the test harness, then runs the test.
+  const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
+  AckAuditor ack_auditor;  // written by the responder, read by the harness assertions
+  VerifyC2MultipleCommands harness(ack_auditor);
+  harness.setKeyDir(args.key_dir);
+  MultipleC2CommandHandler responder(ack_auditor, harness.getConfiguration());
+  harness.setUrl(args.url, &responder);  // passed by address; presumably used while run() executes
+  harness.run(args.test_file);
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 0077008..975b6e9 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -101,3 +101,4 @@ add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
 add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
 add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
 add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")  # same flow file and resource dir as C2ClearCoreComponentStateTest
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index c5b6c2e..a0acc6b 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -404,31 +404,54 @@ class HeartbeatHandler : public ServerAwareHandler {
   }
 
  protected:
-  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn,
+  struct C2Operation {  // One entry of the "requested_operations" array built by sendHeartbeatResponse().
+    std::string operation;  // emitted as the "operation" field
+    std::string operand;  // emitted as the "operand" field
+    std::string operation_id;  // emitted as the "operationid" field
+    std::unordered_map<std::string, std::string> args;  // emitted as the "args" object; omitted when empty
+  };
+
+  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn,
       const std::unordered_map<std::string, std::string>& args = {}) {
-    std::string resp_args;
-    if (!args.empty()) {
-      resp_args = ", \"args\": {";
-      auto it = args.begin();
-      while (it != args.end()) {
-        resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
-        ++it;
-        if (it != args.end()) {
-          resp_args += ", ";
+    sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn);  // single-operation convenience overload: wraps the arguments in a one-element list and delegates
+  }
+
+  void sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn) {  // Sends a 200 response whose body lists every entry of `operations` under "requested_operations".
+    std::string operation_jsons;
+    for (const auto& c2_operation : operations) {
+      std::string resp_args;  // optional "args" JSON fragment; left empty when the operation has no args
+      if (!c2_operation.args.empty()) {
+        resp_args = ", \"args\": {";
+        auto it = c2_operation.args.begin();
+        while (it != c2_operation.args.end()) {
+          resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
+          ++it;
+          if (it != c2_operation.args.end()) {
+            resp_args += ", ";  // separator between entries, skipped after the last one
+          }
         }
+        resp_args += "}";
+      }
+
+      std::string operation_json = "{"  // field values are inserted verbatim, without JSON escaping
+        "\"operation\" : \"" + c2_operation.operation + "\","
+        "\"operationid\" : \"" + c2_operation.operation_id + "\","
+        "\"operand\": \"" + c2_operation.operand + "\"" +
+        resp_args + "}";
+
+      if (operation_jsons.empty()) {
+        operation_jsons += operation_json;
+      } else {
+        operation_jsons += ", " + operation_json;  // comma-separate the array elements
       }
-      resp_args += "}";
     }
-    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
-          "\"operation\" : \"" + operation + "\","
-          "\"operationid\" : \"" + operationId + "\","
-          "\"operand\": \"" + operand + "\"" +
-          resp_args + "}]}";
 
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                heartbeat_response.length());
-      mg_printf(conn, "%s", heartbeat_response.c_str());
+    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}";
+
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(heartbeat_response.length()));  // %lu expects unsigned long; size_t can be a different type (e.g. 64-bit Windows)
+    mg_printf(conn, "%s", heartbeat_response.c_str());
   }
 
   void verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components = {}, const std::vector<std::string>& disallowed_properties = {}) {