You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2022/04/01 10:58:27 UTC
[nifi-minifi-cpp] 01/05: MINIFICPP-1779 Verify multiple C2 commands in HB response
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bc46482985b5202479f83f47dc98e274cf434b6d
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Fri Apr 1 12:51:22 2022 +0200
MINIFICPP-1779 Verify multiple C2 commands in HB response
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1285
---
.../http-curl/tests/C2MultipleCommandsTest.cpp | 130 +++++++++++++++++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/HTTPHandlers.h | 63 ++++++----
3 files changed, 174 insertions(+), 20 deletions(-)
diff --git a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
new file mode 100644
index 0000000..06697c3
--- /dev/null
+++ b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include <vector>
+#include <functional>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+
+class AckAuditor {
+ public:
+ void addAck(const std::string& ack) {
+ std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_);
+ acknowledged_operations_.insert(ack);
+ }
+
+ bool isAcknowledged(const std::string& operation_id) const {
+ std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_);
+ return acknowledged_operations_.count(operation_id) > 0;
+ }
+
+ void addVerifier(std::function<void(const rapidjson::Document&)> verifier) {
+ std::lock_guard<std::mutex> guard(verify_ack_mutex_);
+ ack_verifiers_.push_back(std::move(verifier));
+ }
+
+ void verifyAck(const rapidjson::Document& root) {
+ std::lock_guard<std::mutex> guard(verify_ack_mutex_);
+ if (ack_verifiers_.empty()) {
+ assert(false);
+ }
+
+ ack_verifiers_[next_verifier_index_](root);
+ ++next_verifier_index_;
+ if (next_verifier_index_ >= ack_verifiers_.size()) {
+ next_verifier_index_ = 0;
+ }
+ }
+
+ private:
+ mutable std::mutex acknowledged_operations_mutex_;
+ mutable std::mutex verify_ack_mutex_;
+ std::unordered_set<std::string> acknowledged_operations_;
+ std::vector<std::function<void(const rapidjson::Document&)>> ack_verifiers_;
+ uint32_t next_verifier_index_ = 0;
+};
+
+class MultipleC2CommandHandler: public HeartbeatHandler {
+ public:
+ explicit MultipleC2CommandHandler(AckAuditor& ack_auditor, std::shared_ptr<minifi::Configure> configuration)
+ : HeartbeatHandler(std::move(configuration)),
+ ack_auditor_(ack_auditor) {
+ }
+
+ void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
+ std::vector<C2Operation> operations{{"DESCRIBE", "manifest", "889345", {}}, {"DESCRIBE", "corecomponentstate", "889346", {}}};
+ ack_auditor_.addVerifier([this](const rapidjson::Document& root) {
+ verifyJsonHasAgentManifest(root);
+ });
+ ack_auditor_.addVerifier([](const rapidjson::Document& root) {
+ assert(root.HasMember("corecomponentstate"));
+ });
+ sendHeartbeatResponse(operations, conn);
+ }
+
+ void handleAcknowledge(const rapidjson::Document& root) override {
+ ack_auditor_.verifyAck(root);
+ if (root.IsObject() && root.HasMember("operationId")) {
+ ack_auditor_.addAck(root["operationId"].GetString());
+ }
+ }
+
+ private:
+ AckAuditor& ack_auditor_;
+};
+
+class VerifyC2MultipleCommands : public VerifyC2Base {
+ public:
+ explicit VerifyC2MultipleCommands(AckAuditor& auditor)
+ : ack_auditor_(auditor) {
+ }
+
+ void testSetup() override {
+ LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ VerifyC2Base::testSetup();
+ }
+
+ void configureFullHeartbeat() override {
+ configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_full_heartbeat, "false");
+ }
+
+ void runAssertions() override {
+ assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889345");}));
+ assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889346");}));
+ }
+
+ private:
+ AckAuditor& ack_auditor_;
+};
+
+int main(int argc, char **argv) {
+ const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
+ AckAuditor ack_auditor;
+ VerifyC2MultipleCommands harness(ack_auditor);
+ harness.setKeyDir(args.key_dir);
+ MultipleC2CommandHandler responder(ack_auditor, harness.getConfiguration());
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 0077008..975b6e9 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -101,3 +101,4 @@ add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index c5b6c2e..a0acc6b 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -404,31 +404,54 @@ class HeartbeatHandler : public ServerAwareHandler {
}
protected:
- void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn,
+ struct C2Operation {
+ std::string operation;
+ std::string operand;
+ std::string operation_id;
+ std::unordered_map<std::string, std::string> args;
+ };
+
+ void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn,
const std::unordered_map<std::string, std::string>& args = {}) {
- std::string resp_args;
- if (!args.empty()) {
- resp_args = ", \"args\": {";
- auto it = args.begin();
- while (it != args.end()) {
- resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
- ++it;
- if (it != args.end()) {
- resp_args += ", ";
+ sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn);
+ }
+
+ void sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn) {
+ std::string operation_jsons;
+ for (const auto& c2_operation : operations) {
+ std::string resp_args;
+ if (!c2_operation.args.empty()) {
+ resp_args = ", \"args\": {";
+ auto it = c2_operation.args.begin();
+ while (it != c2_operation.args.end()) {
+ resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
+ ++it;
+ if (it != c2_operation.args.end()) {
+ resp_args += ", ";
+ }
}
+ resp_args += "}";
+ }
+
+ std::string operation_json = "{"
+ "\"operation\" : \"" + c2_operation.operation + "\","
+ "\"operationid\" : \"" + c2_operation.operation_id + "\","
+ "\"operand\": \"" + c2_operation.operand + "\"" +
+ resp_args + "}";
+
+ if (operation_jsons.empty()) {
+ operation_jsons += operation_json;
+ } else {
+ operation_jsons += ", " + operation_json;
}
- resp_args += "}";
}
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"" + operation + "\","
- "\"operationid\" : \"" + operationId + "\","
- "\"operand\": \"" + operand + "\"" +
- resp_args + "}]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- heartbeat_response.length());
- mg_printf(conn, "%s", heartbeat_response.c_str());
+ std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}";
+
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ heartbeat_response.length());
+ mg_printf(conn, "%s", heartbeat_response.c_str());
}
void verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components = {}, const std::vector<std::string>& disallowed_properties = {}) {