You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/01/19 17:32:13 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1684 - Provide agent logs through the c2 protocol
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 9c0a73f MINIFICPP-1684 - Provide agent logs through the c2 protocol
9c0a73f is described below
commit 9c0a73fc6bf2403b262b6e548f3dfb30e7613a1d
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Thu Nov 18 11:42:35 2021 +0100
MINIFICPP-1684 - Provide agent logs through the c2 protocol
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1233
---
extensions/http-curl/client/HTTPClient.cpp | 21 +++
extensions/http-curl/client/HTTPClient.h | 9 +-
extensions/http-curl/protocols/RESTSender.cpp | 49 +++--
extensions/http-curl/protocols/RESTSender.h | 2 +-
extensions/http-curl/tests/C2DebugBundleTest.cpp | 206 +++++++++++++++++++++
extensions/http-curl/tests/C2RequestClassTest.cpp | 25 ---
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/HTTPHandlers.h | 26 +++
extensions/http-curl/tests/HTTPIntegrationBase.h | 1 +
extensions/libarchive/WriteArchiveStream.cpp | 18 ++
extensions/libarchive/WriteArchiveStream.h | 7 +
libminifi/include/FlowController.h | 2 +
libminifi/include/c2/C2Agent.h | 8 +
libminifi/include/c2/C2Payload.h | 10 +
.../core/logging/internal/LogCompressorSink.h | 8 +-
libminifi/include/core/state/UpdateController.h | 2 +
libminifi/include/io/ArchiveStream.h | 1 +
libminifi/include/io/BufferStream.h | 4 +
libminifi/include/properties/Properties.h | 2 +
libminifi/src/FlowController.cpp | 14 ++
libminifi/src/c2/C2Agent.cpp | 144 +++++++++++---
.../core/logging/internal/LogCompressorSink.cpp | 12 +-
libminifi/src/properties/Properties.cpp | 5 +
libminifi/src/utils/ByteArrayCallback.cpp | 3 +
libminifi/test/integration/IntegrationBase.h | 2 +-
libminifi/test/unit/ControllerTests.cpp | 4 +
libminifi/test/unit/LoggerTests.cpp | 2 +-
27 files changed, 521 insertions(+), 67 deletions(-)
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index ae2da1a..a63c6bb 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -53,6 +53,20 @@ HTTPClient::HTTPClient()
http_session_ = curl_easy_init();
}
+/**
+ * Appends one part to the multipart (MIME) form of this request; the form is created on first use
+ * and is attached to the session in submit().
+ * @param content_type MIME type announced for the part
+ * @param name form field name of the part
+ * @param read_callback source of the part's data; it is handed to curl as a raw pointer, so the caller
+ *                      must keep it (and the buffer it points to) alive until the request has been performed
+ * @param filename optional remote file name announced for the part
+ *
+ * If the callback carries no data or curl cannot allocate the form/part, the error is logged
+ * and no part is added.
+ */
+void HTTPClient::addFormPart(const std::string& content_type, const std::string& name, HTTPUploadCallback* read_callback, const std::optional<std::string>& filename) {
+  if (read_callback == nullptr || read_callback->ptr == nullptr) {
+    logger_->log_error("Cannot add form part '%s': no upload data was provided", name.c_str());
+    return;
+  }
+  if (!form_) {
+    form_ = curl_mime_init(http_session_);
+    if (!form_) {
+      logger_->log_error("Cannot add form part '%s': failed to create the mime form", name.c_str());
+      return;
+    }
+  }
+  curl_mimepart* part = curl_mime_addpart(form_);
+  if (part == nullptr) {
+    logger_->log_error("Cannot add form part '%s': failed to allocate a new mime part", name.c_str());
+    return;
+  }
+  curl_mime_type(part, content_type.c_str());
+  if (filename) {
+    curl_mime_filename(part, filename->c_str());
+  }
+  curl_mime_name(part, name.c_str());
+  // only a read callback is registered: no seek and no free callback are provided
+  curl_mime_data_cb(part, read_callback->ptr->getBufferSize(),
+      &utils::HTTPRequestResponse::send_write, nullptr, nullptr, static_cast<void*>(read_callback));
+}
+
HTTPClient::~HTTPClient() {
if (nullptr != headers_) {
curl_slist_free_all(headers_);
@@ -62,6 +76,10 @@ HTTPClient::~HTTPClient() {
curl_easy_cleanup(http_session_);
http_session_ = nullptr;
}
+ if (form_ != nullptr) {
+ curl_mime_free(form_);
+ form_ = nullptr;
+ }
// forceClose ended up not being the issue in MINIFICPP-667, but leaving here
// out of good hygiene.
forceClose();
@@ -290,6 +308,9 @@ bool HTTPClient::submit() {
logger_->log_debug("Not using keep alive");
curl_easy_setopt(http_session_, CURLOPT_TCP_KEEPALIVE, 0L);
}
+ if (form_ != nullptr) {
+ curl_easy_setopt(http_session_, CURLOPT_MIMEPOST, form_);
+ }
res = curl_easy_perform(http_session_);
if (callback == nullptr) {
read_callback_.close();
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index d7d3a40..05a24e1 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -74,6 +74,10 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
HTTPClient(const std::string& name, const utils::Identifier& uuid);
+ // class uses raw pointers
+ HTTPClient(const HTTPClient&) = delete;
+ HTTPClient& operator=(const HTTPClient&) = delete;
+
explicit HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
~HTTPClient();
@@ -82,6 +86,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
void setVerbose(bool use_stderr = false) override;
+ void addFormPart(const std::string& content_type, const std::string& name, HTTPUploadCallback* read_callback, const std::optional<std::string>& filename = std::nullopt);
+
void forceClose();
void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) override;
@@ -285,7 +291,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
CURLcode res{CURLE_OK};
- CURL *http_session_;
+ CURL* http_session_{nullptr};
+ curl_mime* form_{nullptr};
std::string method_;
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 7b408e4..4217327 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <vector>
+#include <utility>
#include <limits>
#include "utils/file/FileUtils.h"
#include "core/Resource.h"
@@ -55,12 +56,13 @@ void RESTSender::initialize(core::controller::ControllerServiceProvider* control
}
C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool /*async*/) {
- std::string outputConfig;
+ std::optional<std::string> data;
- if (direction == Direction::TRANSMIT) {
- outputConfig = serializeJsonRootPayload(payload);
+ if (direction == Direction::TRANSMIT && payload.getOperation() != Operation::TRANSFER) {
+ // treat payload as json
+ data = serializeJsonRootPayload(payload);
}
- return sendPayload(url, direction, payload, outputConfig);
+ return sendPayload(url, direction, payload, std::move(data));
}
C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
@@ -83,14 +85,14 @@ void RESTSender::setSecurityContext(utils::HTTPClient &client, const std::string
client.initialize(type, url, generatedService);
}
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, std::optional<std::string> data) {
if (url.empty()) {
return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR);
}
// Callback for transmit. Declared in order to destruct in proper order - take care!
- std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
- std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+ std::vector<std::unique_ptr<utils::ByteInputCallBack>> inputs;
+ std::vector<std::unique_ptr<utils::HTTPUploadCallback>> callbacks;
// Callback for transfer. Declared in order to destruct in proper order - take care!
std::unique_ptr<utils::ByteOutputCallback> file_callback = nullptr;
@@ -102,17 +104,36 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d
client.setKeepAliveIdle(std::chrono::milliseconds(2000));
client.setConnectionTimeout(std::chrono::milliseconds(2000));
if (direction == Direction::TRANSMIT) {
- input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
- callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback());
- input->write(outputConfig);
- callback->ptr = input.get();
- callback->pos = 0;
client.set_request_method("POST");
if (!ssl_context_service_ && url.find("https://") == 0) {
setSecurityContext(client, "POST", url);
}
- client.setUploadCallback(callback.get());
- client.setPostSize(outputConfig.size());
+ if (payload.getOperation() == Operation::TRANSFER) {
+ // treat nested payloads as files
+ for (const auto& file : payload.getNestedPayloads()) {
+ std::string filename = file.getLabel();
+ if (filename.empty()) {
+ throw std::logic_error("Missing filename");
+ }
+ auto file_input = std::make_unique<utils::ByteInputCallBack>();
+ auto file_cb = std::make_unique<utils::HTTPUploadCallback>();
+ auto file_data = file.getRawData();
+ file_input->write(std::string{file_data.begin(), file_data.end()});
+ file_cb->ptr = file_input.get();
+ client.addFormPart("application/octet-stream", "file", file_cb.get(), filename);
+ inputs.push_back(std::move(file_input));
+ callbacks.push_back(std::move(file_cb));
+ }
+ } else {
+ auto data_input = std::make_unique<utils::ByteInputCallBack>();
+ auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+ data_input->write(data.value_or(""));
+ data_cb->ptr = data_input.get();
+ client.setUploadCallback(data_cb.get());
+ client.setPostSize(data ? data->size() : 0);
+ inputs.push_back(std::move(data_input));
+ callbacks.push_back(std::move(data_cb));
+ }
} else {
// we do not need to set the upload callback
// since we are not uploading anything on a get
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 2df138f..c7a8c86 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -52,7 +52,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
protected:
- virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+ C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, std::optional<std::string> data);
/**
* Initializes the SSLContextService onto the HTTP client if one is needed
diff --git a/extensions/http-curl/tests/C2DebugBundleTest.cpp b/extensions/http-curl/tests/C2DebugBundleTest.cpp
new file mode 100644
index 0000000..9cf7be1
--- /dev/null
+++ b/extensions/http-curl/tests/C2DebugBundleTest.cpp
@@ -0,0 +1,206 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "TestBase.h"
+
+#include "c2/C2Agent.h"
+#include "protocols/RESTProtocol.h"
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "io/ArchiveStream.h"
+
+using std::literals::chrono_literals::operator""s;
+
+/**
+ * C2 integration harness whose success criterion is supplied by the caller:
+ * runAssertions() keeps polling the predicate (for at most 300 seconds) until it reports true.
+ */
+class VerifyDebugInfo : public VerifyC2Base {
+ public:
+  explicit VerifyDebugInfo(std::function<bool()> verify)
+      : verify_(std::move(verify)) {
+  }
+
+  // Raise the log levels of the C2 components under test, then run the base setup.
+  void testSetup() override {
+    auto&& log_controller = LogTestController::getInstance();
+    log_controller.setTrace<minifi::c2::C2Agent>();
+    log_controller.setDebug<minifi::c2::RESTSender>();
+    log_controller.setDebug<minifi::c2::RESTProtocol>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    const auto poll_time = std::chrono::seconds(300);
+    assert(utils::verifyEventHappenedInPollTime(poll_time, verify_));
+  }
+
+  // Base C2 configuration plus an explicit heartbeat period.
+  void configureC2() override {
+    VerifyC2Base::configureC2();
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");
+  }
+
+  std::function<bool()> verify_;
+};
+
+/**
+ * Test handler for the debug bundle upload endpoint.
+ * Parses the posted form, keeps the content of the "debug.tar.gz" file part and
+ * exposes every bundle received so far through getBundles().
+ */
+class C2DebugBundleHandler : public ServerAwareHandler {
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
+    std::optional<std::string> file_content;
+    // value-initialize the handler so that the callbacks we do not set (e.g. field_store) are null
+    // instead of indeterminate
+    mg_form_data_handler form_handler{};
+    form_handler.field_found = field_found;
+    form_handler.field_get = field_get;
+    form_handler.user_data = &file_content;
+    mg_handle_form_request(conn, &form_handler);
+    assert(file_content);
+    {
+      std::lock_guard<std::mutex> lock(mtx_);
+      bundles_.push_back(std::move(*file_content));
+    }
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  // Returns a copy of all bundles received so far.
+  std::vector<std::string> getBundles() {
+    std::lock_guard<std::mutex> lock(mtx_);
+    return bundles_;
+  }
+
+ private:
+  // Accepts only a single file part named "debug.tar.gz"; anything else is a test failure.
+  static int field_found(const char* key, const char* filename, char* /*path*/, size_t /*pathlen*/, void* user_data) {
+    auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
+    if (!filename || std::string(filename) != "debug.tar.gz") {
+      throw std::runtime_error("Unknown form entry: " + std::string{key});
+    }
+    if (file_content) {
+      throw std::runtime_error("Debug archive has already been extracted: " + std::string{key});
+    }
+    return MG_FORM_FIELD_STORAGE_GET;
+  }
+
+  // Appends the received data to the accumulated file content. The content is only created on the
+  // first call, so a field that arrives in several pieces is not truncated to its last piece.
+  static int field_get(const char* /*key*/, const char* value, size_t valuelen, void* user_data) {
+    auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
+    if (!file_content) {
+      file_content.emplace();
+    }
+    file_content->append(value, valuelen);
+    return MG_FORM_FIELD_HANDLE_GET;
+  }
+
+  std::mutex mtx_;
+  std::vector<std::string> bundles_;
+};
+
+/**
+ * Test handler for the heartbeat endpoint.
+ * Answers the next heartbeat with the response prepared by setC2RestResponse() (if any),
+ * then falls back to empty responses.
+ */
+class C2HeartbeatHandler : public ServerAwareHandler {
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
+    if (response_) {
+      // length() yields a size_t, which must be printed with %zu (%lu is a mismatch where long is 32-bit)
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                      "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+                response_->length());
+      mg_printf(conn, "%s", response_->c_str());
+      // the prepared response is delivered only once
+      response_.reset();
+    } else {
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                      "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    }
+
+    return true;
+  }
+
+  /**
+   * Prepares a heartbeat response requesting a "transfer" of "debug" to the given target.
+   * @param url upload target placed into the "target" argument of the requested operation
+   */
+  void setC2RestResponse(const std::string& url) {
+    response_ =
+      R"({
+ "operation" : "heartbeat",
+ "requested_operations": [{
+ "operation" : "transfer",
+ "operationid" : "79",
+ "name": "debug",
+ "args": {"target": ")" + url + R"("}
+ }]
+ })";
+  }
+
+ private:
+  std::optional<std::string> response_;
+};
+
+static std::string properties_file = "some.dummy.content = here\n";
+static std::string flow_config_file = R"(
+ Flow Controller:
+ name: Banana Bread
+ Processors: []
+ Connections: []
+ Remote Processing Groups: []
+ Provenance Reporting:
+)";
+
+int main() {  // Integration test: a C2 "transfer"/"debug" request must make the agent upload a debug bundle.
+ TestController controller;
+
+ C2HeartbeatHandler heartbeat_handler;  // delivers the transfer request inside a heartbeat response
+ C2AcknowledgeHandler ack_handler;  // records the ids of acknowledged operations
+ C2DebugBundleHandler bundle_handler;  // collects the uploaded bundles
+
+ std::filesystem::path home_dir = controller.createTempDirectory();
+ utils::file::PathUtils::create_dir((home_dir / "conf").string());
+ std::ofstream{home_dir / "conf/minifi.properties", std::ios::binary} << properties_file;  // seed the files that are later looked up in the bundle
+ std::ofstream{home_dir / "conf/config.yml", std::ios::binary} << flow_config_file;
+
+ VerifyDebugInfo harness([&]() -> bool {  // predicate handed to the harness; false means "not yet"
+ if (!ack_handler.isAcknowledged("79")) {  // "79" is the operationid sent by C2HeartbeatHandler::setC2RestResponse
+ return false;
+ }
+ auto bundles = bundle_handler.getBundles();
+ assert(bundles.size() == 1);  // exactly one upload is expected once the operation is acknowledged
+ // verify the bundle
+ auto bundle_stream = std::make_shared<minifi::io::BufferStream>();
+ bundle_stream->write(reinterpret_cast<const uint8_t*>(bundles[0].data()), bundles[0].length());
+ auto archive_provider = core::ClassLoader::getDefaultClassLoader().instantiate<minifi::io::ArchiveStreamProvider>(
+ "ArchiveStreamProvider", "ArchiveStreamProvider");
+ assert(archive_provider);
+ auto decompressor = archive_provider->createReadStream(bundle_stream);
+ assert(decompressor);
+ std::map<std::string, std::string> archive_content;  // file name -> full file content
+ while (auto info = decompressor->nextEntry()) {  // read every archive entry fully into memory
+ std::string file_content;
+ file_content.resize(info->size);
+ assert(decompressor->read(reinterpret_cast<uint8_t *>(file_content.data()), file_content.length()) ==
+ file_content.length());  // the read lives inside assert(); NDEBUG is #undef'd at the top of this file
+ archive_content[info->filename] = std::move(file_content);
+ }
+ assert(archive_content["minifi.properties"] == properties_file);
+ assert(archive_content["config.yml"] == flow_config_file);
+ auto log_gz = archive_content["minifi.log.gz"];
+ auto log_stream = std::make_shared<minifi::io::BufferStream>();
+ {  // NOTE(review): presumably scoped so the decompressor is destroyed (and done writing) before log_stream is read - confirm
+ minifi::io::ZlibDecompressStream log_decompressor(gsl::make_not_null(log_stream.get()));
+ log_decompressor.write(reinterpret_cast<const uint8_t*>(log_gz.data()), log_gz.length());
+ }
+ std::string log_text;
+ log_text.resize(log_stream->size());
+ log_stream->read(reinterpret_cast<uint8_t*>(log_text.data()), log_text.length());
+ assert(log_text.find("Tis but a scratch") != std::string::npos);  // the marker message logged below, before the harness runs
+ return true;
+ });
+
+ harness.getConfiguration()->setHome(home_dir.string());
+ harness.getConfiguration()->loadConfigureFile("conf/minifi.properties");
+ harness.setUrl("http://localhost:0/heartbeat", &heartbeat_handler);  // NOTE(review): port 0 presumably lets the server choose a free port; the real one is read back via getWebPort() below - confirm
+ harness.setUrl("http://localhost:0/acknowledge", &ack_handler);
+ harness.setUrl("http://localhost:0/debug_bundle", &bundle_handler);
+ harness.setC2Url("/heartbeat", "/acknowledge");
+
+ heartbeat_handler.setC2RestResponse("http://localhost:" + harness.getWebPort() + "/debug_bundle");  // upload target of the requested transfer
+
+ logging::LoggerFactory<C2HeartbeatHandler>::getLogger()->log_error("Tis but a scratch");  // marker the predicate searches for in the bundled log
+
+ harness.run((home_dir / "conf/config.yml").string());
+}
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp b/extensions/http-curl/tests/C2RequestClassTest.cpp
index ea0ca04..e49fce6 100644
--- a/extensions/http-curl/tests/C2RequestClassTest.cpp
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -27,31 +27,6 @@
#include "CivetStream.h"
#include "StreamPipe.h"
-class C2AcknowledgeHandler : public ServerAwareHandler {
- public:
- bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
- std::string req = readPayload(conn);
- rapidjson::Document root;
- root.Parse(req.data(), req.size());
- if (root.IsObject() && root.HasMember("operationId")) {
- std::lock_guard<std::mutex> guard(mtx_);
- acknowledged_operations_.insert(root["operationId"].GetString());
- }
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
- return true;
- }
-
- bool isAcknowledged(const std::string& operation_id) const {
- std::lock_guard<std::mutex> guard(mtx_);
- return acknowledged_operations_.count(operation_id) > 0;
- }
-
- private:
- mutable std::mutex mtx_;
- std::set<std::string> acknowledged_operations_;
-};
-
class C2HeartbeatHandler : public ServerAwareHandler {
public:
explicit C2HeartbeatHandler(std::string response) : response_(std::move(response)) {}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 02ca8ec..800cd37 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -96,3 +96,4 @@ add_test(NAME VerifyInvokeHTTPPostTest COMMAND VerifyInvokeHTTPPostTest "${TEST_
add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml" "${TEST_RESOURCES}/")
add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
+add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 55d1651..9cfff34 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -25,6 +25,7 @@
#include <string>
#include <utility>
#include <vector>
+#include <set>
#include "civetweb.h"
#include "CivetServer.h"
@@ -649,3 +650,28 @@ class RetryHttpGetResponder : public ServerAwareHandler {
return true;
}
};
+
+/**
+ * Test handler for the acknowledge endpoint: remembers the "operationId" of every
+ * acknowledgement posted to it so that tests can query them through isAcknowledged().
+ */
+class C2AcknowledgeHandler : public ServerAwareHandler {
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
+    std::string request_body = readPayload(conn);
+    rapidjson::Document document;
+    document.Parse(request_body.data(), request_body.size());
+    if (document.IsObject()) {
+      const auto operation_id = document.FindMember("operationId");
+      if (operation_id != document.MemberEnd()) {
+        std::lock_guard<std::mutex> guard(mtx_);
+        acknowledged_operations_.insert(operation_id->value.GetString());
+      }
+    }
+    // the request is always answered with an empty 200 response
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  // Tells whether an acknowledgement carrying the given operation id has been received.
+  bool isAcknowledged(const std::string& operation_id) const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return acknowledged_operations_.find(operation_id) != acknowledged_operations_.end();
+  }
+
+ private:
+  mutable std::mutex mtx_;
+  std::set<std::string> acknowledged_operations_;
+};
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 48d8d87..d74e600 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -122,6 +122,7 @@ void HTTPIntegrationBase::setC2Url(const std::string &heartbeat_path, const std:
class VerifyC2Base : public HTTPIntegrationBase {
public:
+ using HTTPIntegrationBase::HTTPIntegrationBase;
void testSetup() override {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<LogTestController>();
diff --git a/extensions/libarchive/WriteArchiveStream.cpp b/extensions/libarchive/WriteArchiveStream.cpp
index af375bf..4c60233 100644
--- a/extensions/libarchive/WriteArchiveStream.cpp
+++ b/extensions/libarchive/WriteArchiveStream.cpp
@@ -126,6 +126,24 @@ size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
return result;
}
+/**
+ * Completes the archive by closing it and releases the archive handle.
+ * @return true if the archive was closed with ARCHIVE_OK, false if closing failed
+ *         or there was no open archive (e.g. finish() has already been called)
+ */
+bool WriteArchiveStreamImpl::finish() {
+  if (arch_ == nullptr) {
+    return false;
+  }
+  arch_entry_.reset();
+  // the archive is only complete once it has been closed
+  const auto close_result = archive_write_close(arch_.get());
+  const bool closed_ok = (close_result == ARCHIVE_OK);
+  if (!closed_ok) {
+    logger_->log_error("Archive write close error %s", archive_error_string(arch_.get()));
+  }
+  arch_.reset();
+  return closed_ok;
+}
+
+// Closes the archive if it has not been finished explicitly; the outcome is ignored here.
+WriteArchiveStreamImpl::~WriteArchiveStreamImpl() {
+  finish();
+}
+
} // namespace org::apache::nifi::minifi::io
diff --git a/extensions/libarchive/WriteArchiveStream.h b/extensions/libarchive/WriteArchiveStream.h
index 12e175a..5316100 100644
--- a/extensions/libarchive/WriteArchiveStream.h
+++ b/extensions/libarchive/WriteArchiveStream.h
@@ -68,6 +68,13 @@ class WriteArchiveStreamImpl: public WriteArchiveStream {
size_t write(const uint8_t* data, size_t len) override;
+ // TODO(adebreceni):
+ // Stream::close does not make it possible to signal failure (short of throwing)
+ // investigate if it is worth making Stream::close capable of this
+ bool finish() override;
+
+ ~WriteArchiveStreamImpl() override;
+
private:
static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) {
auto* const output = static_cast<OutputStream*>(context);
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d3b039d..bbaa81f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -189,6 +189,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
std::vector<BackTrace> getTraces() override;
+ std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() override;
+
private:
/**
* Loads the flow as specified in the flow config file or if not present
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 5d0b6b5..f8d73e6 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -153,6 +153,10 @@ class C2Agent : public state::UpdateController {
*/
bool update_property(const std::string &property_name, const std::string &property_value, bool persist);
+ void handle_transfer(const C2ContentResponse &resp);
+
+ C2Payload bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files);
+
/**
* Creates configuration options C2 payload for response
*/
@@ -164,6 +168,10 @@ class C2Agent : public state::UpdateController {
bool handleConfigurationUpdate(const C2ContentResponse &resp);
+ std::optional<std::string> resolveFlowUrl(const std::string& url) const;
+
+ std::optional<std::string> resolveUrl(const std::string& url) const;
+
protected:
std::timed_mutex metrics_mutex_;
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index 8f63b2c..c1290b1 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -28,6 +28,7 @@
#include "../core/state/Value.h"
#include "core/state/UpdateController.h"
#include "utils/Enum.h"
+#include "io/InputStream.h"
namespace org {
namespace apache {
@@ -128,14 +129,17 @@ class C2Payload : public state::Update {
~C2Payload() override = default;
void setIdentifier(std::string ident) { ident_ = std::move(ident); }
+ [[nodiscard]]
std::string getIdentifier() const { return ident_; }
void setLabel(std::string label) { label_ = std::move(label); }
+ [[nodiscard]]
std::string getLabel() const { return label_; }
/**
* Gets the operation for this payload. May be nested or a single operation.
*/
+ [[nodiscard]]
Operation getOperation() const noexcept { return op_; }
/**
@@ -143,6 +147,7 @@ class C2Payload : public state::Update {
*/
bool validate() override { return true; }
+ [[nodiscard]]
const std::vector<C2ContentResponse> &getContent() const noexcept { return content_; }
/**
@@ -153,6 +158,7 @@ class C2Payload : public state::Update {
/**
* Determines if this object contains raw data.
*/
+ [[nodiscard]]
bool isRaw() const noexcept { return raw_; }
/**
@@ -165,6 +171,7 @@ class C2Payload : public state::Update {
/**
* Returns raw data.
*/
+ [[nodiscard]]
std::vector<char> getRawData() const { return raw_data_; }
/**
@@ -173,12 +180,15 @@ class C2Payload : public state::Update {
*/
void addPayload(C2Payload &&payload);
+ [[nodiscard]]
bool isCollapsible() const noexcept { return is_collapsible_; }
void setCollapsible(bool is_collapsible) noexcept { is_collapsible_ = is_collapsible; }
+ [[nodiscard]]
bool isContainer() const noexcept { return is_container_; }
void setContainer(bool is_container) noexcept { is_container_ = is_container; }
+ [[nodiscard]]
const std::vector<C2Payload> &getNestedPayloads() const & noexcept { return payloads_; }
std::vector<C2Payload>&& getNestedPayloads() && noexcept {return std::move(payloads_);}
diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h
index bbb7c24..2e8965a 100644
--- a/libminifi/include/core/logging/internal/LogCompressorSink.h
+++ b/libminifi/include/core/logging/internal/LogCompressorSink.h
@@ -63,7 +63,9 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
compress(true);
}
LogBuffer compressed;
- compressed_logs_.tryDequeue(compressed, time);
+ if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
+ return createEmptyArchive();
+ }
return std::move(compressed.buffer_);
}
@@ -73,6 +75,8 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
NothingToCompress
};
+ std::unique_ptr<io::InputStream> createEmptyArchive();
+
CompressionResult compress(bool force_rotation = false);
void run();
@@ -81,6 +85,8 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
utils::StagingQueue<LogBuffer> cached_logs_;
utils::StagingQueue<ActiveCompressor, ActiveCompressor::Allocator> compressed_logs_;
+
+ std::shared_ptr<logging::Logger> compressor_logger_;
};
} // namespace internal
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 3bfe9a5..a493e60 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -22,6 +22,7 @@
#include <utility>
#include <vector>
#include <string>
+#include <map>
#include "utils/ThreadPool.h"
#include "utils/BackTrace.h"
@@ -229,6 +230,7 @@ class StateMonitor : public StateController {
*/
virtual std::vector<BackTrace> getTraces() = 0;
+ virtual std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() = 0;
protected:
std::atomic<bool> controller_running_;
diff --git a/libminifi/include/io/ArchiveStream.h b/libminifi/include/io/ArchiveStream.h
index def8838..2f04f3e 100644
--- a/libminifi/include/io/ArchiveStream.h
+++ b/libminifi/include/io/ArchiveStream.h
@@ -36,6 +36,7 @@ struct EntryInfo {
class WriteArchiveStream : public OutputStream {
public:
virtual bool newEntry(const EntryInfo& info) = 0;
+ virtual bool finish() = 0;
};
class ReadArchiveStream : public InputStream {
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 6fb2463..96521c6 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -82,6 +82,10 @@ class BufferStream : public BaseStream {
return buffer_.data();
}
+  /**
+   * Hands the stored bytes over to the caller and leaves this stream's buffer empty.
+   * @return the previous content of the buffer
+   */
+  std::vector<uint8_t> moveBuffer() {
+    std::vector<uint8_t> released;
+    released.swap(buffer_);
+    return released;
+  }
+
/**
* Retrieve size of data stream
* @return size of data stream
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index cd84de6..d8a51fe 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -116,6 +116,8 @@ class Properties {
utils::ChecksumCalculator& getChecksumCalculator() { return checksum_calculator_; }
+ std::string getFilePath() const;
+
protected:
std::map<std::string, std::string> getProperties() const;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6cc1f8c..b87f819 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -51,6 +51,7 @@
#include "utils/HTTPClient.h"
#include "io/NetworkPrioritizer.h"
#include "io/validation.h"
+#include "io/FileStream.h"
namespace org {
namespace apache {
@@ -506,6 +507,19 @@ std::vector<BackTrace> FlowController::getTraces() {
return traces;
}
+/**
+ * Collects the streams that make up the agent's debug information.
+ * @return map from the file name to use in the bundle to a stream of the file's content:
+ *         the compressed log (if one is available), the flow configuration (if its path is known)
+ *         and the properties file
+ */
+std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {
+  std::map<std::string, std::unique_ptr<io::InputStream>> bundle_files;
+
+  auto compressed_log = core::logging::LoggerConfiguration::getCompressedLog(true);
+  if (compressed_log) {
+    bundle_files.emplace("minifi.log.gz", std::move(compressed_log));
+  }
+
+  auto flow_path = flow_configuration_->getConfigurationPath();
+  if (flow_path) {
+    bundle_files.emplace("config.yml", std::make_unique<io::FileStream>(flow_path.value(), 0, false));
+  }
+
+  bundle_files.emplace("minifi.properties", std::make_unique<io::FileStream>(configuration_->getFilePath(), 0, false));
+
+  return bundle_files;
+}
+
} // namespace minifi
} // namespace nifi
} // namespace apache
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index f802f7b..60a78e8 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -42,6 +42,8 @@
#include "utils/Environment.h"
#include "utils/Monitors.h"
#include "utils/StringUtils.h"
+#include "io/ArchiveStream.h"
+#include "io/StreamPipe.h"
using namespace std::literals::chrono_literals;
@@ -317,6 +319,18 @@ void C2Agent::extractPayload(const C2Payload &resp) {
}
}
+namespace {
+
+// Failure while serving a C2 transfer request; what() carries the description.
+struct C2TransferError : public std::runtime_error {
+  using std::runtime_error::runtime_error;
+};
+
+// Transfer failure specific to assembling the debug bundle.
+struct C2DebugBundleError : public C2TransferError {
+  using C2TransferError::C2TransferError;
+};
+
+}  // namespace
+
void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
switch (resp.op.value()) {
case Operation::CLEAR:
@@ -409,6 +423,18 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
logger_->log_warn("Resume functionality is not supported!");
}
break;
+ case Operation::TRANSFER: {
+ try {
+ handle_transfer(resp);
+ C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
+ enqueue_c2_response(std::move(response));
+ } catch (const std::runtime_error& error) {
+ C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true);
+ response.setRawData(error.what());
+ enqueue_c2_response(std::move(response));
+ }
+ break;
+ }
default:
break;
// do nothing
@@ -586,6 +612,59 @@ bool C2Agent::update_property(const std::string &property_name, const std::strin
return configuration_->persistProperties();
}
+/**
+ * Packs the given named streams into one archive and wraps it in a C2 payload.
+ *
+ * @param files entry name -> content stream. Every stream is read while bundling, so the
+ *              streams must not be reused afterwards, even if bundling fails midway.
+ * @return a TRANSFER payload holding one nested payload labelled "debug.tar.gz"
+ *         whose raw data is the archive
+ * @throws C2DebugBundleError if the archiver cannot be created, an entry has no stream,
+ *         or an entry cannot be written completely
+ */
+C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) {
+  auto stream_provider = core::ClassLoader::getDefaultClassLoader().instantiate<io::ArchiveStreamProvider>(
+      "ArchiveStreamProvider", "ArchiveStreamProvider");
+  if (!stream_provider) {
+    throw C2DebugBundleError("Couldn't instantiate archiver provider");
+  }
+  auto bundle = std::make_shared<io::BufferStream>();
+  // NOTE(review): 9 is presumably the compression level - confirm against ArchiveStreamProvider
+  auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, logger_);
+  if (!archiver) {
+    throw C2DebugBundleError("Couldn't instantiate archiver");
+  }
+  for (const auto& [filename, stream] : files) {
+    // the map is produced by a virtual provider, so do not trust every entry to carry a stream
+    if (!stream) {
+      throw C2DebugBundleError("Missing content stream for '" + filename + "'");
+    }
+    const size_t file_size = stream->size();
+    if (!archiver->newEntry({filename, file_size})) {
+      throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'");
+    }
+    if (gsl::narrow<int64_t>(file_size) != internal::pipe(stream.get(), archiver.get())) {
+      // we have touched the input streams, they cannot be reused
+      throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle");
+    }
+  }
+  if (!archiver->finish()) {
+    throw C2DebugBundleError("Failed to complete debug bundle archive");
+  }
+  C2Payload file(Operation::TRANSFER, true);
+  file.setLabel("debug.tar.gz");
+  file.setRawData(bundle->moveBuffer());
+  C2Payload payload(Operation::TRANSFER, false);
+  payload.addPayload(std::move(file));
+  return payload;
+}
+
+/**
+ * Serves a C2 TRANSFER request. Only the "debug" operand is supported: the debug
+ * information from the update sink is bundled and sent to the URL given in the
+ * "target" argument.
+ *
+ * @param resp the request; resp.name is the operand, resp.operation_arguments must contain "target"
+ * @throws C2TransferError for an unknown operand
+ * @throws C2DebugBundleError if "target" is missing or cannot be resolved, no C2 protocol
+ *         is available, bundling fails, or the upload reports READ_ERROR
+ */
+void C2Agent::handle_transfer(const C2ContentResponse &resp) {
+  if (resp.name != "debug") {
+    throw C2TransferError("Unknown operand '" + resp.name + "'");
+  }
+  const auto target_it = resp.operation_arguments.find("target");
+  if (target_it == resp.operation_arguments.end()) {
+    throw C2DebugBundleError("Missing argument for debug operation: 'target'");
+  }
+  const std::optional<std::string> url = resolveUrl(target_it->second.to_string());
+  if (!url) {
+    throw C2DebugBundleError("Invalid url");
+  }
+  // Load the protocol once and verify it before the debug streams are consumed:
+  // without a protocol there is nothing to upload through.
+  const auto protocol = protocol_.load();
+  if (protocol == nullptr) {
+    throw C2DebugBundleError("No C2 protocol available to upload the debug bundle");
+  }
+  std::map<std::string, std::unique_ptr<io::InputStream>> files = update_sink_->getDebugInfo();
+
+  auto bundle = bundleDebugInfo(files);
+  auto&& response = protocol->consumePayload(url.value(), bundle, TRANSMIT, false);
+  if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {
+    throw C2DebugBundleError("Error while uploading");
+  }
+}
+
void C2Agent::restart_agent() {
std::string cwd = utils::Environment::getCurrentWorkingDirectory();
if (cwd.empty()) {
@@ -654,6 +733,46 @@ utils::TaskRescheduleInfo C2Agent::consume() {
return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
}
+/**
+ * Turns a flow location into a full URL.
+ *
+ *  - a value starting with "http" is returned unchanged
+ *  - otherwise, if the flow base url property is set, the value is appended to it
+ *    (a "/" is inserted when the trimmed base does not end with one)
+ *  - otherwise, if the C2 REST url is set, the value is appended to "<host:port>/c2/api/"
+ *  - otherwise the value is returned unchanged
+ *
+ * @param url absolute or relative flow location
+ * @return the resolved URL, or std::nullopt if the configured C2 REST url cannot be parsed
+ */
+std::optional<std::string> C2Agent::resolveFlowUrl(const std::string& url) const {
+  if (utils::StringUtils::startsWith(url, "http")) {
+    return url;
+  }
+
+  std::string configured;
+  if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, configured)) {
+    std::string resolved = utils::StringUtils::trim(configured);
+    if (!utils::StringUtils::endsWith(resolved, "/")) {
+      resolved += "/";
+    }
+    resolved += url;
+    return resolved;
+  }
+
+  if (!configuration_->get("nifi.c2.rest.url", "c2.rest.url", configured)) {
+    // nothing configured to resolve against
+    return url;
+  }
+
+  utils::URL rest_url{utils::StringUtils::trim(configured)};
+  if (!rest_url.isValid()) {
+    logger_->log_error("Could not parse C2 REST URL '%s'", configured);
+    return std::nullopt;
+  }
+  return rest_url.hostPort() + "/c2/api/" + url;
+}
+
+/**
+ * Resolves a target against the C2 server address.
+ *
+ * A value that does not start with "/" is returned unchanged. A value starting with "/"
+ * is appended to the host:port part of the configured C2 REST url.
+ *
+ * @param url the target to resolve
+ * @return the resolved URL, or std::nullopt if the C2 REST url is missing or cannot be parsed
+ */
+std::optional<std::string> C2Agent::resolveUrl(const std::string& url) const {
+  const bool is_server_relative = utils::StringUtils::startsWith(url, "/");
+  if (!is_server_relative) {
+    return url;
+  }
+
+  std::string rest_url_str;
+  const bool has_rest_url = configuration_->get("nifi.c2.rest.url", "c2.rest.url", rest_url_str);
+  if (!has_rest_url) {
+    logger_->log_error("Missing C2 REST URL");
+    return std::nullopt;
+  }
+
+  utils::URL rest_url{utils::StringUtils::trim(rest_url_str)};
+  if (!rest_url.isValid()) {
+    logger_->log_error("Could not parse C2 REST URL '%s'", rest_url_str);
+    return std::nullopt;
+  }
+  return rest_url.hostPort() + url;
+}
+
std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() == nullptr) {
// try to open the file
@@ -667,30 +786,13 @@ std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
return {};
}
- std::string resolved_url = uri;
- if (!utils::StringUtils::startsWith(uri, "http")) {
- std::stringstream adjusted_url;
- std::string base;
- if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
- base = utils::StringUtils::trim(base);
- adjusted_url << base;
- if (!utils::StringUtils::endsWith(base, "/")) {
- adjusted_url << "/";
- }
- adjusted_url << uri;
- resolved_url = adjusted_url.str();
- } else if (configuration_->get("nifi.c2.rest.url", "c2.rest.url", base)) {
- utils::URL base_url{utils::StringUtils::trim(base)};
- if (!base_url.isValid()) {
- logger_->log_error("Could not parse C2 REST URL '%s'", base);
- return std::nullopt;
- }
- resolved_url = base_url.hostPort() + "/c2/api/" + uri;
- }
+ std::optional<std::string> resolved_url = resolveFlowUrl(uri);
+ if (!resolved_url) {
+ return std::nullopt;
}
C2Payload payload(Operation::TRANSFER, true);
- C2Payload &&response = protocol_.load()->consumePayload(resolved_url, payload, RECEIVE, false);
+ C2Payload &&response = protocol_.load()->consumePayload(resolved_url.value(), payload, RECEIVE, false);
auto raw_data = response.getRawData();
return std::string(raw_data.data(), raw_data.size());
diff --git a/libminifi/src/core/logging/internal/LogCompressorSink.cpp b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
index 112e6ae..9e6dcab 100644
--- a/libminifi/src/core/logging/internal/LogCompressorSink.cpp
+++ b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
@@ -29,7 +29,8 @@ namespace internal {
LogCompressorSink::LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger)
: cached_logs_(cache_size.max_total_size, cache_size.max_segment_size),
- compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{std::move(logger)}) {
+ compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{logger}),
+ compressor_logger_(logger) {
compression_thread_ = std::thread{&LogCompressorSink::run, this};
}
@@ -39,8 +40,10 @@ LogCompressorSink::~LogCompressorSink() {
}
void LogCompressorSink::sink_it_(const spdlog::details::log_msg &msg) {
+ spdlog::memory_buf_t formatted;
+ base_sink<spdlog::details::null_mutex>::formatter_->format(msg, formatted);
cached_logs_.modify([&] (LogBuffer& active) {
- active.buffer_->write(reinterpret_cast<const uint8_t*>(msg.payload.data()), msg.payload.size());
+ active.buffer_->write(reinterpret_cast<const uint8_t*>(formatted.data()), formatted.size());
});
}
@@ -72,6 +75,11 @@ LogCompressorSink::CompressionResult LogCompressorSink::compress(bool force_rota
void LogCompressorSink::flush_() {}
+/**
+ * Produces an archive that holds no log content: a compressor is allocated
+ * (with argument 0) using compressor_logger_, committed right away, and the
+ * committed buffer is handed to the caller.
+ */
+std::unique_ptr<io::InputStream> LogCompressorSink::createEmptyArchive() {
+  ActiveCompressor::Allocator allocate_compressor(compressor_logger_);
+  auto untouched = allocate_compressor(0);
+  return std::move(untouched.commit().buffer_);
+}
+
} // namespace internal
} // namespace logging
} // namespace core
diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp
index 02b2378..8df8202 100644
--- a/libminifi/src/properties/Properties.cpp
+++ b/libminifi/src/properties/Properties.cpp
@@ -97,6 +97,11 @@ void Properties::loadConfigureFile(const char *fileName) {
dirty_ = false;
}
+/**
+ * Returns the stored properties file path (properties_file_).
+ * The value is copied while mutex_ is held.
+ */
+std::string Properties::getFilePath() const {
+  std::lock_guard guard{mutex_};
+  return properties_file_;
+}
+
bool Properties::persistProperties() {
std::lock_guard<std::mutex> lock(mutex_);
if (!dirty_) {
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index c64e779..2626cd9 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -105,6 +105,9 @@ size_t ByteOutputCallback::readFully(char *buffer, size_t size) {
}
size_t ByteOutputCallback::read_current_str(char *buffer, size_t size) {
+ if (size == 0) {
+ return 0;
+ }
size_t amount_to_read = size;
size_t curr_buf_pos = 0;
/**
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index e4cf792..e255cea 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -45,7 +45,7 @@ class IntegrationBase {
virtual ~IntegrationBase() = default;
- virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& bootstrap_file = {});
+ virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& home_path = {});
void setKeyDir(const std::string& key_dir) {
this->key_dir = key_dir;
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index de14f60..e0bf4e2 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -143,6 +143,10 @@ class TestUpdateSink : public minifi::state::StateMonitor {
return 0;
}
+ std::map<std::string, std::unique_ptr<minifi::io::InputStream>> getDebugInfo() override {
+ return {};  // the test sink contributes no debug files: always an empty map
+ }
+
/**
* Clear connection for the agent.
*/
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index a2ff302..df22a4a 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -141,7 +141,7 @@ TEST_CASE("Test Compression", "[ttl7]") {
std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
REQUIRE(compressed_log);
auto logs = decompress(compressed_log);
- REQUIRE(logs == "Hi there");
+ REQUIRE(logs.find("Hi there") != std::string::npos);
}
class LoggerTestAccessor {