You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/01/19 17:32:13 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1684 - Provide agent logs through the c2 protocol

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c0a73f  MINIFICPP-1684 - Provide agent logs through the c2 protocol
9c0a73f is described below

commit 9c0a73fc6bf2403b262b6e548f3dfb30e7613a1d
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Thu Nov 18 11:42:35 2021 +0100

    MINIFICPP-1684 - Provide agent logs through the c2 protocol
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1233
---
 extensions/http-curl/client/HTTPClient.cpp         |  21 +++
 extensions/http-curl/client/HTTPClient.h           |   9 +-
 extensions/http-curl/protocols/RESTSender.cpp      |  49 +++--
 extensions/http-curl/protocols/RESTSender.h        |   2 +-
 extensions/http-curl/tests/C2DebugBundleTest.cpp   | 206 +++++++++++++++++++++
 extensions/http-curl/tests/C2RequestClassTest.cpp  |  25 ---
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          |  26 +++
 extensions/http-curl/tests/HTTPIntegrationBase.h   |   1 +
 extensions/libarchive/WriteArchiveStream.cpp       |  18 ++
 extensions/libarchive/WriteArchiveStream.h         |   7 +
 libminifi/include/FlowController.h                 |   2 +
 libminifi/include/c2/C2Agent.h                     |   8 +
 libminifi/include/c2/C2Payload.h                   |  10 +
 .../core/logging/internal/LogCompressorSink.h      |   8 +-
 libminifi/include/core/state/UpdateController.h    |   2 +
 libminifi/include/io/ArchiveStream.h               |   1 +
 libminifi/include/io/BufferStream.h                |   4 +
 libminifi/include/properties/Properties.h          |   2 +
 libminifi/src/FlowController.cpp                   |  14 ++
 libminifi/src/c2/C2Agent.cpp                       | 144 +++++++++++---
 .../core/logging/internal/LogCompressorSink.cpp    |  12 +-
 libminifi/src/properties/Properties.cpp            |   5 +
 libminifi/src/utils/ByteArrayCallback.cpp          |   3 +
 libminifi/test/integration/IntegrationBase.h       |   2 +-
 libminifi/test/unit/ControllerTests.cpp            |   4 +
 libminifi/test/unit/LoggerTests.cpp                |   2 +-
 27 files changed, 521 insertions(+), 67 deletions(-)

diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index ae2da1a..a63c6bb 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -53,6 +53,20 @@ HTTPClient::HTTPClient()
   http_session_ = curl_easy_init();
 }
 
+void HTTPClient::addFormPart(const std::string& content_type, const std::string& name, HTTPUploadCallback* read_callback, const std::optional<std::string>& filename) {
+  if (!form_) {  // the mime structure is created lazily, on the first part that is added
+    form_ = curl_mime_init(http_session_);
+  }
+  curl_mimepart* part = curl_mime_addpart(form_);  // every call appends one new part to form_
+  curl_mime_type(part, content_type.c_str());
+  if (filename) {  // the part's filename is only set when the caller supplies one
+    curl_mime_filename(part, filename->c_str());
+  }
+  curl_mime_name(part, name.c_str());
+  curl_mime_data_cb(part, read_callback->ptr->getBufferSize(),  // read_callback and read_callback->ptr are dereferenced unchecked: both must be non-null
+      &utils::HTTPRequestResponse::send_write, nullptr, nullptr, static_cast<void*>(read_callback));  // no seek/free callbacks; read_callback is passed as a raw pointer, no ownership is taken here
+}
+
 HTTPClient::~HTTPClient() {
   if (nullptr != headers_) {
     curl_slist_free_all(headers_);
@@ -62,6 +76,10 @@ HTTPClient::~HTTPClient() {
     curl_easy_cleanup(http_session_);
     http_session_ = nullptr;
   }
+  if (form_ != nullptr) {
+    curl_mime_free(form_);
+    form_ = nullptr;
+  }
   // forceClose ended up not being the issue in MINIFICPP-667, but leaving here
   // out of good hygiene.
   forceClose();
@@ -290,6 +308,9 @@ bool HTTPClient::submit() {
     logger_->log_debug("Not using keep alive");
     curl_easy_setopt(http_session_, CURLOPT_TCP_KEEPALIVE, 0L);
   }
+  if (form_ != nullptr) {
+    curl_easy_setopt(http_session_, CURLOPT_MIMEPOST, form_);
+  }
   res = curl_easy_perform(http_session_);
   if (callback == nullptr) {
     read_callback_.close();
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index d7d3a40..05a24e1 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -74,6 +74,10 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   HTTPClient(const std::string& name, const utils::Identifier& uuid);
 
+  // class uses raw pointers
+  HTTPClient(const HTTPClient&) = delete;
+  HTTPClient& operator=(const HTTPClient&) = delete;
+
   explicit HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
 
   ~HTTPClient();
@@ -82,6 +86,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   void setVerbose(bool use_stderr = false) override;
 
+  void addFormPart(const std::string& content_type, const std::string& name, HTTPUploadCallback* read_callback, const std::optional<std::string>& filename = std::nullopt);
+
   void forceClose();
 
   void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) override;
@@ -285,7 +291,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   CURLcode res{CURLE_OK};
 
-  CURL *http_session_;
+  CURL* http_session_{nullptr};
+  curl_mime* form_{nullptr};
 
   std::string method_;
 
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 7b408e4..4217327 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -22,6 +22,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <utility>
 #include <limits>
 #include "utils/file/FileUtils.h"
 #include "core/Resource.h"
@@ -55,12 +56,13 @@ void RESTSender::initialize(core::controller::ControllerServiceProvider* control
 }
 
 C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool /*async*/) {
-  std::string outputConfig;
+  std::optional<std::string> data;
 
-  if (direction == Direction::TRANSMIT) {
-    outputConfig = serializeJsonRootPayload(payload);
+  if (direction == Direction::TRANSMIT && payload.getOperation() != Operation::TRANSFER) {
+    // treat payload as json
+    data = serializeJsonRootPayload(payload);
   }
-  return sendPayload(url, direction, payload, outputConfig);
+  return sendPayload(url, direction, payload, std::move(data));
 }
 
 C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
@@ -83,14 +85,14 @@ void RESTSender::setSecurityContext(utils::HTTPClient &client, const std::string
   client.initialize(type, url, generatedService);
 }
 
-const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, std::optional<std::string> data) {
   if (url.empty()) {
     return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR);
   }
 
   // Callback for transmit. Declared in order to destruct in proper order - take care!
-  std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
-  std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+  std::vector<std::unique_ptr<utils::ByteInputCallBack>> inputs;
+  std::vector<std::unique_ptr<utils::HTTPUploadCallback>> callbacks;
 
   // Callback for transfer. Declared in order to destruct in proper order - take care!
   std::unique_ptr<utils::ByteOutputCallback> file_callback = nullptr;
@@ -102,17 +104,36 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d
   client.setKeepAliveIdle(std::chrono::milliseconds(2000));
   client.setConnectionTimeout(std::chrono::milliseconds(2000));
   if (direction == Direction::TRANSMIT) {
-    input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
-    callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback());
-    input->write(outputConfig);
-    callback->ptr = input.get();
-    callback->pos = 0;
     client.set_request_method("POST");
     if (!ssl_context_service_ && url.find("https://") == 0) {
       setSecurityContext(client, "POST", url);
     }
-    client.setUploadCallback(callback.get());
-    client.setPostSize(outputConfig.size());
+    if (payload.getOperation() == Operation::TRANSFER) {
+      // treat nested payloads as files
+      for (const auto& file : payload.getNestedPayloads()) {
+        std::string filename = file.getLabel();
+        if (filename.empty()) {
+          throw std::logic_error("Missing filename");
+        }
+        auto file_input = std::make_unique<utils::ByteInputCallBack>();
+        auto file_cb = std::make_unique<utils::HTTPUploadCallback>();
+        auto file_data = file.getRawData();
+        file_input->write(std::string{file_data.begin(), file_data.end()});
+        file_cb->ptr = file_input.get();
+        client.addFormPart("application/octet-stream", "file", file_cb.get(), filename);
+        inputs.push_back(std::move(file_input));
+        callbacks.push_back(std::move(file_cb));
+      }
+    } else {
+      auto data_input = std::make_unique<utils::ByteInputCallBack>();
+      auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+      data_input->write(data.value_or(""));
+      data_cb->ptr = data_input.get();
+      client.setUploadCallback(data_cb.get());
+      client.setPostSize(data ? data->size() : 0);
+      inputs.push_back(std::move(data_input));
+      callbacks.push_back(std::move(data_cb));
+    }
   } else {
     // we do not need to set the upload callback
     // since we are not uploading anything on a get
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 2df138f..c7a8c86 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -52,7 +52,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
   void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
 
  protected:
-  virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+  C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, std::optional<std::string> data);
 
   /**
    * Initializes the SSLContextService onto the HTTP client if one is needed
diff --git a/extensions/http-curl/tests/C2DebugBundleTest.cpp b/extensions/http-curl/tests/C2DebugBundleTest.cpp
new file mode 100644
index 0000000..9cf7be1
--- /dev/null
+++ b/extensions/http-curl/tests/C2DebugBundleTest.cpp
@@ -0,0 +1,206 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "TestBase.h"
+
+#include "c2/C2Agent.h"
+#include "protocols/RESTProtocol.h"
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "io/ArchiveStream.h"
+
+using std::literals::chrono_literals::operator""s;
+
+class VerifyDebugInfo : public VerifyC2Base {  // C2 test harness whose assertion step polls a caller-supplied predicate
+ public:
+  explicit VerifyDebugInfo(std::function<bool()> verify): verify_(std::move(verify)) {}
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
+    VerifyC2Base::testSetup();  // base setup runs after the log levels above are configured
+  }
+
+  void runAssertions() override {
+    assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(300), verify_));  // verify_ is handed to the poller with a 300 second limit
+  }
+
+  void configureC2() override {
+    VerifyC2Base::configureC2();
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");  // NOTE(review): unit is not visible here, presumably milliseconds - confirm
+  }
+
+  std::function<bool()> verify_;  // predicate evaluated by runAssertions()
+};
+
+class C2DebugBundleHandler : public ServerAwareHandler {  // test endpoint: collects the uploaded "debug.tar.gz" form part of each POST
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
+    std::optional<std::string> file_content;  // stays empty until field_get receives data
+    mg_form_data_handler form_handler{};  // value-initialized so the callback members not set below are null instead of indeterminate
+    form_handler.field_found = field_found;
+    form_handler.field_get = field_get;
+    form_handler.user_data = &file_content;
+    mg_handle_form_request(conn, &form_handler);
+    assert(file_content);  // the request must have carried the archive
+    {
+      std::lock_guard<std::mutex> lock(mtx_);
+      bundles_.push_back(std::move(*file_content));
+    }
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                      "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  std::vector<std::string> getBundles() {  // returns a copy taken under the lock
+    std::lock_guard<std::mutex> lock(mtx_);
+    return bundles_;
+  }
+
+ private:
+  static int field_found(const char* key, const char* filename, char* /*path*/, size_t /*pathlen*/, void* user_data) {
+    auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
+    if (!filename || std::string(filename) != "debug.tar.gz") {  // only the debug archive is accepted
+      throw std::runtime_error("Unknown form entry: " + std::string{key});
+    }
+    if (file_content) {  // a second archive in the same request is rejected
+      throw std::runtime_error("Debug archive has already been extracted: " + std::string{key});
+    }
+    return MG_FORM_FIELD_STORAGE_GET;
+  }
+  static int field_get(const char* /*key*/, const char* value, size_t valuelen, void* user_data) {
+    auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
+    if (!file_content) file_content.emplace();  // first chunk of the field starts from an empty buffer
+    file_content->append(value, valuelen);  // civetweb may deliver one field in several calls: append, do not overwrite
+    return MG_FORM_FIELD_HANDLE_GET;
+  }
+
+  std::mutex mtx_;  // guards bundles_
+  std::vector<std::string> bundles_;
+};
+
+class C2HeartbeatHandler : public ServerAwareHandler {  // test endpoint: answers one heartbeat with the prepared response, all others with an empty body
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
+    if (response_) {
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                      "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+                response_->length());  // %zu is the size_t conversion; %lu does not match size_t on LLP64 targets
+      mg_printf(conn, "%s", response_->c_str());
+      response_.reset();  // the prepared response is sent at most once
+    } else {
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                      "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    }
+
+    return true;
+  }
+
+  void setC2RestResponse(const std::string& url) {  // url is embedded verbatim (no JSON escaping) as the transfer target
+    response_ =
+      R"({
+        "operation" : "heartbeat",
+        "requested_operations": [{
+          "operation" : "transfer",
+          "operationid" : "79",
+          "name": "debug",
+          "args": {"target": ")" + url + R"("}
+        }]
+      })";
+  }
+
+ private:
+  std::optional<std::string> response_;  // pending heartbeat response, empty once it has been sent
+};
+
+static std::string properties_file = "some.dummy.content = here\n";
+static std::string flow_config_file = R"(
+  Flow Controller:
+    name: Banana Bread
+  Processors: []
+  Connections: []
+  Remote Processing Groups: []
+  Provenance Reporting:
+)";
+
+int main() {
+  TestController controller;
+
+  C2HeartbeatHandler heartbeat_handler;
+  C2AcknowledgeHandler ack_handler;
+  C2DebugBundleHandler bundle_handler;
+
+  std::filesystem::path home_dir = controller.createTempDirectory();
+  utils::file::PathUtils::create_dir((home_dir / "conf").string());
+  std::ofstream{home_dir / "conf/minifi.properties", std::ios::binary} << properties_file;  // files whose contents are compared against the bundle below
+  std::ofstream{home_dir / "conf/config.yml", std::ios::binary} << flow_config_file;
+
+  VerifyDebugInfo harness([&]() -> bool {  // predicate polled by VerifyDebugInfo::runAssertions
+    if (!ack_handler.isAcknowledged("79")) {  // "79" is the operationid set in C2HeartbeatHandler::setC2RestResponse
+      return false;
+    }
+    auto bundles = bundle_handler.getBundles();
+    assert(bundles.size() == 1);
+    // verify the bundle
+    auto bundle_stream = std::make_shared<minifi::io::BufferStream>();
+    bundle_stream->write(reinterpret_cast<const uint8_t*>(bundles[0].data()), bundles[0].length());
+    auto archive_provider = core::ClassLoader::getDefaultClassLoader().instantiate<minifi::io::ArchiveStreamProvider>(
+        "ArchiveStreamProvider", "ArchiveStreamProvider");
+    assert(archive_provider);
+    auto decompressor = archive_provider->createReadStream(bundle_stream);
+    assert(decompressor);
+    std::map<std::string, std::string> archive_content;  // entry filename -> entry bytes
+    while (auto info = decompressor->nextEntry()) {
+      std::string file_content;
+      file_content.resize(info->size);
+      assert(decompressor->read(reinterpret_cast<uint8_t *>(file_content.data()), file_content.length()) ==  // the read sits inside assert(); this file #undefs NDEBUG so it is always evaluated
+              file_content.length());
+      archive_content[info->filename] = std::move(file_content);
+    }
+    assert(archive_content["minifi.properties"] == properties_file);
+    assert(archive_content["config.yml"] == flow_config_file);
+    auto log_gz = archive_content["minifi.log.gz"];
+    auto log_stream = std::make_shared<minifi::io::BufferStream>();
+    {
+      minifi::io::ZlibDecompressStream log_decompressor(gsl::make_not_null(log_stream.get()));  // scoped so it is destroyed before log_stream is read
+      log_decompressor.write(reinterpret_cast<const uint8_t*>(log_gz.data()), log_gz.length());
+    }
+    std::string log_text;
+    log_text.resize(log_stream->size());
+    log_stream->read(reinterpret_cast<uint8_t*>(log_text.data()), log_text.length());
+    assert(log_text.find("Tis but a scratch") != std::string::npos);  // marker logged further down, before harness.run
+    return true;
+  });
+
+  harness.getConfiguration()->setHome(home_dir.string());
+  harness.getConfiguration()->loadConfigureFile("conf/minifi.properties");
+  harness.setUrl("http://localhost:0/heartbeat", &heartbeat_handler);  // port 0 in the URL; the port actually used is read back with getWebPort() below
+  harness.setUrl("http://localhost:0/acknowledge", &ack_handler);
+  harness.setUrl("http://localhost:0/debug_bundle", &bundle_handler);
+  harness.setC2Url("/heartbeat", "/acknowledge");
+
+  heartbeat_handler.setC2RestResponse("http://localhost:" + harness.getWebPort() + "/debug_bundle");  // transfer target the agent is asked to upload to
+
+  logging::LoggerFactory<C2HeartbeatHandler>::getLogger()->log_error("Tis but a scratch");  // marker the predicate searches for in the bundled log
+
+  harness.run((home_dir / "conf/config.yml").string());
+}
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp b/extensions/http-curl/tests/C2RequestClassTest.cpp
index ea0ca04..e49fce6 100644
--- a/extensions/http-curl/tests/C2RequestClassTest.cpp
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -27,31 +27,6 @@
 #include "CivetStream.h"
 #include "StreamPipe.h"
 
-class C2AcknowledgeHandler : public ServerAwareHandler {
- public:
-  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
-    std::string req = readPayload(conn);
-    rapidjson::Document root;
-    root.Parse(req.data(), req.size());
-    if (root.IsObject() && root.HasMember("operationId")) {
-      std::lock_guard<std::mutex> guard(mtx_);
-      acknowledged_operations_.insert(root["operationId"].GetString());
-    }
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
-    return true;
-  }
-
-  bool isAcknowledged(const std::string& operation_id) const {
-    std::lock_guard<std::mutex> guard(mtx_);
-    return acknowledged_operations_.count(operation_id) > 0;
-  }
-
- private:
-  mutable std::mutex mtx_;
-  std::set<std::string> acknowledged_operations_;
-};
-
 class C2HeartbeatHandler : public ServerAwareHandler {
  public:
   explicit C2HeartbeatHandler(std::string response) : response_(std::move(response)) {}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 02ca8ec..800cd37 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -96,3 +96,4 @@ add_test(NAME VerifyInvokeHTTPPostTest COMMAND VerifyInvokeHTTPPostTest "${TEST_
 add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
 add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
+add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 55d1651..9cfff34 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <set>
 
 #include "civetweb.h"
 #include "CivetServer.h"
@@ -649,3 +650,28 @@ class RetryHttpGetResponder : public ServerAwareHandler {
     return true;
   }
 };
+
+class C2AcknowledgeHandler : public ServerAwareHandler {  // test endpoint: records the operationId of every POSTed acknowledgement
+ public:
+  bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override {
+    std::string req = readPayload(conn);
+    rapidjson::Document root;
+    root.Parse(req.data(), req.size());
+    if (root.IsObject() && root.HasMember("operationId")) {  // bodies that are not a JSON object, or lack operationId, are not recorded
+      std::lock_guard<std::mutex> guard(mtx_);
+      acknowledged_operations_.insert(root["operationId"].GetString());  // NOTE(review): the value is not checked to be a string before GetString() - confirm senders always send one
+    }
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "  // 200 with an empty body is sent whether or not an id was recorded
+                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  bool isAcknowledged(const std::string& operation_id) const {  // true once an acknowledgement carrying operation_id has been recorded
+    std::lock_guard<std::mutex> guard(mtx_);
+    return acknowledged_operations_.count(operation_id) > 0;
+  }
+
+ private:
+  mutable std::mutex mtx_;  // mutable so the const isAcknowledged() can lock it; guards acknowledged_operations_
+  std::set<std::string> acknowledged_operations_;
+};
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 48d8d87..d74e600 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -122,6 +122,7 @@ void HTTPIntegrationBase::setC2Url(const std::string &heartbeat_path, const std:
 
 class VerifyC2Base : public HTTPIntegrationBase {
  public:
+  using HTTPIntegrationBase::HTTPIntegrationBase;
   void testSetup() override {
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<LogTestController>();
diff --git a/extensions/libarchive/WriteArchiveStream.cpp b/extensions/libarchive/WriteArchiveStream.cpp
index af375bf..4c60233 100644
--- a/extensions/libarchive/WriteArchiveStream.cpp
+++ b/extensions/libarchive/WriteArchiveStream.cpp
@@ -126,6 +126,24 @@ size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
   return result;
 }
 
+bool WriteArchiveStreamImpl::finish() {
+  if (!arch_) {  // no archive to close: returns false, which is also the result of a repeated call since arch_ is reset below
+    return false;
+  }
+  arch_entry_.reset();
+  // closing the archive is needed to complete the archive
+  bool success = archive_write_close(arch_.get()) == ARCHIVE_OK;
+  if (!success) {  // a failed close is logged and reported through the return value
+    logger_->log_error("Archive write close error %s", archive_error_string(arch_.get()));
+  }
+  arch_.reset();  // released whether or not the close succeeded
+  return success;
+}
+
+WriteArchiveStreamImpl::~WriteArchiveStreamImpl() {
+  finish();  // the result is discarded here; finish() returns false right away when arch_ is already reset
+}
+
 }  // namespace org::apache::nifi::minifi::io
 
 
diff --git a/extensions/libarchive/WriteArchiveStream.h b/extensions/libarchive/WriteArchiveStream.h
index 12e175a..5316100 100644
--- a/extensions/libarchive/WriteArchiveStream.h
+++ b/extensions/libarchive/WriteArchiveStream.h
@@ -68,6 +68,13 @@ class WriteArchiveStreamImpl: public WriteArchiveStream {
 
   size_t write(const uint8_t* data, size_t len) override;
 
+  // TODO(adebreceni):
+  //    Stream::close does not make it possible to signal failure (short of throwing)
+  //    investigate if it is worth making Stream::close capable of this
+  bool finish() override;
+
+  ~WriteArchiveStreamImpl() override;
+
  private:
   static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) {
     auto* const output = static_cast<OutputStream*>(context);
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index d3b039d..bbaa81f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -189,6 +189,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
 
   std::vector<BackTrace> getTraces() override;
 
+  std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() override;
+
  private:
   /**
    * Loads the flow as specified in the flow config file or if not present
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 5d0b6b5..f8d73e6 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -153,6 +153,10 @@ class C2Agent : public state::UpdateController {
    */
   bool update_property(const std::string &property_name, const std::string &property_value,  bool persist);
 
+  void handle_transfer(const C2ContentResponse &resp);
+
+  C2Payload bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files);
+
   /**
    * Creates configuration options C2 payload for response
    */
@@ -164,6 +168,10 @@ class C2Agent : public state::UpdateController {
 
   bool handleConfigurationUpdate(const C2ContentResponse &resp);
 
+  std::optional<std::string> resolveFlowUrl(const std::string& url) const;
+
+  std::optional<std::string> resolveUrl(const std::string& url) const;
+
  protected:
   std::timed_mutex metrics_mutex_;
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index 8f63b2c..c1290b1 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -28,6 +28,7 @@
 #include "../core/state/Value.h"
 #include "core/state/UpdateController.h"
 #include "utils/Enum.h"
+#include "io/InputStream.h"
 
 namespace org {
 namespace apache {
@@ -128,14 +129,17 @@ class C2Payload : public state::Update {
   ~C2Payload() override = default;
 
   void setIdentifier(std::string ident) { ident_ = std::move(ident); }
+  [[nodiscard]]
   std::string getIdentifier() const { return ident_; }
 
   void setLabel(std::string label) { label_ = std::move(label); }
+  [[nodiscard]]
   std::string getLabel() const { return label_; }
 
   /**
    * Gets the operation for this payload. May be nested or a single operation.
    */
+  [[nodiscard]]
   Operation getOperation() const noexcept { return op_; }
 
   /**
@@ -143,6 +147,7 @@ class C2Payload : public state::Update {
    */
   bool validate() override { return true; }
 
+  [[nodiscard]]
   const std::vector<C2ContentResponse> &getContent() const noexcept { return content_; }
 
   /**
@@ -153,6 +158,7 @@ class C2Payload : public state::Update {
   /**
    * Determines if this object contains raw data.
    */
+  [[nodiscard]]
   bool isRaw() const noexcept { return raw_; }
 
   /**
@@ -165,6 +171,7 @@ class C2Payload : public state::Update {
   /**
    * Returns raw data.
    */
+  [[nodiscard]]
   std::vector<char> getRawData() const { return raw_data_; }
 
   /**
@@ -173,12 +180,15 @@ class C2Payload : public state::Update {
    */
   void addPayload(C2Payload &&payload);
 
+  [[nodiscard]]
   bool isCollapsible() const noexcept { return is_collapsible_; }
   void setCollapsible(bool is_collapsible) noexcept { is_collapsible_ = is_collapsible; }
 
+  [[nodiscard]]
   bool isContainer() const noexcept { return is_container_; }
   void setContainer(bool is_container) noexcept { is_container_ = is_container; }
 
+  [[nodiscard]]
   const std::vector<C2Payload> &getNestedPayloads() const & noexcept { return payloads_; }
 
   std::vector<C2Payload>&& getNestedPayloads() && noexcept {return std::move(payloads_);}
diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h
index bbb7c24..2e8965a 100644
--- a/libminifi/include/core/logging/internal/LogCompressorSink.h
+++ b/libminifi/include/core/logging/internal/LogCompressorSink.h
@@ -63,7 +63,9 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
       compress(true);
     }
     LogBuffer compressed;
-    compressed_logs_.tryDequeue(compressed, time);
+    if (!compressed_logs_.tryDequeue(compressed, time) && flush) {
+      return createEmptyArchive();
+    }
     return std::move(compressed.buffer_);
   }
 
@@ -73,6 +75,8 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
     NothingToCompress
   };
 
+  std::unique_ptr<io::InputStream> createEmptyArchive();
+
   CompressionResult compress(bool force_rotation = false);
   void run();
 
@@ -81,6 +85,8 @@ class LogCompressorSink : public spdlog::sinks::base_sink<spdlog::details::null_
 
   utils::StagingQueue<LogBuffer> cached_logs_;
   utils::StagingQueue<ActiveCompressor, ActiveCompressor::Allocator> compressed_logs_;
+
+  std::shared_ptr<logging::Logger> compressor_logger_;
 };
 
 }  // namespace internal
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 3bfe9a5..a493e60 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 #include <string>
+#include <map>
 #include "utils/ThreadPool.h"
 #include "utils/BackTrace.h"
 
@@ -229,6 +230,7 @@ class StateMonitor : public StateController {
    */
   virtual std::vector<BackTrace> getTraces() = 0;
 
+  virtual std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() = 0;
 
  protected:
   std::atomic<bool> controller_running_;
diff --git a/libminifi/include/io/ArchiveStream.h b/libminifi/include/io/ArchiveStream.h
index def8838..2f04f3e 100644
--- a/libminifi/include/io/ArchiveStream.h
+++ b/libminifi/include/io/ArchiveStream.h
@@ -36,6 +36,7 @@ struct EntryInfo {
 class WriteArchiveStream : public OutputStream {
  public:
   virtual bool newEntry(const EntryInfo& info) = 0;
+  virtual bool finish() = 0;
 };
 
 class ReadArchiveStream : public InputStream {
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 6fb2463..96521c6 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -82,6 +82,10 @@ class BufferStream : public BaseStream {
     return buffer_.data();
   }
 
+  std::vector<uint8_t> moveBuffer() {
+    return std::exchange(buffer_, {});
+  }
+
   /**
    * Retrieve size of data stream
    * @return size of data stream
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index cd84de6..d8a51fe 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -116,6 +116,8 @@ class Properties {
 
   utils::ChecksumCalculator& getChecksumCalculator() { return checksum_calculator_; }
 
+  std::string getFilePath() const;
+
  protected:
   std::map<std::string, std::string> getProperties() const;
 
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 6cc1f8c..b87f819 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -51,6 +51,7 @@
 #include "utils/HTTPClient.h"
 #include "io/NetworkPrioritizer.h"
 #include "io/validation.h"
+#include "io/FileStream.h"
 
 namespace org {
 namespace apache {
@@ -506,6 +507,19 @@ std::vector<BackTrace> FlowController::getTraces() {
   return traces;
 }
 
+std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {  // collects the streams that make up a debug bundle, keyed by entry name
+  std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
+  if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) {  // entry is omitted when no log stream is returned
+    debug_info["minifi.log.gz"] = std::move(logs);
+  }
+  if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) {  // flow config is added only when a path is available
+    debug_info["config.yml"] = std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false);  // NOTE(review): args look like (path, offset, write_enable) - confirm
+  }
+  debug_info["minifi.properties"] = std::make_unique<io::FileStream>(configuration_->getFilePath(), 0, false);  // added unconditionally, unlike the two entries above
+
+  return debug_info;
+}
+
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index f802f7b..60a78e8 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -42,6 +42,8 @@
 #include "utils/Environment.h"
 #include "utils/Monitors.h"
 #include "utils/StringUtils.h"
+#include "io/ArchiveStream.h"
+#include "io/StreamPipe.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -317,6 +319,18 @@ void C2Agent::extractPayload(const C2Payload &resp) {
   }
 }
 
+namespace {
+
+struct C2TransferError : public std::runtime_error {  // failure while handling a C2 TRANSFER operation
+  using runtime_error::runtime_error;  // inherit the base class constructors
+};
+
+struct C2DebugBundleError : public C2TransferError {  // failure while building or uploading the "debug" bundle
+  using C2TransferError::C2TransferError;
+};
+
+}  // namespace
+
 void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
   switch (resp.op.value()) {
     case Operation::CLEAR:
@@ -409,6 +423,18 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
         logger_->log_warn("Resume functionality is not supported!");
       }
       break;
+    case Operation::TRANSFER: {
+      try {
+        handle_transfer(resp);
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
+        enqueue_c2_response(std::move(response));
+      } catch (const std::runtime_error& error) {
+        C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true);
+        response.setRawData(error.what());
+        enqueue_c2_response(std::move(response));
+      }
+      break;
+    }
     default:
       break;
       // do nothing
@@ -586,6 +612,59 @@ bool C2Agent::update_property(const std::string &property_name, const std::strin
   return configuration_->persistProperties();
 }
 
+C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) {  // archives the given streams and wraps the result in a TRANSFER payload; throws C2DebugBundleError on failure
+  C2Payload payload(Operation::TRANSFER, false);
+  auto stream_provider = core::ClassLoader::getDefaultClassLoader().instantiate<io::ArchiveStreamProvider>(
+      "ArchiveStreamProvider", "ArchiveStreamProvider");
+  if (!stream_provider) {
+    throw C2DebugBundleError("Couldn't instantiate archiver provider");
+  }
+  auto bundle = std::make_shared<io::BufferStream>();  // in-memory buffer handed to the archiver; its bytes become the payload data below
+  auto archiver = stream_provider->createWriteStream(9, "gzip", bundle, logger_);  // NOTE(review): 9 is presumably the compression level - confirm
+  if (!archiver) {
+    throw C2DebugBundleError("Couldn't instantiate archiver");
+  }
+  for (auto&[filename, stream] : files) {
+    size_t file_size = stream->size();  // entry size is declared up front, before any bytes are read
+    if (!archiver->newEntry({filename, file_size})) {
+      throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'");
+    }
+    if (gsl::narrow<int64_t>(file_size) != internal::pipe(stream.get(), archiver.get())) {  // pipe() must report exactly the declared size
+      // we have touched the input streams, they cannot be reused
+      throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle");
+    }
+  }
+  if (!archiver->finish()) {  // a false return from finish() aborts the bundle
+    throw C2DebugBundleError("Failed to complete debug bundle archive");
+  }
+  C2Payload file(Operation::TRANSFER, true);  // nested payload that carries the archive itself
+  file.setLabel("debug.tar.gz");
+  file.setRawData(bundle->moveBuffer());  // moveBuffer() takes the bytes out of the BufferStream
+  payload.addPayload(std::move(file));
+  return payload;
+}
+
+void C2Agent::handle_transfer(const C2ContentResponse &resp) {  // handles a TRANSFER request; every failure is reported by throwing
+  if (resp.name != "debug") {  // "debug" is the only operand accepted here
+    throw C2TransferError("Unknown operand '" + resp.name + "'");
+  }
+  auto target_it = resp.operation_arguments.find("target");  // "target" supplies the url the bundle is sent to
+  if (target_it == resp.operation_arguments.end()) {
+    throw C2DebugBundleError("Missing argument for debug operation: 'target'");
+  }
+  std::optional<std::string> url = resolveUrl(target_it->second.to_string());  // a target starting with '/' is resolved against the C2 REST URL
+  if (!url) {
+    throw C2DebugBundleError("Invalid url");
+  }
+  std::map<std::string, std::unique_ptr<io::InputStream>> files = update_sink_->getDebugInfo();
+
+  auto bundle = bundleDebugInfo(files);  // may throw C2DebugBundleError
+  C2Payload &&response = protocol_.load()->consumePayload(url.value(), bundle, TRANSMIT, false);  // NOTE(review): protocol_.load() is not null-checked here, unlike in fetchFlow - confirm it cannot be null
+  if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {  // READ_ERROR is the only state treated as an upload failure
+    throw C2DebugBundleError("Error while uploading");
+  }
+}
+
 void C2Agent::restart_agent() {
   std::string cwd = utils::Environment::getCurrentWorkingDirectory();
   if (cwd.empty()) {
@@ -654,6 +733,46 @@ utils::TaskRescheduleInfo C2Agent::consume() {
   return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
 }
 
+std::optional<std::string> C2Agent::resolveFlowUrl(const std::string& url) const {  // turns a flow reference into a url; nullopt only when the C2 REST URL fails to parse
+  if (utils::StringUtils::startsWith(url, "http")) {  // anything starting with "http" is returned untouched
+    return url;
+  }
+  std::string base;
+  if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {  // first choice: the configured flow base url
+    base = utils::StringUtils::trim(base);
+    if (!utils::StringUtils::endsWith(base, "/")) {  // append a separator if the base lacks one
+      base += "/";
+    }
+    base += url;
+    return base;
+  } else if (configuration_->get("nifi.c2.rest.url", "c2.rest.url", base)) {  // fallback: derive the url from the C2 REST URL
+    utils::URL base_url{utils::StringUtils::trim(base)};
+    if (base_url.isValid()) {
+      return base_url.hostPort() + "/c2/api/" + url;
+    }
+    logger_->log_error("Could not parse C2 REST URL '%s'", base);
+    return std::nullopt;
+  }
+  return url;  // neither property is set: pass the input through unchanged
+}
+
+std::optional<std::string> C2Agent::resolveUrl(const std::string& url) const {  // resolves a url with a leading '/' against hostPort() of the C2 REST URL
+  if (!utils::StringUtils::startsWith(url, "/")) {  // anything without a leading '/' is returned as-is
+    return url;
+  }
+  std::string base;
+  if (!configuration_->get("nifi.c2.rest.url", "c2.rest.url", base)) {  // without the C2 REST URL there is nothing to resolve against
+    logger_->log_error("Missing C2 REST URL");
+    return std::nullopt;
+  }
+  utils::URL base_url{utils::StringUtils::trim(base)};
+  if (base_url.isValid()) {
+    return base_url.hostPort() + url;  // url already begins with '/', so no separator is inserted
+  }
+  logger_->log_error("Could not parse C2 REST URL '%s'", base);
+  return std::nullopt;
+}
+
 std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
   if (!utils::StringUtils::startsWith(uri, "http") || protocol_.load() == nullptr) {
     // try to open the file
@@ -667,30 +786,13 @@ std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const {
     return {};
   }
 
-  std::string resolved_url = uri;
-  if (!utils::StringUtils::startsWith(uri, "http")) {
-    std::stringstream adjusted_url;
-    std::string base;
-    if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) {
-      base = utils::StringUtils::trim(base);
-      adjusted_url << base;
-      if (!utils::StringUtils::endsWith(base, "/")) {
-        adjusted_url << "/";
-      }
-      adjusted_url << uri;
-      resolved_url = adjusted_url.str();
-    } else if (configuration_->get("nifi.c2.rest.url", "c2.rest.url", base)) {
-      utils::URL base_url{utils::StringUtils::trim(base)};
-      if (!base_url.isValid()) {
-        logger_->log_error("Could not parse C2 REST URL '%s'", base);
-        return std::nullopt;
-      }
-      resolved_url = base_url.hostPort() + "/c2/api/" + uri;
-    }
+  std::optional<std::string> resolved_url = resolveFlowUrl(uri);
+  if (!resolved_url) {
+    return std::nullopt;
   }
 
   C2Payload payload(Operation::TRANSFER, true);
-  C2Payload &&response = protocol_.load()->consumePayload(resolved_url, payload, RECEIVE, false);
+  C2Payload &&response = protocol_.load()->consumePayload(resolved_url.value(), payload, RECEIVE, false);
 
   auto raw_data = response.getRawData();
   return std::string(raw_data.data(), raw_data.size());
diff --git a/libminifi/src/core/logging/internal/LogCompressorSink.cpp b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
index 112e6ae..9e6dcab 100644
--- a/libminifi/src/core/logging/internal/LogCompressorSink.cpp
+++ b/libminifi/src/core/logging/internal/LogCompressorSink.cpp
@@ -29,7 +29,8 @@ namespace internal {
 
 LogCompressorSink::LogCompressorSink(LogQueueSize cache_size, LogQueueSize compressed_size, std::shared_ptr<logging::Logger> logger)
   : cached_logs_(cache_size.max_total_size, cache_size.max_segment_size),
-    compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{std::move(logger)}) {
+    compressed_logs_(compressed_size.max_total_size, compressed_size.max_segment_size, ActiveCompressor::Allocator{logger}),  // copied rather than moved: logger is used again on the next line
+    compressor_logger_(logger) {  // kept so createEmptyArchive() can construct its own Allocator
   compression_thread_ = std::thread{&LogCompressorSink::run, this};
 }
 
@@ -39,8 +40,10 @@ LogCompressorSink::~LogCompressorSink() {
 }
 
 void LogCompressorSink::sink_it_(const spdlog::details::log_msg &msg) {
+  spdlog::memory_buf_t formatted;
+  base_sink<spdlog::details::null_mutex>::formatter_->format(msg, formatted);  // run the sink's formatter over the message first
   cached_logs_.modify([&] (LogBuffer& active) {
-    active.buffer_->write(reinterpret_cast<const uint8_t*>(msg.payload.data()), msg.payload.size());
+    active.buffer_->write(reinterpret_cast<const uint8_t*>(formatted.data()), formatted.size());  // the formatted bytes are cached instead of the raw msg.payload
   });
 }
 
@@ -72,6 +75,11 @@ LogCompressorSink::CompressionResult LogCompressorSink::compress(bool force_rota
 
 void LogCompressorSink::flush_() {}
 
+std::unique_ptr<io::InputStream> LogCompressorSink::createEmptyArchive() {  // returns the stream of a compressor that was committed without any input written to it
+  auto compressor = ActiveCompressor::Allocator(compressor_logger_)(0);  // NOTE(review): the meaning of the 0 argument is not visible here - confirm
+  return std::move(compressor.commit().buffer_);  // the buffer_ of commit()'s result is handed to the caller
+}
+
 }  // namespace internal
 }  // namespace logging
 }  // namespace core
diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp
index 02b2378..8df8202 100644
--- a/libminifi/src/properties/Properties.cpp
+++ b/libminifi/src/properties/Properties.cpp
@@ -97,6 +97,11 @@ void Properties::loadConfigureFile(const char *fileName) {
   dirty_ = false;
 }
 
+std::string Properties::getFilePath() const {  // returns a copy of properties_file_
+  std::lock_guard<std::mutex> lock(mutex_);  // the same mutex_ that persistProperties() locks
+  return properties_file_;
+}
+
 bool Properties::persistProperties() {
   std::lock_guard<std::mutex> lock(mutex_);
   if (!dirty_) {
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index c64e779..2626cd9 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -105,6 +105,9 @@ size_t ByteOutputCallback::readFully(char *buffer, size_t size) {
 }
 
 size_t ByteOutputCallback::read_current_str(char *buffer, size_t size) {
+  if (size == 0) {
+    return 0;
+  }
   size_t amount_to_read = size;
   size_t curr_buf_pos = 0;
   /**
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index e4cf792..e255cea 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -45,7 +45,7 @@ class IntegrationBase {
 
   virtual ~IntegrationBase() = default;
 
-  virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& bootstrap_file = {});
+  virtual void run(const std::optional<std::string>& test_file_location = {}, const std::optional<std::string>& home_path = {});
 
   void setKeyDir(const std::string& key_dir) {
     this->key_dir = key_dir;
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index de14f60..e0bf4e2 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -143,6 +143,10 @@ class TestUpdateSink : public minifi::state::StateMonitor {
     return 0;
   }
 
+  std::map<std::string, std::unique_ptr<minifi::io::InputStream>> getDebugInfo() override {  // test stub: reports no debug files
+    return {};
+  }
+
   /**
    * Clear connection for the agent.
    */
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index a2ff302..df22a4a 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -141,7 +141,7 @@ TEST_CASE("Test Compression", "[ttl7]") {
   std::shared_ptr<InputStream> compressed_log{logging::LoggerConfiguration::getCompressedLog(true)};
   REQUIRE(compressed_log);
   auto logs = decompress(compressed_log);
-  REQUIRE(logs == "Hi there");
+  REQUIRE(logs.find("Hi there") != std::string::npos);
 }
 
 class LoggerTestAccessor {