You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/23 07:54:54 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1831 Asset download through c2

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e31a9958a MINIFICPP-1831 Asset download through c2
e31a9958a is described below

commit e31a9958a64fdbc016f059793095bed9fc3f7cda
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri May 20 17:46:11 2022 +0200

    MINIFICPP-1831 Asset download through c2
    
    Closes #1332
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 extensions/http-curl/protocols/RESTSender.cpp    |   2 +-
 extensions/http-curl/tests/C2UpdateAssetTest.cpp | 260 +++++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt        |   1 +
 extensions/http-curl/tests/HTTPHandlers.h        |  39 +++-
 libminifi/include/c2/C2Agent.h                   |   1 +
 libminifi/include/c2/C2Payload.h                 |  11 +-
 libminifi/include/properties/Configuration.h     |   2 +
 libminifi/src/Configuration.cpp                  |   3 +-
 libminifi/src/c2/C2Agent.cpp                     | 111 ++++++++++
 9 files changed, 418 insertions(+), 12 deletions(-)

diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index c94e1c0a4..7f593684d 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -160,7 +160,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
   }
   const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>();
   logger_->log_trace("Received response: \"%s\"", [&] {return utils::StringUtils::escapeUnprintableBytes(response_body_bytes);});
-  if (isOkay && respCode) {
+  if (isOkay && !clientError && !serverError) {
     if (payload.isRaw()) {
       C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
       response_payload.setRawData(response_body_bytes);
diff --git a/extensions/http-curl/tests/C2UpdateAssetTest.cpp b/extensions/http-curl/tests/C2UpdateAssetTest.cpp
new file mode 100644
index 000000000..f1e0136b6
--- /dev/null
+++ b/extensions/http-curl/tests/C2UpdateAssetTest.cpp
@@ -0,0 +1,260 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <vector>
+#include <string>
+#include <fstream>
+#include <iterator>
+
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+#include "utils/Environment.h"
+
+class FileProvider : public ServerAwareHandler {  // Test handler: answers every GET with a fixed in-memory body.
+ public:
+  explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {}
+
+  // Sends a 200 response; %zu matches size_t on every platform and mg_write keeps bodies containing NUL bytes intact.
+  bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override {
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+              file_content_.length());
+    mg_write(conn, file_content_.data(), file_content_.length());
+    return true;
+  }
+
+ private:
+  std::string file_content_;
+};
+
+class C2HeartbeatHandler : public HeartbeatHandler {  // Heartbeat endpoint that answers with the queued "update asset" operations.
+ public:
+  using HeartbeatHandler::HeartbeatHandler;
+
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
+    std::lock_guard lock{op_mtx_};
+    sendHeartbeatResponse(operations_, conn);
+    operations_.clear();  // forget what was just passed to sendHeartbeatResponse so it is not passed again
+    return true;
+  }
+
+  void addOperation(std::string id, std::unordered_map<std::string, std::string> args) {
+    C2Operation asset_update{
+      .operation = "update", .operand = "asset",
+      .operation_id = std::move(id),
+      .args = std::move(args)
+    };
+    std::lock_guard lock{op_mtx_};
+    operations_.push_back(std::move(asset_update));
+  }
+
+ private:
+  std::mutex op_mtx_;
+  std::vector<C2Operation> operations_;
+};
+
+// Test harness: enables C2 with the RESTSender protocol and polls a caller-supplied predicate until it holds.
+class VerifyC2AssetUpdate : public VerifyC2Base {
+ public:
+  void configureC2() override {
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");
+  }
+
+  void runAssertions() override {
+    const bool verified = utils::verifyEventHappenedInPollTime(std::chrono::seconds{10}, verify_);
+    assert(verified);
+  }
+
+  void setVerifier(std::function<bool()> verify) { verify_ = std::move(verify); }
+
+ private:
+  std::function<bool()> verify_;
+};
+
+struct AssetUpdateOperation {  // One "update asset" test case together with the acknowledgement expected for it.
+  std::string id;  // operation id; also the key used to look up the acknowledgement
+  std::unordered_map<std::string, std::string> args;  // arguments sent with the operation
+  std::string state;  // state string the acknowledgement must report, e.g. "FULLY_APPLIED"
+  std::optional<std::string> details;  // substring that must occur in the ack details; nullopt skips that check
+};
+
+int main() {
+  TestController controller;
+  // End-to-end check of C2 asset updates: queue the operations below, then verify every ack and the files on disk.
+  // use a temporary directory as working directory; downloaded files are expected under its "asset" subdirectory
+  const std::filesystem::path home_dir = controller.createTempDirectory();
+  const auto asset_dir = home_dir / "asset";
+  utils::Environment::setCurrentWorkingDirectory(home_dir.string().c_str());
+
+  C2AcknowledgeHandler ack_handler;
+  std::string file_A = "hello from file A";
+  FileProvider file_A_provider{file_A};
+  std::string file_B = "hello from file B";
+  FileProvider file_B_provider{file_B};
+  C2HeartbeatHandler hb_handler{std::make_shared<minifi::Configure>()};
+
+  VerifyC2AssetUpdate harness;
+  harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider);
+  harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider);
+
+  std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt";
+
+  std::vector<AssetUpdateOperation> operations;  // each entry: arguments to send and the ack they must produce
+
+  operations.push_back({  // no arguments at all
+    .id = "1",
+    .args = {},
+    .state = "NOT_APPLIED",
+    .details = "Couldn't find 'file' argument"
+  });
+
+  operations.push_back({  // 'url' argument missing
+    .id = "2",
+    .args = {
+        {"file", "my_file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Couldn't find 'url' argument"
+  });
+
+  operations.push_back({  // first download of my_file.txt
+    .id = "3",
+    .args = {
+        {"file", "my_file.txt"},
+        {"url", "/api/file/A.txt"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  operations.push_back({  // same file again without forceDownload
+    .id = "4",
+    .args = {
+        {"file", "my_file.txt"},
+        {"url", "/api/file/A.txt"}
+    },
+    .state = "NO_OPERATION",
+    .details = std::nullopt
+  });
+
+  operations.push_back({  // same file again, forced, from a different source
+    .id = "5",
+    .args = {
+        {"file", "my_file.txt"},
+        {"url", "/api/file/B.txt"},
+        {"forceDownload", "true"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  operations.push_back({  // target in nested directories that do not exist yet
+    .id = "6",
+    .args = {
+        {"file", "new_dir/inner/my_file.txt"},
+        {"url", "/api/file/A.txt"}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  operations.push_back({  // url that no handler is registered for
+    .id = "7",
+    .args = {
+        {"file", "dummy.txt"},
+        {"url", "/not_existing_api/file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Failed to fetch asset"
+  });
+
+  operations.push_back({  // file path containing ".."
+    .id = "8",
+    .args = {
+        {"file", "../../system_lib.dll"},
+        {"url", "/not_existing_api/file.txt"}
+    },
+    .state = "NOT_APPLIED",
+    .details = "Accessing parent directory is forbidden in file path"
+  });
+
+  operations.push_back({  // absolute url instead of a path
+    .id = "9",
+    .args = {
+        {"file", "other_dir/A.txt"},
+        {"url", absolute_file_A_url}
+    },
+    .state = "FULLY_APPLIED",
+    .details = std::nullopt
+  });
+
+  for (const auto& op : operations) {
+    hb_handler.addOperation(op.id, op.args);
+  }
+
+  harness.setVerifier([&] () -> bool {  // false until every operation is acknowledged; asserts on any wrong ack
+    for (const auto& op : operations) {
+      if (auto res = ack_handler.getState(op.id)) {
+        if (res->state != op.state) {
+          controller.getLogger()->log_error("Operation '%s' is expected to return '%s', got '%s'", op.id, op.state, res->state);
+          assert(false);
+        }
+        if (op.details) {
+          if (res->details.find(op.details.value()) == std::string::npos) {
+            controller.getLogger()->log_error("In operation '%s' failed to find '%s' in ack details '%s'", op.id, op.details.value(), res->details);
+            assert(false);
+          }
+        }
+      } else {
+        return false;
+      }
+    }
+    return true;
+  });
+
+  harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler);
+  harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler);
+  harness.setC2Url("/api/heartbeat", "/api/acknowledge");
+
+  harness.run();
+
+  std::unordered_map<std::string, std::string> expected_files;
+  // verify directory structure
+  for (const auto& op : operations) {
+    if (op.state != "FULLY_APPLIED") {
+      // this operation was not applied, so it must not have created a file on disk
+      continue;
+    }
+    expected_files[(asset_dir / op.args.at("file")).string()] = utils::StringUtils::endsWith(op.args.at("url"), "A.txt") ? file_A : file_B;
+  }
+
+  size_t file_count = utils::file::list_dir_all(asset_dir.string(), controller.getLogger()).size();
+  if (file_count != expected_files.size()) {
+    controller.getLogger()->log_error("Expected %zu files, got %zu", expected_files.size(), file_count);
+    assert(false);
+  }
+  for (const auto& [path, content] : expected_files) {
+    if (utils::file::get_content(path) != content) {
+      controller.getLogger()->log_error("File content mismatch at '%s'", path);
+      assert(false);
+    }
+  }
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 975b6e907..d8849c8db 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -102,3 +102,4 @@ add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
 add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
 add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateAssetTest COMMAND C2UpdateAssetTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 0174eb197..eeeea2dde 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -833,23 +833,36 @@ class RetryHttpGetResponder : public ServerAwareHandler {
 };
 
 class C2AcknowledgeHandler : public ServerAwareHandler {
+  struct OpResult {  // what an acknowledgement reported for one operation
+    std::string state;  // "operationState.state" of the ack body; empty when the ack did not contain it
+    std::string details;  // "operationState.details" of the ack body; empty when the ack did not contain it
+  };
+
  public:
   bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
     std::string req = readPayload(conn);
     rapidjson::Document root;
     root.Parse(req.data(), req.size());
-    if (root.IsObject() && root.HasMember("operationId")) {
-      std::lock_guard<std::mutex> guard(ack_operations_mtx_);
-      acknowledged_operations_.insert(root["operationId"].GetString());
-    }
+
+    std::string result_state;
+    std::string details;
 
     if (root.IsObject() && root.HasMember("operationState")) {
-      if (root["operationState"].IsObject() && root["operationState"].HasMember("state")) {
-        std::lock_guard<std::mutex> guard(apply_count_mtx_);
-        auto result_state = root["operationState"]["state"].GetString();
-        ++apply_count_[result_state];
+      if (root["operationState"].IsObject()) {
+        if (root["operationState"].HasMember("state")) {
+          result_state = root["operationState"]["state"].GetString();
+          std::lock_guard<std::mutex> guard(apply_count_mtx_);
+          ++apply_count_[result_state];
+        }
+        if (root["operationState"].HasMember("details")) {
+          details = root["operationState"]["details"].GetString();
+        }
       }
     }
+    if (root.IsObject() && root.HasMember("operationId")) {
+      std::lock_guard<std::mutex> guard(ack_operations_mtx_);
+      acknowledged_operations_.insert({root["operationId"].GetString(), OpResult{result_state, details}});
+    }
 
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
                     "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
@@ -861,6 +874,14 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
     return acknowledged_operations_.count(operation_id) > 0;
   }
 
+  std::optional<OpResult> getState(const std::string& operation_id) const {
+    // Returns the result acknowledged for operation_id, or nullopt when no acknowledgement was recorded for it.
+    std::lock_guard<std::mutex> guard(ack_operations_mtx_);
+    const auto found = acknowledged_operations_.find(operation_id);
+    if (found == acknowledged_operations_.end()) return std::nullopt;
+    return found->second;
+  }
+
   uint32_t getApplyCount(const std::string& result_state) const {
     std::lock_guard<std::mutex> guard(apply_count_mtx_);
     return apply_count_.find(result_state) != apply_count_.end() ? apply_count_.at(result_state) : 0;
@@ -869,6 +890,6 @@ class C2AcknowledgeHandler : public ServerAwareHandler {
  private:
   mutable std::mutex ack_operations_mtx_;
   mutable std::mutex apply_count_mtx_;
-  std::set<std::string> acknowledged_operations_;
+  std::unordered_map<std::string, OpResult> acknowledged_operations_;
   std::unordered_map<std::string, uint32_t> apply_count_;
 };
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index b32b029f6..5fb564786 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -173,6 +173,7 @@ class C2Agent : public state::UpdateController {
 
   bool handleConfigurationUpdate(const C2ContentResponse &resp);
   void handlePropertyUpdate(const C2ContentResponse &resp);
+  void handleAssetUpdate(const C2ContentResponse &resp);
 
   std::optional<std::string> resolveFlowUrl(const std::string& url) const;
 
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index badc5e0eb..de8790a3a 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -60,7 +60,8 @@ SMART_ENUM(DescribeOperand,
 
 SMART_ENUM(UpdateOperand,
   (CONFIGURATION, "configuration"),
-  (PROPERTIES, "properties")
+  (PROPERTIES, "properties"),
+  (ASSET, "asset")
 )
 
 SMART_ENUM(TransferOperand,
@@ -118,6 +119,13 @@ struct C2ContentResponse {
 
   friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response);
 
+  std::optional<std::string> getArgument(const std::string& arg_name) const {
+    // Looks up arg_name in operation_arguments; yields its to_string() value, or nullopt when it is absent.
+    const auto found = operation_arguments.find(arg_name);
+    if (found == operation_arguments.end()) return std::nullopt;
+    return found->second.to_string();
+  }
+
   Operation op;
   // determines if the operation is required
   bool required{ false };
@@ -199,6 +207,7 @@ class C2Payload : public state::Update {
    */
   [[nodiscard]] std::vector<std::byte> getRawData() const noexcept { return raw_data_; }
   [[nodiscard]] std::string getRawDataAsString() const { return utils::span_to<std::string>(gsl::make_span(getRawData()).as_span<const char>()); }
+  [[nodiscard]] std::vector<std::byte> moveRawData() && {return std::move(raw_data_);}  // rvalue-only: moves the raw data buffer out of this payload
 
   /**
    * Add a nested payload.
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 301bdafd2..aa0c05f3f 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -153,6 +153,8 @@ class Configuration : public Properties {
   static constexpr const char *nifi_log_compression_cached_log_max_size = "nifi.log.compression.cached.log.max.size";
   static constexpr const char *nifi_log_compression_compressed_log_max_size = "nifi.log.compression.compressed.log.max.size";
 
+  static constexpr const char *nifi_asset_directory = "nifi.asset.directory";
+
   MINIFIAPI static const std::vector<core::ConfigurationProperty> CONFIGURATION_PROPERTIES;
   MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES;
 
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index c8bc7e445..07970d51f 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -122,7 +122,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_log_appender_syslog},
   core::ConfigurationProperty{Configuration::nifi_log_logger_root},
   core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
-  core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}
+  core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_asset_directory}
 };
 
 const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase,
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 3e6fc92a3..82ceded8d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -601,6 +601,10 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
       handlePropertyUpdate(resp);
       break;
     }
+    case UpdateOperand::ASSET: {
+      handleAssetUpdate(resp);
+      break;
+    }
   }
 }
 
@@ -900,6 +904,113 @@ bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) {
   return true;
 }
 
+static auto make_path(const std::string& str) {  // named string-to-path conversion, usable as a callable
+  return std::filesystem::path{str};
+}
+
+// Returns an error message if 'path' is empty, rooted, has no usable filename or contains "..", nullopt otherwise.
+static std::optional<std::string> validateFilePath(const std::filesystem::path& path) {
+  if (path.empty()) { return "Empty file path"; }
+  // not !is_relative(): Windows treats "\x" and "C:x" as relative, yet operator/ lets such a path replace parts of its base
+  if (path.has_root_name() || path.has_root_directory()) {
+    return "File path must be a relative path '" + path.string() + "'";
+  }
+  if (!path.has_filename()) {
+    return "Filename missing in output path '" + path.string() + "'";
+  }
+  if (path.filename() == "." || path.filename() == "..") {
+    return "Invalid filename '" + path.filename().string() + "'";
+  }
+  for (const auto& segment : path) {
+    if (segment == "..") {
+      return "Accessing parent directory is forbidden in file path '" + path.string() + "'";
+    }
+  }
+  return std::nullopt;
+}
+
+// Handles a C2 "update asset" operation: fetches the 'url' argument and stores it as <asset dir>/<'file' argument>.
+// Acknowledges with FULLY_APPLIED, with NO_OPERATION if the file exists and 'forceDownload' is not true, else with an error.
+void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) {
+  auto send_error = [&] (std::string_view error) {
+    logger_->log_error("%s", std::string(error));
+    C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true);
+    response.setRawData(gsl::span<const char>(error).as_span<const std::byte>());
+    enqueue_c2_response(std::move(response));
+  };
+  std::filesystem::path asset_dir = std::filesystem::path(configuration_->getHome()) / "asset";
+  if (auto asset_dir_str = configuration_->get(Configuration::nifi_asset_directory)) {
+    asset_dir = asset_dir_str.value();
+  }
+
+  // output file: the validated relative 'file' argument, placed under the asset directory
+  std::filesystem::path file_path;
+  if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) {
+    if (auto error = validateFilePath(file_rel.value())) {
+      send_error(error.value());
+      return;
+    }
+    file_path = asset_dir / file_rel.value();
+  } else {
+    send_error("Couldn't find 'file' argument");
+    return;
+  }
+
+  // source url
+  std::string url;
+  if (auto url_str = resp.getArgument("url")) {
+    if (auto resolved_url = resolveUrl(*url_str)) {
+      url = resolved_url.value();
+    } else {
+      send_error("Couldn't resolve url");
+      return;
+    }
+  } else {
+    send_error("Couldn't find 'url' argument");
+    return;
+  }
+
+  // forceDownload: optional, case-insensitive 'true'/'false'; anything else is rejected
+  bool force_download = false;
+  if (auto force_download_str = resp.getArgument("forceDownload")) {
+    force_download = utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "true");
+    if (!force_download && !utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "false")) {
+      send_error("Argument 'forceDownload' must be either 'true' or 'false'");
+      return;
+    }
+  }
+
+  if (!force_download && std::filesystem::exists(file_path)) {
+    logger_->log_info("File already exists");
+    enqueue_c2_response(C2Payload(Operation::ACKNOWLEDGE, state::UpdateState::NO_OPERATION, resp.ident, true));
+    return;
+  }
+
+  C2Payload file_response = protocol_.load()->consumePayload(url, C2Payload(Operation::TRANSFER, true), RECEIVE, false);
+
+  if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) {
+    send_error("Failed to fetch asset from '" + url + "'");
+    return;
+  }
+
+  auto raw_data = std::move(file_response).moveRawData();
+  // ensure directory exists for file
+  if (utils::file::create_dir(file_path.parent_path().string()) != 0) {
+    send_error("Failed to create directory '" + file_path.parent_path().string() + "'");
+    return;
+  }
+
+  std::ofstream file{file_path, std::ofstream::binary};
+  file.write(reinterpret_cast<const char*>(raw_data.data()), raw_data.size());
+  file.close();  // close explicitly so that open, write and close failures all show up in the stream state
+  if (!file) {
+    send_error("Failed to write asset to '" + file_path.string() + "'");
+    return;
+  }
+
+  enqueue_c2_response(C2Payload(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, true));
+}
+
 void C2Agent::enqueue_c2_server_response(C2Payload &&resp) {
   logger_->log_trace("Server response: %s", [&] {return resp.str();});