You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/23 07:54:54 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1831 Asset download through c2
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e31a9958a MINIFICPP-1831 Asset download through c2
e31a9958a is described below
commit e31a9958a64fdbc016f059793095bed9fc3f7cda
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri May 20 17:46:11 2022 +0200
MINIFICPP-1831 Asset download through c2
Closes #1332
Signed-off-by: Marton Szasz <sz...@apache.org>
---
extensions/http-curl/protocols/RESTSender.cpp | 2 +-
extensions/http-curl/tests/C2UpdateAssetTest.cpp | 260 +++++++++++++++++++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/HTTPHandlers.h | 39 +++-
libminifi/include/c2/C2Agent.h | 1 +
libminifi/include/c2/C2Payload.h | 11 +-
libminifi/include/properties/Configuration.h | 2 +
libminifi/src/Configuration.cpp | 3 +-
libminifi/src/c2/C2Agent.cpp | 111 ++++++++++
9 files changed, 418 insertions(+), 12 deletions(-)
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index c94e1c0a4..7f593684d 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -160,7 +160,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
}
const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>();
logger_->log_trace("Received response: \"%s\"", [&] {return utils::StringUtils::escapeUnprintableBytes(response_body_bytes);});
- if (isOkay && respCode) {
+ if (isOkay && !clientError && !serverError) {
if (payload.isRaw()) {
C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
response_payload.setRawData(response_body_bytes);
diff --git a/extensions/http-curl/tests/C2UpdateAssetTest.cpp b/extensions/http-curl/tests/C2UpdateAssetTest.cpp
new file mode 100644
index 000000000..f1e0136b6
--- /dev/null
+++ b/extensions/http-curl/tests/C2UpdateAssetTest.cpp
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <vector>
+#include <string>
+#include <fstream>
+#include <iterator>
+
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+#include "utils/Environment.h"
+
+/**
+ * Test handler that answers every GET request with a fixed, in-memory payload.
+ */
+class FileProvider : public ServerAwareHandler {
+ public:
+  explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {}
+
+  /**
+   * Sends a "200 OK" plain-text response whose body is the stored content.
+   * @return always true
+   */
+  bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override {
+    // %zu is the portable conversion for size_t; %lu is wrong wherever unsigned long is narrower than size_t
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+                    file_content_.length());
+    // write the body by length so it always matches the advertised Content-Length, even with embedded NUL bytes
+    mg_write(conn, file_content_.data(), file_content_.length());
+    return true;
+  }
+
+ private:
+  std::string file_content_;
+};
+
+/**
+ * Heartbeat handler that answers each heartbeat with the queued "update asset" operations
+ * and then empties the queue.
+ */
+class C2HeartbeatHandler : public HeartbeatHandler {
+ public:
+  using HeartbeatHandler::HeartbeatHandler;
+
+  // Replies with every pending operation; the queue is cleared afterwards, so an operation is sent only once.
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
+    std::lock_guard<std::mutex> lock(op_mtx_);
+    sendHeartbeatResponse(operations_, conn);
+    operations_.clear();
+    return true;
+  }
+
+  // Queues an "update"/"asset" operation with the given operation id and arguments.
+  void addOperation(std::string id, std::unordered_map<std::string, std::string> args) {
+    C2Operation asset_update{
+      .operation = "update",
+      .operand = "asset",
+      .operation_id = std::move(id),
+      .args = std::move(args)
+    };
+    std::lock_guard<std::mutex> lock(op_mtx_);
+    operations_.push_back(std::move(asset_update));
+  }
+
+ private:
+  std::mutex op_mtx_;  // guards operations_
+  std::vector<C2Operation> operations_;
+};
+
+/**
+ * Integration harness for the asset update test: enables C2 through the RESTSender protocol
+ * and delegates the pass/fail decision to a verifier callback supplied by the test.
+ */
+class VerifyC2AssetUpdate : public VerifyC2Base {
+ public:
+  void configureC2() override {
+    const std::pair<const char*, const char*> c2_settings[] = {
+      {"nifi.c2.agent.protocol.class", "RESTSender"},
+      {"nifi.c2.enable", "true"},
+      {"nifi.c2.agent.heartbeat.period", "100"}
+    };
+    for (const auto& [key, value] : c2_settings) {
+      configuration->set(key, value);
+    }
+  }
+
+  // Asserts that the verifier callback is reported as having happened within the 10 second poll time.
+  void runAssertions() override {
+    using std::chrono::seconds;
+    assert(utils::verifyEventHappenedInPollTime(seconds{10}, verify_));
+  }
+
+  // Installs the callback evaluated by runAssertions().
+  void setVerifier(std::function<bool()> verify) {
+    verify_ = std::move(verify);
+  }
+
+ private:
+  std::function<bool()> verify_;
+};
+
+// One "update asset" operation issued by this test, together with the acknowledgement expected for it.
+struct AssetUpdateOperation {
+ // operation id: passed to the heartbeat handler and used to look up the acknowledgement
+ std::string id;
+ // arguments sent along with the operation
+ std::unordered_map<std::string, std::string> args;
+ // state the acknowledgement is expected to carry
+ std::string state;
+ // when set, text that must occur in the details of the acknowledgement
+ std::optional<std::string> details;
+};
+
+/**
+ * Sends a series of "update asset" operations to the agent through the heartbeat handler, checks the
+ * state (and optionally the details) of every acknowledgement, then verifies the files created in the
+ * asset directory.
+ */
+int main() {
+  TestController controller;
+
+  // setup minifi home
+  const std::filesystem::path home_dir = controller.createTempDirectory();
+  const auto asset_dir = home_dir / "asset";
+  utils::Environment::setCurrentWorkingDirectory(home_dir.string().c_str());
+
+  C2AcknowledgeHandler ack_handler;
+  std::string file_A = "hello from file A";
+  FileProvider file_A_provider{file_A};
+  std::string file_B = "hello from file B";
+  FileProvider file_B_provider{file_B};
+  C2HeartbeatHandler hb_handler{std::make_shared<minifi::Configure>()};
+
+  VerifyC2AssetUpdate harness;
+  harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider);
+  harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider);
+
+  std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt";
+
+  std::vector<AssetUpdateOperation> operations;
+
+  // no arguments at all
+  operations.push_back({
+    .id = "1",
+    .args = {},
+    .state = "NOT_APPLIED",
+    .details = "Couldn't find 'file' argument"
+  });
+
+  // 'url' argument missing
+  operations.push_back({
+    .id = "2",
+    .args = {
+      {"file", "my_file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Couldn't find 'url' argument"
+  });
+
+  // first download of my_file.txt
+  operations.push_back({
+    .id = "3",
+    .args = {
+      {"file", "my_file.txt"},
+      {"url", "/api/file/A.txt"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  // same file again without 'forceDownload'
+  operations.push_back({
+    .id = "4",
+    .args = {
+      {"file", "my_file.txt"},
+      {"url", "/api/file/A.txt"}
+    },
+    .state = "NO_OPERATION",
+    .details = std::nullopt
+  });
+
+  // same file with 'forceDownload', this time from the B url
+  operations.push_back({
+    .id = "5",
+    .args = {
+      {"file", "my_file.txt"},
+      {"url", "/api/file/B.txt"},
+      {"forceDownload", "true"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  // target inside nested directories
+  operations.push_back({
+    .id = "6",
+    .args = {
+      {"file", "new_dir/inner/my_file.txt"},
+      {"url", "/api/file/A.txt"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  // url that no handler is registered for
+  operations.push_back({
+    .id = "7",
+    .args = {
+      {"file", "dummy.txt"},
+      {"url", "/not_existing_api/file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Failed to fetch asset"
+  });
+
+  // file path pointing outside of the asset directory
+  operations.push_back({
+    .id = "8",
+    .args = {
+      {"file", "../../system_lib.dll"},
+      {"url", "/not_existing_api/file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Accessing parent directory is forbidden in file path"
+  });
+
+  // absolute url instead of a path
+  operations.push_back({
+    .id = "9",
+    .args = {
+      {"file", "other_dir/A.txt"},
+      {"url", absolute_file_A_url}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  for (const auto& op : operations) {
+    hb_handler.addOperation(op.id, op.args);
+  }
+
+  // returns false until every operation has been acknowledged; a wrong acknowledgement fails immediately
+  harness.setVerifier([&] () -> bool {
+    for (const auto& op : operations) {
+      if (auto res = ack_handler.getState(op.id)) {
+        if (res->state != op.state) {
+          controller.getLogger()->log_error("Operation '%s' is expected to return '%s', got '%s'", op.id, op.state, res->state);
+          assert(false);
+        }
+        if (op.details) {
+          if (res->details.find(op.details.value()) == std::string::npos) {
+            controller.getLogger()->log_error("In operation '%s' failed to find '%s' in ack details '%s'", op.id, op.details.value(), res->details);
+            assert(false);
+          }
+        }
+      } else {
+        return false;
+      }
+    }
+    return true;
+  });
+
+  harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler);
+  harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler);
+  harness.setC2Url("/api/heartbeat", "/api/acknowledge");
+
+  harness.run();
+
+  std::unordered_map<std::string, std::string> expected_files;
+  // verify directory structure
+  for (const auto& op : operations) {
+    if (op.state != "FULLY_APPLIED") {
+      // this op failed no file made on the disk
+      continue;
+    }
+    // at() instead of operator[]: a missing argument must fail loudly rather than insert an empty entry;
+    // a later operation on the same file overwrites the expectation of an earlier one
+    expected_files[(asset_dir / op.args.at("file")).string()] = utils::StringUtils::endsWith(op.args.at("url"), "A.txt") ? file_A : file_B;
+  }
+
+  size_t file_count = utils::file::list_dir_all(asset_dir.string(), controller.getLogger()).size();
+  if (file_count != expected_files.size()) {
+    controller.getLogger()->log_error("Expected %zu files, got %zu", expected_files.size(), file_count);
+    assert(false);
+  }
+  for (auto& [path, content] : expected_files) {
+    if (utils::file::get_content(path) != content) {
+      controller.getLogger()->log_error("File content mismatch at '%s'", path);
+      assert(false);
+    }
+  }
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 975b6e907..d8849c8db 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -102,3 +102,4 @@ add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateAssetTest COMMAND C2UpdateAssetTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 0174eb197..eeeea2dde 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -833,23 +833,36 @@ class RetryHttpGetResponder : public ServerAwareHandler {
};
class C2AcknowledgeHandler : public ServerAwareHandler {
+ struct OpResult {
+ std::string state;
+ std::string details;
+ };
+
public:
bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
std::string req = readPayload(conn);
rapidjson::Document root;
root.Parse(req.data(), req.size());
- if (root.IsObject() && root.HasMember("operationId")) {
- std::lock_guard<std::mutex> guard(ack_operations_mtx_);
- acknowledged_operations_.insert(root["operationId"].GetString());
- }
+
+ std::string result_state;
+ std::string details;
if (root.IsObject() && root.HasMember("operationState")) {
- if (root["operationState"].IsObject() && root["operationState"].HasMember("state")) {
- std::lock_guard<std::mutex> guard(apply_count_mtx_);
- auto result_state = root["operationState"]["state"].GetString();
- ++apply_count_[result_state];
+ if (root["operationState"].IsObject()) {
+ if (root["operationState"].HasMember("state")) {
+ result_state = root["operationState"]["state"].GetString();
+ std::lock_guard<std::mutex> guard(apply_count_mtx_);
+ ++apply_count_[result_state];
+ }
+ if (root["operationState"].HasMember("details")) {
+ details = root["operationState"]["details"].GetString();
+ }
}
}
+ if (root.IsObject() && root.HasMember("operationId")) {
+ std::lock_guard<std::mutex> guard(ack_operations_mtx_);
+ acknowledged_operations_.insert({root["operationId"].GetString(), OpResult{result_state, details}});
+ }
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -861,6 +874,14 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
return acknowledged_operations_.count(operation_id) > 0;
}
+  // Returns the state and details stored for the given operation id, or std::nullopt if no
+  // acknowledgement with that id has been recorded.
+  std::optional<OpResult> getState(const std::string& operation_id) const {
+    std::lock_guard<std::mutex> guard(ack_operations_mtx_);
+    const auto found = acknowledged_operations_.find(operation_id);
+    if (found == acknowledged_operations_.end()) {
+      return std::nullopt;
+    }
+    return found->second;
+  }
+
uint32_t getApplyCount(const std::string& result_state) const {
std::lock_guard<std::mutex> guard(apply_count_mtx_);
return apply_count_.find(result_state) != apply_count_.end() ? apply_count_.at(result_state) : 0;
@@ -869,6 +890,6 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
private:
mutable std::mutex ack_operations_mtx_;
mutable std::mutex apply_count_mtx_;
- std::set<std::string> acknowledged_operations_;
+ std::unordered_map<std::string, OpResult> acknowledged_operations_;
std::unordered_map<std::string, uint32_t> apply_count_;
};
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index b32b029f6..5fb564786 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -173,6 +173,7 @@ class C2Agent : public state::UpdateController {
bool handleConfigurationUpdate(const C2ContentResponse &resp);
void handlePropertyUpdate(const C2ContentResponse &resp);
+ void handleAssetUpdate(const C2ContentResponse &resp);
std::optional<std::string> resolveFlowUrl(const std::string& url) const;
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index badc5e0eb..de8790a3a 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -60,7 +60,8 @@ SMART_ENUM(DescribeOperand,
SMART_ENUM(UpdateOperand,
(CONFIGURATION, "configuration"),
- (PROPERTIES, "properties")
+ (PROPERTIES, "properties"),
+ (ASSET, "asset")
)
SMART_ENUM(TransferOperand,
@@ -118,6 +119,13 @@ struct C2ContentResponse {
friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response);
+  // Looks up an operation argument by name; yields its string form, or std::nullopt if it is absent.
+  std::optional<std::string> getArgument(const std::string& arg_name) const {
+    const auto found = operation_arguments.find(arg_name);
+    if (found == operation_arguments.end()) {
+      return std::nullopt;
+    }
+    return found->second.to_string();
+  }
+
Operation op;
// determines if the operation is required
bool required{ false };
@@ -199,6 +207,7 @@ class C2Payload : public state::Update {
*/
[[nodiscard]] std::vector<std::byte> getRawData() const noexcept { return raw_data_; }
[[nodiscard]] std::string getRawDataAsString() const { return utils::span_to<std::string>(gsl::make_span(getRawData()).as_span<const char>()); }
+ // Rvalue-qualified accessor: callable only on an expiring payload, it moves raw_data_ out instead of copying it as getRawData() does.
+ [[nodiscard]] std::vector<std::byte> moveRawData() && {return std::move(raw_data_);}
/**
* Add a nested payload.
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 301bdafd2..aa0c05f3f 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -153,6 +153,8 @@ class Configuration : public Properties {
static constexpr const char *nifi_log_compression_cached_log_max_size = "nifi.log.compression.cached.log.max.size";
static constexpr const char *nifi_log_compression_compressed_log_max_size = "nifi.log.compression.compressed.log.max.size";
+ static constexpr const char *nifi_asset_directory = "nifi.asset.directory";
+
MINIFIAPI static const std::vector<core::ConfigurationProperty> CONFIGURATION_PROPERTIES;
MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES;
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index c8bc7e445..07970d51f 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -122,7 +122,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
core::ConfigurationProperty{Configuration::nifi_log_appender_syslog},
core::ConfigurationProperty{Configuration::nifi_log_logger_root},
core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
- core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}
+ core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_asset_directory}
};
const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase,
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 3e6fc92a3..82ceded8d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -601,6 +601,10 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
handlePropertyUpdate(resp);
break;
}
+ case UpdateOperand::ASSET: {
+ handleAssetUpdate(resp);
+ break;
+ }
}
}
@@ -900,6 +904,113 @@ bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
return true;
}
+// Converts a string to a filesystem path; used to map the optional 'file' argument to a path.
+static auto make_path(const std::string& str) {
+  std::filesystem::path converted{str};
+  return converted;
+}
+
+/**
+ * Checks that a file path received in an asset update request can be safely appended to the asset directory.
+ * @param path the requested output path
+ * @return a description of the problem, or std::nullopt if the path is acceptable
+ */
+static std::optional<std::string> validateFilePath(const std::filesystem::path& path) {
+  if (path.empty()) {
+    return "Empty file path";
+  }
+  // On Windows "C:file" and "\file" are not absolute, yet appending them to a directory replaces (part of)
+  // that directory, so any root name or root directory is rejected, not only fully absolute paths.
+  if (!path.is_relative() || path.has_root_path()) {
+    return "File path must be a relative path '" + path.string() + "'";
+  }
+  if (!path.has_filename()) {
+    return "Filename missing in output path '" + path.string() + "'";
+  }
+  if (path.filename() == "." || path.filename() == "..") {
+    return "Invalid filename '" + path.filename().string() + "'";
+  }
+  // no segment may step out of the directory the path is going to be appended to
+  for (const auto& segment : path) {
+    if (segment == "..") {
+      return "Accessing parent directory is forbidden in file path '" + path.string() + "'";
+    }
+  }
+  return std::nullopt;
+}
+
+/**
+ * Handles the "update asset" C2 operation: fetches the content behind the 'url' argument and writes it
+ * to the relative 'file' path inside the asset directory.
+ *
+ * The asset directory is "<home>/asset" unless nifi.asset.directory is set. Exactly one acknowledgement
+ * is enqueued per request:
+ *  - SET_ERROR, with the error message as raw data, if an argument is missing or invalid, the url cannot
+ *    be resolved, or the download, the directory creation or the file write fails,
+ *  - NO_OPERATION if the file already exists and 'forceDownload' is not "true",
+ *  - FULLY_APPLIED once the file has been written.
+ *
+ * @param resp the operation received from the C2 server
+ */
+void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) {
+  auto send_error = [&] (std::string_view error) {
+    logger_->log_error("%s", std::string(error));
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true);
+    response.setRawData(gsl::span<const char>(error).as_span<const std::byte>());
+    enqueue_c2_response(std::move(response));
+  };
+  std::filesystem::path asset_dir = std::filesystem::path(configuration_->getHome()) / "asset";
+  if (auto asset_dir_str = configuration_->get(Configuration::nifi_asset_directory)) {
+    asset_dir = asset_dir_str.value();
+  }
+
+  // output file
+  std::filesystem::path file_path;
+  if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) {
+    if (auto error = validateFilePath(file_rel.value())) {
+      send_error(error.value());
+      return;
+    }
+    file_path = asset_dir / file_rel.value();
+  } else {
+    send_error("Couldn't find 'file' argument");
+    return;
+  }
+
+  // source url
+  std::string url;
+  if (auto url_str = resp.getArgument("url")) {
+    if (auto resolved_url = resolveUrl(*url_str)) {
+      url = resolved_url.value();
+    } else {
+      send_error("Couldn't resolve url");
+      return;
+    }
+  } else {
+    send_error("Couldn't find 'url' argument");
+    return;
+  }
+
+  // forceDownload
+  bool force_download = false;
+  if (auto force_download_str = resp.getArgument("forceDownload")) {
+    if (utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "true")) {
+      force_download = true;
+    } else if (utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "false")) {
+      force_download = false;
+    } else {
+      send_error("Argument 'forceDownload' must be either 'true' or 'false'");
+      return;
+    }
+  }
+
+  if (!force_download && std::filesystem::exists(file_path)) {
+    logger_->log_info("File already exists");
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NO_OPERATION, resp.ident, true);
+    enqueue_c2_response(std::move(response));
+    return;
+  }
+
+  C2Payload file_response = protocol_.load()->consumePayload(url, C2Payload(Operation::TRANSFER, true), RECEIVE, false);
+
+  if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) {
+    send_error("Failed to fetch asset from '" + url + "'");
+    return;
+  }
+
+  auto raw_data = std::move(file_response).moveRawData();
+  // ensure directory exists for file
+  if (utils::file::create_dir(file_path.parent_path().string()) != 0) {
+    send_error("Failed to create directory '" + file_path.parent_path().string() + "'");
+    return;
+  }
+
+  {
+    std::ofstream file{file_path, std::ofstream::binary};
+    if (!file) {
+      send_error("Failed to open file '" + file_path.string() + "' for writing");
+      return;
+    }
+    file.write(reinterpret_cast<const char*>(raw_data.data()), static_cast<std::streamsize>(raw_data.size()));
+    // close explicitly so that a failure while flushing is detected as well
+    file.close();
+    if (file.fail()) {
+      // do not leave a truncated file behind: a later request without 'forceDownload' would find it
+      // and answer NO_OPERATION although the asset is incomplete
+      std::error_code remove_error;
+      std::filesystem::remove(file_path, remove_error);
+      send_error("Failed to write file '" + file_path.string() + "'");
+      return;
+    }
+  }
+
+  C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, true);
+  enqueue_c2_response(std::move(response));
+}
+
void C2Agent::enqueue_c2_server_response(C2Payload &&resp) {
logger_->log_trace("Server response: %s", [&] {return resp.str();});