You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:01 UTC

[nifi-minifi-cpp] 02/04: MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9cc12971c94d6d0958ecdd2da8a63e004aa01e8c
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:15:45 2022 +0200

    MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP
    
    Closes #1321
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |   1 +
 extensions/http-curl/client/HTTPClient.cpp         |  33 ++++
 extensions/http-curl/client/HTTPClient.h           |   4 +
 extensions/http-curl/processors/InvokeHTTP.cpp     | 211 +++++++++++----------
 extensions/http-curl/processors/InvokeHTTP.h       |  76 ++------
 .../http-curl/tests/unit/HTTPClientTests.cpp       |  18 ++
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |  92 ++++++++-
 7 files changed, 269 insertions(+), 166 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d97f8752d..88a074211 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -924,6 +924,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Follow Redirects|true||Follow HTTP redirects issued by remote server.|
 |HTTP Method|GET||HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.|
 |Include Date Header|true||Include an RFC-2616 Date header in the request.|
+|**Invalid HTTP Header Field Handling Strategy**|transform|transform<br/>fail<br/>drop|Indicates what should happen when an attribute's name is not a valid HTTP header field name.<br/>Options:<br/>transform - invalid characters are replaced<br/>fail - flow file is transferred to failure<br/>drop - drops invalid attributes from HTTP message|
 |invokehttp-proxy-password|||Password to set when authenticating against proxy|
 |invokehttp-proxy-username|||Username to set when authenticating against proxy|
 |Penalize on "No Retry"|false||Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship.|
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 16298aa24..eeec4194f 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -455,6 +455,39 @@ void HTTPClient::setFollowRedirects(bool follow) {
   curl_easy_setopt(http_session_, CURLOPT_FOLLOWLOCATION, follow);
 }
 
+bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
+  // RFC 822 3.1.2: a field-name consists of printable ASCII characters,
+  // i.e. decimal values 33 through 126, with the colon excluded.
+  const auto is_allowed = [](char character) {
+    return character >= 33 && character <= 126 && character != ':';
+  };
+  for (const char character : field_name) {
+    if (!is_allowed(character)) {
+      return false;
+    }
+  }
+  // An empty name contains nothing to reject, but it is still not a valid field name.
+  return !field_name.empty();
+}
+
+std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name) {
+  if (field_name.empty()) {
+    return "X-MiNiFi-Empty-Attribute-Name";  // fixed placeholder, since an empty name has no characters to replace
+  }
+
+  // RFC 822 3.1.2: a field-name consists of printable ASCII characters,
+  // i.e. decimal values 33 through 126, with the colon excluded.
+  const auto is_allowed = [](char character) {
+    return character >= 33 && character <= 126 && character != ':';
+  };
+  std::string sanitized_name;
+  for (const char character : field_name) {
+    // Each disallowed character becomes a single dash, so the length of the name is preserved.
+    sanitized_name += is_allowed(character) ? character : '-';
+  }
+  return sanitized_name;
+}
+
 REGISTER_INTERNAL_RESOURCE(HTTPClient);
 
 }  // namespace utils
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 05a24e144..7159895c2 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -48,6 +48,7 @@
 #else
 #include <regex.h>
 #endif
+#include <string_view>
 
 #include "utils/ByteArrayCallback.h"
 #include "controllers/SSLContextService.h"
@@ -238,6 +239,9 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
     }
   }
 
+  static bool isValidHttpHeaderField(std::string_view field_name);
+  static std::string replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name);
+
  private:
   static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
 
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 3a4516883..aea10f730 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -17,11 +17,7 @@
  */
 
 #include "InvokeHTTP.h"
-#ifdef WIN32
-#include <regex>
-#else
-#include <regex.h>
-#endif
+
 #include <memory>
 #include <cinttypes>
 #include <cstdint>
@@ -39,14 +35,10 @@
 #include "ResourceClaim.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
-const char *InvokeHTTP::ProcessorName = "InvokeHTTP";
 std::string InvokeHTTP::DefaultContentType = "application/octet-stream";
 
 core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
@@ -115,6 +107,16 @@ core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will
 core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
 
 core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false");
+
+core::Property InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy(
+    core::PropertyBuilder::createProperty("Invalid HTTP Header Field Handling Strategy")
+      ->withDescription("Indicates what should happen when an attribute's name is not a valid HTTP header field name. "
+        "Options: transform - invalid characters are replaced, fail - flow file is transferred to failure, drop - drops invalid attributes from HTTP message")
+      ->isRequired(true)
+      ->withDefaultValue<std::string>(toString(InvalidHTTPHeaderFieldHandlingOption::TRANSFORM))
+      ->withAllowableValues<std::string>(InvalidHTTPHeaderFieldHandlingOption::values())
+      ->build());
+
 const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
 const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
 const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body";
@@ -142,32 +144,29 @@ core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will
 
 void InvokeHTTP::initialize() {
   logger_->log_trace("Initializing InvokeHTTP");
-
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(Method);
-  properties.insert(URL);
-  properties.insert(ConnectTimeout);
-  properties.insert(ReadTimeout);
-  properties.insert(DateHeader);
-  properties.insert(AttributesToSend);
-  properties.insert(SSLContext);
-  properties.insert(ProxyHost);
-  properties.insert(ProxyPort);
-  properties.insert(ProxyUsername);
-  properties.insert(ProxyPassword);
-  properties.insert(UseChunkedEncoding);
-  properties.insert(ContentType);
-  properties.insert(SendBody);
-  properties.insert(SendMessageBody);
-  properties.insert(DisablePeerVerification);
-  properties.insert(AlwaysOutputResponse);
-  properties.insert(FollowRedirects);
-  properties.insert(PropPutOutputAttributes);
-  properties.insert(PenalizeOnNoRetry);
-
-  setSupportedProperties(properties);
-  // Set the supported relationships
+  setSupportedProperties({
+    Method,
+    URL,
+    ConnectTimeout,
+    ReadTimeout,
+    DateHeader,
+    AttributesToSend,
+    SSLContext,
+    ProxyHost,
+    ProxyPort,
+    ProxyUsername,
+    ProxyPassword,
+    UseChunkedEncoding,
+    ContentType,
+    SendBody,
+    SendMessageBody,
+    DisablePeerVerification,
+    AlwaysOutputResponse,
+    FollowRedirects,
+    PropPutOutputAttributes,
+    PenalizeOnNoRetry,
+    InvalidHTTPHeaderFieldHandlingStrategy
+  });
   setSupportedRelationships({Success, RelResponse, RelFailure, RelRetry, RelNoRetry});
 }
 
@@ -182,7 +181,6 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-
   if (auto connect_timeout = context->getProperty<core::TimePeriodValue>(ConnectTimeout)) {
     connect_timeout_ms_ =  connect_timeout->getMilliseconds();
   } else {
@@ -190,9 +188,9 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-  std::string contentTypeStr;
-  if (context->getProperty(ContentType.getName(), contentTypeStr)) {
-    content_type_ = contentTypeStr;
+  std::string content_type_str;
+  if (context->getProperty(ContentType.getName(), content_type_str)) {
+    content_type_ = content_type_str;
   }
 
   if (auto read_timeout = context->getProperty<core::TimePeriodValue>(ReadTimeout)) {
@@ -201,12 +199,12 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName(), ReadTimeout.getValue());
   }
 
-  std::string dateHeaderStr;
-  if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
+  std::string date_header_str;
+  if (!context->getProperty(DateHeader.getName(), date_header_str)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", DateHeader.getName(), DateHeader.getValue());
   }
 
-  date_header_include_ = utils::StringUtils::toBool(dateHeaderStr).value_or(DateHeader.getValue());
+  date_header_include_ = utils::StringUtils::toBool(date_header_str).value_or(DateHeader.getValue());
 
   if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName(), PropPutOutputAttributes.getValue());
@@ -238,15 +236,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     }
   }
 
-  std::string useChunkedEncoding = "false";
-  if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) {
+  std::string use_chunked_encoding = "false";
+  if (!context->getProperty(UseChunkedEncoding.getName(), use_chunked_encoding)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName(), UseChunkedEncoding.getValue());
   }
 
-  use_chunked_encoding_ = utils::StringUtils::toBool(useChunkedEncoding).value_or(false);
+  use_chunked_encoding_ = utils::StringUtils::toBool(use_chunked_encoding).value_or(false);
 
-  std::string disablePeerVerification;
-  disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification) && utils::StringUtils::toBool(disablePeerVerification).value_or(false));
+  std::string disable_peer_verification;
+  disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disable_peer_verification) && utils::StringUtils::toBool(disable_peer_verification).value_or(false));
 
   proxy_ = {};
   context->getProperty(ProxyHost.getName(), proxy_.host);
@@ -258,27 +256,35 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
   context->getProperty(ProxyPassword.getName(), proxy_.password);
   context->getProperty(FollowRedirects.getName(), follow_redirects_);
   context->getProperty(SendMessageBody.getName(), send_body_);
-}
 
-InvokeHTTP::~InvokeHTTP() = default;
+  invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<InvalidHTTPHeaderFieldHandlingOption>(*context, InvalidHTTPHeaderFieldHandlingStrategy);
+}
 
-std::string InvokeHTTP::generateId() {
-  return utils::IdGenerator::getIdGenerator()->generate().to_string();
+bool InvokeHTTP::shouldEmitFlowFile() const {
+  return ("POST" == method_ || "PUT" == method_ || "PATCH" == method_);
 }
 
-bool InvokeHTTP::emitFlowFile(const std::string &method) {
-  return ("POST" == method || "PUT" == method || "PATCH" == method);
+std::optional<std::map<std::string, std::string>> InvokeHTTP::validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const {
+  std::map<std::string, std::string> header_attributes;
+  for (const auto& [name, value] : attributes) {
+    const bool is_valid_name = utils::HTTPClient::isValidHttpHeaderField(name);
+    if (!is_valid_name && invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::FAIL) {
+      return std::nullopt;  // a single invalid name rejects the whole attribute set
+    }
+    if (is_valid_name || invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::TRANSFORM) {
+      header_attributes.emplace(is_valid_name ? name : utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(name), value);
+    }  // with any other strategy (e.g. DROP) the invalid attribute is simply left out
+  }
+  return header_attributes;
 }
 
 void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  auto flowFile = session->get();
+  auto flow_file = session->get();
 
-  std::string url = url_;
-
-  if (flowFile == nullptr) {
-    if (!emitFlowFile(method_)) {
+  if (flow_file == nullptr) {
+    if (!shouldEmitFlowFile()) {
       logger_->log_debug("InvokeHTTP -- create flow file with  %s", method_);
-      flowFile = session->create();
+      flow_file = session->create();
     } else {
       logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", method_);
       yield();
@@ -291,11 +297,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   logger_->log_debug("onTrigger InvokeHTTP with %s to %s", method_, url_);
 
   // create a transaction id
-  std::string tx_id = generateId();
+  std::string tx_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
 
-  // Note: callback must be declared before callbackObj so that they are destructed in the correct order
-  std::unique_ptr<utils::ByteInputCallback> callback = nullptr;
-  std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
+  // Note: callback must be declared before callback_obj so that they are destructed in the correct order
+  std::unique_ptr<utils::ByteInputCallback> callback;
+  std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
 
   // Client declared after the callbacks to make sure the callbacks are still available when the client is destructed
   utils::HTTPClient client(url_, ssl_context_service_);
@@ -320,25 +326,25 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
 
   client.setHTTPProxy(proxy_);
 
-  if (emitFlowFile(method_)) {
+  if (shouldEmitFlowFile()) {
     logger_->log_trace("InvokeHTTP -- reading flowfile");
-    std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
+    std::shared_ptr<ResourceClaim> claim = flow_file->getResourceClaim();
     if (claim) {
       callback = std::make_unique<utils::ByteInputCallback>();
       if (send_body_) {
-        session->read(flowFile, std::ref(*callback));
+        session->read(flow_file, std::ref(*callback));
       }
-      callbackObj = std::make_unique<utils::HTTPUploadCallback>();
-      callbackObj->ptr = callback.get();
-      callbackObj->pos = 0;
+      callback_obj = std::make_unique<utils::HTTPUploadCallback>();
+      callback_obj->ptr = callback.get();
+      callback_obj->pos = 0;
       logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
       if (!send_body_) {
         client.appendHeader("Content-Length", "0");
       } else if (!use_chunked_encoding_) {
-        client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
+        client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
       }
-      client.setUploadCallback(callbackObj.get());
-      client.setSeekFunction(callbackObj.get());
+      client.setUploadCallback(callback_obj.get());
+      client.setSeekFunction(callback_obj.get());
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
@@ -348,14 +354,19 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   }
 
   // append all headers
-  client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes());
+  auto attributes_in_headers = validateAttributesAgainstHTTPHeaderRules(flow_file->getAttributes());
+  if (!attributes_in_headers) {
+    session->transfer(flow_file, RelFailure);
+    return;
+  }
+  client.build_header_list(attribute_to_send_regex_, *attributes_in_headers);
 
   logger_->log_trace("InvokeHTTP -- curl performed");
   if (client.submit()) {
     logger_->log_trace("InvokeHTTP -- curl successful");
 
-    bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
-    if (putToAttribute) {
+    bool put_to_attribute = !IsNullOrEmpty(put_attribute_name_);
+    if (put_to_attribute) {
       logger_->log_debug("Adding http response body to flow file attribute %s", put_attribute_name_);
     }
 
@@ -364,21 +375,21 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
 
     int64_t http_code = client.getResponseCode();
     const char *content_type = client.getContentType();
-    flowFile->addAttribute(STATUS_CODE, std::to_string(http_code));
+    flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
     if (!response_headers.empty())
-      flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0));
-    flowFile->addAttribute(REQUEST_URL, url_);
-    flowFile->addAttribute(TRANSACTION_ID, tx_id);
+      flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0));
+    flow_file->addAttribute(REQUEST_URL, url_);
+    flow_file->addAttribute(TRANSACTION_ID, tx_id);
 
-    bool isSuccess = (static_cast<int32_t>(http_code / 100) == 2);
-    bool output_body_to_content = isSuccess && !putToAttribute;
+    bool is_success = (static_cast<int32_t>(http_code / 100) == 2);
+    bool output_body_to_content = is_success && !put_to_attribute;
 
-    logger_->log_debug("isSuccess: %d, response code %" PRId64, isSuccess, http_code);
+    logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
     std::shared_ptr<core::FlowFile> response_flow = nullptr;
 
     if (output_body_to_content) {
-      if (flowFile != nullptr) {
-        response_flow = session->create(flowFile);
+      if (flow_file != nullptr) {
+        response_flow = session->create(flow_file);
       } else {
         response_flow = session->create();
       }
@@ -389,47 +400,47 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
       response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
       if (!response_headers.empty())
         response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0));
-      response_flow->addAttribute(REQUEST_URL, url);
+      response_flow->addAttribute(REQUEST_URL, url_);
       response_flow->addAttribute(TRANSACTION_ID, tx_id);
       io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
       // need an import from the data stream.
       session->importFrom(stream, response_flow);
     }
-    route(flowFile, response_flow, session, context, isSuccess, http_code);
+    route(flow_file, response_flow, session, context, is_success, http_code);
   } else {
-    session->penalize(flowFile);
-    session->transfer(flowFile, RelFailure);
+    session->penalize(flow_file);
+    session->transfer(flow_file, RelFailure);
   }
 }
 
 void InvokeHTTP::route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
-                       const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode) {
+                       const std::shared_ptr<core::ProcessContext> &context, bool is_success, int64_t status_code) {
   // check if we should yield the processor
-  if (!isSuccess && request == nullptr) {
+  if (!is_success && request == nullptr) {
     context->yield();
   }
 
   // If the property to output the response flowfile regardless of status code is set then transfer it
-  bool responseSent = false;
+  bool response_sent = false;
   if (always_output_response_ && response != nullptr) {
     logger_->log_debug("Outputting success and response");
     session->transfer(response, RelResponse);
-    responseSent = true;
+    response_sent = true;
   }
 
   // transfer to the correct relationship
   // 2xx -> SUCCESS
-  if (isSuccess) {
+  if (is_success) {
     // we have two flowfiles to transfer
     if (request != nullptr) {
       session->transfer(request, Success);
     }
-    if (response != nullptr && !responseSent) {
+    if (response != nullptr && !response_sent) {
       logger_->log_debug("Outputting success and response");
       session->transfer(response, RelResponse);
     }
     // 5xx -> RETRY
-  } else if (statusCode / 100 == 5) {
+  } else if (status_code / 100 == 5) {
     if (request != nullptr) {
       session->penalize(request);
       session->transfer(request, RelRetry);
@@ -449,8 +460,4 @@ REGISTER_RESOURCE(InvokeHTTP, "An HTTP client processor which can interact with
     "The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the "
     "FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 08c89c604..bff7895b9 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -21,6 +21,7 @@
 #include <curl/curl.h>
 #include <memory>
 #include <string>
+#include <map>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -31,30 +32,24 @@
 #include "utils/Id.h"
 #include "../client/HTTPClient.h"
 #include "utils/Export.h"
+#include "utils/Enum.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
-// InvokeHTTP Class
 class InvokeHTTP : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
+  SMART_ENUM(InvalidHTTPHeaderFieldHandlingOption,
+    (FAIL, "fail"),
+    (TRANSFORM, "transform"),
+    (DROP, "drop")
+  )
+
   explicit InvokeHTTP(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid) {
     setTriggerWhenEmpty(true);
   }
-  // Destructor
-  virtual ~InvokeHTTP();
-  // Processor Name
-  EXTENSIONAPI static const char *ProcessorName;
   EXTENSIONAPI static std::string DefaultContentType;
-  // Supported Properties
+
   EXTENSIONAPI static core::Property Method;
   EXTENSIONAPI static core::Property URL;
   EXTENSIONAPI static core::Property ConnectTimeout;
@@ -73,10 +68,9 @@ class InvokeHTTP : public core::Processor {
   EXTENSIONAPI static core::Property UseChunkedEncoding;
   EXTENSIONAPI static core::Property DisablePeerVerification;
   EXTENSIONAPI static core::Property PropPutOutputAttributes;
-
   EXTENSIONAPI static core::Property AlwaysOutputResponse;
-
   EXTENSIONAPI static core::Property PenalizeOnNoRetry;
+  EXTENSIONAPI static core::Property InvalidHTTPHeaderFieldHandlingStrategy;
 
   EXTENSIONAPI static const char* STATUS_CODE;
   EXTENSIONAPI static const char* STATUS_MESSAGE;
@@ -86,7 +80,7 @@ class InvokeHTTP : public core::Processor {
   EXTENSIONAPI static const char* REMOTE_DN;
   EXTENSIONAPI static const char* EXCEPTION_CLASS;
   EXTENSIONAPI static const char* EXCEPTION_MESSAGE;
-  // Supported Relationships
+
   EXTENSIONAPI static core::Relationship Success;
   EXTENSIONAPI static core::Relationship RelResponse;
   EXTENSIONAPI static core::Relationship RelRetry;
@@ -96,20 +90,8 @@ class InvokeHTTP : public core::Processor {
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  /**
-   * Provides a reference to the URL.
-   */
-  const std::string &getUrl() {
-    return url_;
-  }
-
- protected:
-  /**
-   * Generate a transaction ID
-   * @return transaction ID string.
-   */
-  std::string generateId();
 
+ private:
   /**
    * Routes the flowfile to the proper destination
    * @param request request flow file record
@@ -121,49 +103,29 @@ class InvokeHTTP : public core::Processor {
    */
   void route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
              const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode);
-  /**
-   * Determine if we should emit a new flowfile based on our activity
-   * @param method method type
-   * @return result of the evaluation.
-   */
-  bool emitFlowFile(const std::string &method);
-
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_{nullptr};
+  bool shouldEmitFlowFile() const;
+  std::optional<std::map<std::string, std::string>> validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const;
 
-  // http method
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
   std::string method_;
-  // url
   std::string url_;
-  // include date in the header
   bool date_header_include_{true};
-  // attribute to send regex
   std::string attribute_to_send_regex_;
-  // connection timeout
   std::chrono::milliseconds connect_timeout_ms_{20000};
-  // read timeout.
   std::chrono::milliseconds read_timeout_ms_{20000};
   // attribute in which response body will be added
   std::string put_attribute_name_;
-  // determine if we always output a response.
   bool always_output_response_{false};
-  // content type.
   std::string content_type_;
-  // use chunked encoding.
   bool use_chunked_encoding_{false};
-  // penalize on no retry
   bool penalize_no_retry_{false};
-  // disable peer verification ( makes susceptible for MITM attacks )
+  // disabling peer verification makes susceptible for MITM attacks
   bool disable_peer_verification_{false};
   utils::HTTPProxy proxy_;
   bool follow_redirects_{true};
   bool send_body_{true};
-
- private:
+  InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger()};
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/tests/unit/HTTPClientTests.cpp b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
index ac6d78ffe..1101b4ef4 100644
--- a/extensions/http-curl/tests/unit/HTTPClientTests.cpp
+++ b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
@@ -95,3 +95,21 @@ TEST_CASE("HTTPClient escape test") {
   CHECK(client.escape("Hello Günter") == "Hello%20G%C3%BCnter");
   CHECK(client.escape("шеллы") == "%D1%88%D0%B5%D0%BB%D0%BB%D1%8B");
 }
+
+TEST_CASE("HTTPClient isValidHttpHeaderField test") {
+  CHECK(utils::HTTPClient::isValidHttpHeaderField("valid"));
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(""));  // empty name
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(" "));  // space (32) is below the allowed range
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField("contains:invalid"));  // colon
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField("invalid\x0b" "character"));  // control character 11
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField("invalid\x80" "character"));  // byte 128, outside ASCII
+}
+
+TEST_CASE("HTTPClient replaceInvalidCharactersInHttpHeaderFieldName test") {
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("valid") == "valid");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("") == "X-MiNiFi-Empty-Attribute-Name");  // empty name
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(" ") == "-");  // space
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("contains:invalid") == "contains-invalid");  // colon
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("invalid\x0b" "character") == "invalid-character");  // control character 11
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("invalid\x80" "character") == "invalid-character");  // byte 128, outside ASCII
+}
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index b8213c53b..032e5cee7 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -34,27 +34,41 @@
 #include "core/ProcessorNode.h"
 #include "processors/LogAttribute.h"
 #include "utils/gsl.h"
-#include "processors/GenerateFlowFile.h"
+#include "SingleProcessorTestController.h"
+
+namespace org::apache::nifi::minifi::test {
 
-namespace {
 class TestHTTPServer {
  public:
   TestHTTPServer();
   static constexpr const char* PROCESSOR_NAME = "my_http_server";
   static constexpr const char* URL = "http://localhost:8681/testytesttest";
 
+  void trigger() {
+    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+    test_plan_->reset();
+    test_controller_.runSession(test_plan_);
+  }
+
  private:
   TestController test_controller_;
+  std::shared_ptr<core::Processor> listen_http_;
+  std::shared_ptr<core::Processor> log_attribute_;
   std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
 };
 
 TestHTTPServer::TestHTTPServer() {
-  std::shared_ptr<core::Processor> listen_http = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
-  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest");
-  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+
+  listen_http_ = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
+  log_attribute_ = test_plan_->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true);
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "testytesttest");
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex.getName(), ".*");
   test_controller_.runSession(test_plan_);
 }
-}  // namespace
 
 TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   TestController testController;
@@ -251,7 +265,7 @@ TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
   std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
 
   plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
-  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), "http://localhost:8681/invalid");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
 
   constexpr const char* PENALIZE_LOG_PATTERN = "Penalizing [0-9a-f-]+ for [0-9]+ms at invokehttp";
@@ -289,3 +303,67 @@ TEST_CASE("HTTPTestsPutResponseBodyinAttribute", "[httptest1]") {
 
   REQUIRE(LogTestController::getInstance().contains("Adding http response body to flow file attribute http.type"));
 }
+
+TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in HTTP headers", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  LogTestController::getInstance().setDebug<InvokeHTTP>();
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");  // strategy under test: "fail"
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});  // RelFailure is left out; it is inspected below
+  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});  // attribute name contains a space
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::RelFailure);
+  REQUIRE(file_contents.size() == 1);  // exactly one flow file is expected on RelFailure
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");  // content must equal what was enqueued
+}
+
+TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);  // no handling strategy is set in this test; PROCESSORS.md lists "transform" as the default
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});  // Success is left out; it is inspected below
+  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});  // one name with a space, one empty name
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);  // exactly one flow file is expected on Success
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");  // content must equal what was enqueued
+  http_server.trigger();  // run the server-side plan before checking the log for the received headers
+  REQUIRE(LogTestController::getInstance().contains("key:invalid-header value:value"));  // the space is expected to have become '-'
+  REQUIRE(LogTestController::getInstance().contains("key:X-MiNiFi-Empty-Attribute-Name value:value2"));  // the empty name is expected to have been replaced by this placeholder
+}
+
+TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");  // strategy under test: "drop"
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});  // Success is left out; it is inspected below
+  test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});  // second attribute name contains a space
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);  // exactly one flow file is expected on Success
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");  // content must equal what was enqueued
+  http_server.trigger();  // run the server-side plan before checking the log for the received headers
+  REQUIRE(LogTestController::getInstance().contains("key:legit-header value:value1"));  // the header with the hyphenated name is expected in the log
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));  // no "key:invalid..." entry expected; NOTE(review): 0s is presumably the wait timeout - confirm
+}
+
+}  // namespace org::apache::nifi::minifi::test