You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/06/07 12:34:27 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1845 Add c2 request compression option

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0f5c08b626f6b1282839e6d2810bb9ab35443f0c
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Tue Jun 7 13:14:33 2022 +0200

    MINIFICPP-1845 Add c2 request compression option
    
    Closes #1344
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 C2.md                                           |   3 +
 CONFIGURE.md                                    |   5 ++
 conf/minifi.properties                          |   5 ++
 extensions/http-curl/protocols/RESTSender.cpp   |  49 +++++++++---
 extensions/http-curl/protocols/RESTSender.h     |  19 +++--
 extensions/http-curl/tests/C2CompressTest.cpp   | 100 ++++++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt       |   1 +
 extensions/http-curl/tests/HTTPHandlers.h       |  11 ---
 extensions/http-curl/tests/ServerAwareHandler.h |  16 +++-
 libminifi/include/properties/Configuration.h    |   1 +
 libminifi/include/utils/ByteArrayCallback.h     |   5 ++
 libminifi/src/Configuration.cpp                 |   1 +
 12 files changed, 182 insertions(+), 34 deletions(-)

diff --git a/C2.md b/C2.md
index 8486cc5e3..4dd2a465a 100644
--- a/C2.md
+++ b/C2.md
@@ -94,6 +94,9 @@ be requested via C2 DESCRIBE manifest command.
 	# configure SSL Context service for REST Protocol
 	#nifi.c2.rest.ssl.context.service
 
+	# specify encoding strategy for c2 requests (gzip, none)
+	#nifi.c2.rest.request.encoding=none
+
 
 ### Metrics
 
diff --git a/CONFIGURE.md b/CONFIGURE.md
index 8d9346c7b..1d7f25f9e 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -288,6 +288,11 @@ It does require more configuration.  uid.minifi.device.segment.bits is used to s
 
 Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned to each MiNiFi instance.
 
+### Asset directory
+
+It is possible to make agents download an asset (triggered through the c2 protocol). The target directory can be specified
+using the `nifi.asset.directory` agent property, which defaults to `${MINIFI_HOME}/asset`.
+
 ### Controller Services
  If you need to reference a controller service in your config.yml file, use the following template. In the example, below, ControllerServiceClass is the name of the class defining the controller Service. ControllerService1
  is linked to ControllerService2, and requires the latter to be started for ControllerService1 to start.
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 752b11e19..8ba1d3975 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -102,11 +102,16 @@ nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileM
 ## HeartbeatLogger logs the heartbeats on TRACE for debugging.
 #nifi.c2.agent.heartbeat.reporter.classes=HeartbeatLogger
 
+# specify the encoding strategy for c2 requests (gzip or none); defaults to none when unset
+#nifi.c2.rest.request.encoding=none
+
 ## enable the controller socket provider on port 9998
 ## off by default. C2 must be enabled to support these
 #controller.socket.host=localhost
 #controller.socket.port=9998
 
+## specify the directory where assets downloaded through c2 are stored
+#nifi.asset.directory=${MINIFI_HOME}/asset
 
 #JNI properties
 nifi.framework.dir=${MINIFI_HOME}/minifi-jni/lib
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 7f593684d..6eee45716 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -27,12 +27,9 @@
 #include "utils/file/FileUtils.h"
 #include "core/Resource.h"
 #include "properties/Configuration.h"
+#include "io/ZlibStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 RESTSender::RESTSender(const std::string &name, const utils::Identifier &uuid)
     : C2Protocol(name, uuid) {
@@ -52,6 +49,18 @@ void RESTSender::initialize(core::controller::ControllerServiceProvider* control
         ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
       }
     }
+    if (auto req_encoding_str = configure->get(Configuration::nifi_c2_rest_request_encoding)) {
+      if (auto req_encoding = RequestEncoding::parse(req_encoding_str->c_str(), RequestEncoding{})) {
+        logger_->log_debug("Using request encoding '%s'", req_encoding.toString());
+        req_encoding_ = req_encoding;
+      } else {
+        logger_->log_error("Invalid request encoding '%s'", req_encoding_str.value());
+        req_encoding_ = RequestEncoding::None;
+      }
+    } else {
+      logger_->log_debug("Request encoding is not specified, using default '%s'", toString(RequestEncoding::None));
+      req_encoding_ = RequestEncoding::None;
+    }
   }
   logger_->log_debug("Submitting to %s", rest_uri_);
 }
@@ -124,10 +133,30 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
     } else {
       auto data_input = std::make_unique<utils::ByteInputCallback>();
       auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
-      data_input->write(data.value_or(""));
+      if (data && req_encoding_ == RequestEncoding::Gzip) {
+        io::BufferStream compressed_payload;
+        bool compression_success = [&] {
+          io::ZlibCompressStream compressor(gsl::make_not_null(&compressed_payload), io::ZlibCompressionFormat::GZIP, Z_BEST_COMPRESSION);
+          auto ret = compressor.write(gsl::span<const char>(data.value()).as_span<const std::byte>());
+          if (ret != data->length()) {
+            return false;
+          }
+          compressor.close();
+          return compressor.isFinished();
+        }();
+        if (compression_success) {
+          data_input->setBuffer(compressed_payload.moveBuffer());
+          client.appendHeader("Content-Encoding", "gzip");
+        } else {
+          logger_->log_error("Failed to compress request body, falling back to no compression");
+          data_input->write(data.value());
+        }
+      } else {
+        data_input->write(data.value_or(""));
+      }
       data_cb->ptr = data_input.get();
       client.setUploadCallback(data_cb.get());
-      client.setPostSize(data ? data->size() : 0);
+      client.setPostSize(data_input->getBufferSize());
       inputs.push_back(std::move(data_input));
       callbacks.push_back(std::move(data_cb));
     }
@@ -174,8 +203,4 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
 
 REGISTER_RESOURCE(RESTSender, "Encapsulates the restful protocol that is built upon C2Protocol.");
 
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index c7a8c86c0..31e038315 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -24,12 +24,9 @@
 #include "c2/protocols/RESTProtocol.h"
 #include "controllers/SSLContextService.h"
 #include "../client/HTTPClient.h"
+#include "utils/Enum.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 /**
  * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
@@ -40,6 +37,11 @@ namespace c2 {
  *
  */
 class RESTSender : public RESTProtocol, public C2Protocol {
+  SMART_ENUM(RequestEncoding,
+    (None, "none"),
+    (Gzip, "gzip")
+  )
+
  public:
   explicit RESTSender(const std::string &name, const utils::Identifier &uuid = utils::Identifier());
 
@@ -66,13 +68,10 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   std::string rest_uri_;
   std::string ack_uri_;
+  RequestEncoding req_encoding_;
 
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RESTSender>::getLogger();
 };
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/extensions/http-curl/tests/C2CompressTest.cpp b/extensions/http-curl/tests/C2CompressTest.cpp
new file mode 100644
index 000000000..673cdd55e
--- /dev/null
+++ b/extensions/http-curl/tests/C2CompressTest.cpp
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "TestBase.h"
+#include "Catch.h"
+
+#include "c2/C2Agent.h"
+#include "c2/HeartbeatLogger.h"
+#include "protocols/RESTProtocol.h"
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "range/v3/action/sort.hpp"
+#include "range/v3/action/unique.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/filter.hpp"
+#include "range/v3/view/split.hpp"
+#include "range/v3/view/transform.hpp"
+#include "utils/IntegrationTestUtils.h"
+#include "utils/StringUtils.h"
+#include "properties/Configuration.h"
+#include "io/ZlibStream.h"
+
+class CompressedHeartbeatHandler : public HeartbeatHandler {  // test handler: decompresses gzip-encoded request bodies and records that one arrived
+ protected:
+  std::string readPayload(struct mg_connection* conn) override {
+    auto payload = HeartbeatHandler::readPayload(conn);
+    const char* encoding = mg_get_header(conn, "content-encoding");
+    if (!encoding || std::string_view(encoding).find("gzip") == std::string_view::npos) {
+      return payload;  // no content-encoding header, or it does not mention gzip: return the body untouched
+    }
+    received_compressed_ = true;  // this flag is what the verifier lambda in main() polls
+    minifi::io::BufferStream output;
+    {
+      minifi::io::ZlibDecompressStream decompressor(gsl::make_not_null(&output));  // presumably writes the decompressed bytes into `output` - confirm in ZlibStream.h
+      auto ret = decompressor.write(gsl::span<const char>(payload).as_span<const std::byte>());
+      assert(ret == payload.size());  // stays active in every build type because NDEBUG is #undef'd at the top of this file
+    }  // decompressor goes out of scope here, before `output` is read
+    auto str_span = output.getBuffer().as_span<const char>();
+    return {str_span.data(), str_span.size()};  // builds the returned std::string from the bytes held by `output`
+  }
+
+ public:
+  using HeartbeatHandler::HeartbeatHandler;
+
+  std::atomic_bool received_compressed_{false};  // becomes true once readPayload() has seen a gzip content-encoding header
+};
+
+class VerifyCompressedHeartbeat : public VerifyC2Base {  // test harness: configures gzip request encoding and asserts on a caller-supplied predicate
+ public:
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(10), verify_));  // NOTE(review): verify_ is an empty std::function unless setVerifier() was called first
+  }
+
+  void configureC2() override {
+    VerifyC2Base::configureC2();
+    configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_agent_heartbeat_period, "100");  // presumably milliseconds - confirm against the property's documentation
+    configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_rest_request_encoding, "gzip");  // the setting under test
+  }
+
+  void setVerifier(std::function<bool()> verify) {
+    verify_ = std::move(verify);
+  }
+
+ private:
+  std::function<bool()> verify_;  // predicate handed to verifyEventHappenedInPollTime() in runAssertions()
+};
+
+int main() {  // wires a CompressedHeartbeatHandler to the harness and runs it
+  VerifyCompressedHeartbeat harness;
+  CompressedHeartbeatHandler responder(harness.getConfiguration());
+  harness.setVerifier([&] () -> bool {  // captures `responder` by reference; it is declared in this scope and outlives harness.run()
+    return responder.received_compressed_;
+  });
+  harness.setUrl("https://localhost:0/heartbeat", &responder);  // NOTE(review): port 0 presumably lets the test server pick a free port - confirm in HTTPIntegrationBase
+  harness.run();
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index d8849c8db..cd4ce560d 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -103,3 +103,4 @@ add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
 add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateAssetTest COMMAND C2UpdateAssetTest)
+add_test(NAME C2CompressTest COMMAND C2CompressTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index eeeea2dde..07662569b 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -375,17 +375,6 @@ class DeleteTransactionResponder : public ServerAwareHandler {
   std::string response_code;
 };
 
-std::string readPayload(struct mg_connection *conn) {
-  std::string response;
-  int readBytes;
-
-  std::array<char, 1024> buffer;
-  while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {
-    response.append(buffer.data(), readBytes);
-  }
-  return response;
-}
-
 class HeartbeatHandler : public ServerAwareHandler {
  public:
   explicit HeartbeatHandler(std::shared_ptr<minifi::Configure> configuration) : configuration_(std::move(configuration)) {}
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h b/extensions/http-curl/tests/ServerAwareHandler.h
index 473109e71..14b263bfc 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -18,7 +18,10 @@
 
 #pragma once
 
-class ServerAwareHandler: public CivetHandler{
+#include <string>
+#include <array>
+
+class ServerAwareHandler: public CivetHandler {
  protected:
   void sleep_for(std::chrono::milliseconds time) {
     std::unique_lock<std::mutex> lock(mutex_);
@@ -29,6 +32,17 @@ class ServerAwareHandler: public CivetHandler{
     return !terminate_.load();
   }
 
+  virtual std::string readPayload(struct mg_connection* conn) {  // accumulates everything mg_read() yields from `conn` into one string; virtual so subclasses can override it
+    std::string response;
+    int readBytes;
+
+    std::array<char, 1024> buffer;  // data is pulled from the connection in chunks of at most 1024 bytes
+    while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {  // a zero or negative mg_read() result ends the loop
+      response.append(buffer.data(), readBytes);
+    }
+    return response;
+  }
+
  public:
   void stop() {
     terminate_ = true;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index aa0c05f3f..8f6636c7c 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -114,6 +114,7 @@ class Configuration : public Properties {
   static constexpr const char *nifi_c2_rest_url_ack = "nifi.c2.rest.url.ack";
   static constexpr const char *nifi_c2_rest_ssl_context_service = "nifi.c2.rest.ssl.context.service";
   static constexpr const char *nifi_c2_rest_heartbeat_minimize_updates = "nifi.c2.rest.heartbeat.minimize.updates";
+  static constexpr const char *nifi_c2_rest_request_encoding = "nifi.c2.rest.request.encoding";
   static constexpr const char *nifi_c2_mqtt_connector_service = "nifi.c2.mqtt.connector.service";
   static constexpr const char *nifi_c2_mqtt_heartbeat_topic = "nifi.c2.mqtt.heartbeat.topic";
   static constexpr const char *nifi_c2_mqtt_update_topic = "nifi.c2.mqtt.update.topic";
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index bbf6dbb8c..503589300 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -20,6 +20,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <utility>
 
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
@@ -52,6 +53,10 @@ class ByteInputCallback {
     vec = utils::span_to<std::vector>(gsl::make_span(content).as_span<std::byte>());
   }
 
+  void setBuffer(std::vector<std::byte> data) {  // replaces the stored buffer with `data`; taken by value so the caller can move it in
+    vec = std::move(data);
+  }
+
   virtual std::byte* getBuffer(size_t pos) {
     gsl_Expects(pos <= vec.size());
     return vec.data() + pos;
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 07970d51f..fdfc49cf5 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -92,6 +92,7 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_c2_rest_url},
   core::ConfigurationProperty{Configuration::nifi_c2_rest_url_ack},
   core::ConfigurationProperty{Configuration::nifi_c2_rest_ssl_context_service},
+  core::ConfigurationProperty{Configuration::nifi_c2_rest_request_encoding},
   core::ConfigurationProperty{Configuration::nifi_c2_rest_heartbeat_minimize_updates, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_c2_mqtt_connector_service},
   core::ConfigurationProperty{Configuration::nifi_c2_mqtt_heartbeat_topic},