You are viewing a plain-text version of this content; the link to the canonical version was lost in this rendering.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/05/18 13:37:16 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1828 fix InvokeHTTP Attributes to Send

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1966684b3 MINIFICPP-1828 fix InvokeHTTP Attributes to Send
1966684b3 is described below

commit 1966684b35a26896dcd22a4f16b1167191ca8364
Author: Marton Szasz <sz...@apache.org>
AuthorDate: Fri May 13 03:20:40 2022 +0200

    MINIFICPP-1828 fix InvokeHTTP Attributes to Send
    
    InvokeHTTP matched the Attributes to Send regex against attribute names
    using substring matching (regex search) instead of full-string matching
    (regex match).
    
    Additionally, renamed the "pattern" parameters to "string" in RegexUtils:
    they referred to the string being searched, not to the pattern (regex),
    which was confusing.
    
    Unrelated change: added the processor name to the backpressure log
    message to help debugging.
    
    Added utils::filter to filter optional values.
    
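    InvokeHTTP: when the invalid header field policy is FAIL, route to failure
    only if an attribute matched by Attributes to Send is invalid, not if any
    attribute of the flow file is invalid.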
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1331
---
 extensions/http-curl/client/HTTPClient.cpp         | 34 ++--------
 extensions/http-curl/client/HTTPClient.h           | 14 +----
 extensions/http-curl/processors/InvokeHTTP.cpp     | 60 ++++++++++++------
 extensions/http-curl/processors/InvokeHTTP.h       |  5 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       | 73 ++++++++++++++++++++++
 libminifi/include/utils/OptionalUtils.h            | 12 ++++
 libminifi/include/utils/RegexUtils.h               | 34 +++++-----
 .../utils/detail/MonadicOperationWrappers.h        |  7 +++
 libminifi/src/SchedulingAgent.cpp                  |  2 +-
 libminifi/src/utils/RegexUtils.cpp                 | 48 +++++++-------
 libminifi/test/unit/OptionalTest.cpp               |  5 ++
 11 files changed, 191 insertions(+), 103 deletions(-)

diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index eeec4194f..fe4712318 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -17,12 +17,12 @@
  */
 #include "HTTPClient.h"
 
-#include <memory>
+#include <algorithm>
 #include <cinttypes>
 #include <map>
-#include <vector>
+#include <memory>
 #include <string>
-#include <algorithm>
+#include <vector>
 
 #include "Exception.h"
 #include "utils/gsl.h"
@@ -30,11 +30,7 @@
 #include "core/Resource.h"
 #include "utils/RegexUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
     : core::Connectable("HTTPClient"),
@@ -212,18 +208,6 @@ void HTTPClient::setSeekFunction(HTTPUploadCallback *callbackObj) {
   curl_easy_setopt(http_session_, CURLOPT_SEEKFUNCTION, &utils::HTTPRequestResponse::seek_callback);
 }
 
-struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
-  if (http_session_) {
-    for (auto attribute : attributes) {
-      if (matches(attribute.first, regex)) {
-        std::string attr = attribute.first + ":" + attribute.second;
-        headers_ = curl_slist_append(headers_, attr.c_str());
-      }
-    }
-  }
-  return headers_;
-}
-
 void HTTPClient::setContentType(std::string content_type) {
   content_type_ = "Content-Type: " + content_type;
   headers_ = curl_slist_append(headers_, content_type_.c_str());
@@ -254,9 +238,7 @@ void HTTPClient::appendHeader(const std::string &new_header) {
 }
 
 void HTTPClient::appendHeader(const std::string &key, const std::string &value) {
-  std::stringstream new_header;
-  new_header << key << ": " << value;
-  headers_ = curl_slist_append(headers_, new_header.str().c_str());
+  headers_ = curl_slist_append(headers_, utils::StringUtils::join_pack(key, ": ", value).c_str());
 }
 
 void HTTPClient::setUseChunkedEncoding() {
@@ -490,8 +472,4 @@ std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::strin
 
 REGISTER_INTERNAL_RESOURCE(HTTPClient);
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 7159895c2..16491be46 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -55,11 +55,7 @@
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 /**
  * Purpose and Justification: Pull the basics for an HTTPClient into a self contained class. Simply provide
@@ -113,8 +109,6 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   virtual void setReadCallback(HTTPReadCallback *callbackObj);
 
-  struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
-
   void setContentType(std::string content_type) override;
 
   std::string escape(std::string string_to_escape) override;
@@ -307,11 +301,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HTTPClient>::getLogger()};
 };
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
 
 #ifdef WIN32
 #pragma warning(pop)
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index a0a7f9048..2177d2c89 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -18,11 +18,12 @@
 
 #include "InvokeHTTP.h"
 
-#include <memory>
 #include <cinttypes>
 #include <cstdint>
+#include <memory>
 #include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "utils/ByteArrayCallback.h"
@@ -36,6 +37,9 @@
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/filter.hpp"
+#include "range/v3/algorithm/any_of.hpp"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -210,9 +214,10 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName(), PropPutOutputAttributes.getValue());
   }
 
-  if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) {
-    logger_->log_debug("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName(), AttributesToSend.getValue());
-  }
+  attributes_to_send_ = context->getProperty(AttributesToSend)
+      | utils::filter([](const std::string& s) { return !s.empty(); })  // avoid compiling an empty string to regex
+      | utils::map([](const std::string& regex_str) { return utils::Regex{regex_str}; })
+      | utils::orElse([this] { logger_->log_debug("%s is missing, so the default value will be used", AttributesToSend.getName()); });
 
   std::string always_output_response;
   if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) {
@@ -269,18 +274,37 @@ bool InvokeHTTP::shouldEmitFlowFile() const {
   return ("POST" == method_ || "PUT" == method_ || "PATCH" == method_);
 }
 
-std::optional<std::map<std::string, std::string>> InvokeHTTP::validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const {
-  std::map<std::string, std::string> result;
-  for (const auto& [attribute_name, attribute_value] : attributes) {
-    if (utils::HTTPClient::isValidHttpHeaderField(attribute_name)) {
-      result.emplace(attribute_name, attribute_value);
-    } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::TRANSFORM) {
-      result.emplace(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(attribute_name), attribute_value);
-    } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::FAIL) {
-      return std::nullopt;
-    }
+/**
+ * Calls append_header with valid HTTP header keys, based on attributes_to_send_
+ * @param flow_file
+ * @param append_header Callback to append HTTP header to the request
+ * @return false when the flow file should be routed to failure, true otherwise
+ */
+bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header) {
+  static_assert(std::is_invocable_v<decltype(append_header), std::string, std::string>);
+  if (!attributes_to_send_) return true;
+  const auto key_fn = [](const std::pair<std::string, std::string>& pair) { return pair.first; };
+  const auto original_attributes = flow_file.getAttributes();
+  // non-const views, because otherwise it doesn't satisfy viewable_range, and transform would fail
+  ranges::viewable_range auto matching_attributes = original_attributes
+      | ranges::views::filter([this](const auto& key) { return utils::regexMatch(key, *attributes_to_send_); }, key_fn);
+  switch (invalid_http_header_field_handling_strategy_.value()) {
+    case InvalidHTTPHeaderFieldHandlingOption::FAIL:
+      if (ranges::any_of(matching_attributes, std::not_fn(&utils::HTTPClient::isValidHttpHeaderField), key_fn)) return false;
+      for (const auto& header: matching_attributes) append_header(header.first, header.second);
+      return true;
+    case InvalidHTTPHeaderFieldHandlingOption::DROP:
+      for (const auto& header: matching_attributes | ranges::views::filter(&utils::HTTPClient::isValidHttpHeaderField, key_fn)) {
+        append_header(header.first, header.second);
+      }
+      return true;
+    case InvalidHTTPHeaderFieldHandlingOption::TRANSFORM:
+      for (const auto& header: matching_attributes) {
+        append_header(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(header.first), header.second);
+      }
+      return true;
   }
-  return result;
+  return true;
 }
 
 void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -358,13 +382,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
     logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
   }
 
-  // append all headers
-  auto attributes_in_headers = validateAttributesAgainstHTTPHeaderRules(flow_file->getAttributes());
-  if (!attributes_in_headers) {
+  const auto append_header = [&client](const std::string& key, const std::string& value) { client.appendHeader(key, value); };
+  if (!appendHeaders(*flow_file, append_header)) {
     session->transfer(flow_file, RelFailure);
     return;
   }
-  client.build_header_list(attribute_to_send_regex_, *attributes_in_headers);
 
   logger_->log_trace("InvokeHTTP -- curl performed");
   if (client.submit()) {
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index bff7895b9..6320536c8 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -33,6 +33,7 @@
 #include "../client/HTTPClient.h"
 #include "utils/Export.h"
 #include "utils/Enum.h"
+#include "utils/RegexUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -104,13 +105,13 @@ class InvokeHTTP : public core::Processor {
   void route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
              const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode);
   bool shouldEmitFlowFile() const;
-  std::optional<std::map<std::string, std::string>> validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const;
+  [[nodiscard]] bool appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header);
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
   std::string method_;
   std::string url_;
   bool date_header_include_{true};
-  std::string attribute_to_send_regex_;
+  std::optional<utils::Regex> attributes_to_send_;
   std::chrono::milliseconds connect_timeout_ms_{20000};
   std::chrono::milliseconds read_timeout_ms_{20000};
   // attribute in which response body will be added
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 032e5cee7..85091446a 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -315,6 +315,7 @@ TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in H
   invokehttp->setProperty(InvokeHTTP::Method, "GET");
   invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
   test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});
   const auto result = test_controller.trigger();
@@ -323,6 +324,30 @@ TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in H
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
 }
 
+TEST_CASE("InvokeHTTP succeeds when the flow file contains an attribute that would be invalid as an HTTP header, and the policy is FAIL, but the attribute is not matched",
+    "[httptest1][invokehttp][httpheader][attribute]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  LogTestController::getInstance().setDebug<InvokeHTTP>();
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, "valid.*");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"valid-header", "value2"}});
+  const auto result = test_controller.trigger();
+  REQUIRE(result.at(InvokeHTTP::RelFailure).empty());
+  const auto& success_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(success_contents.size() == 1);
+  http_server.trigger();
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid"));
+  REQUIRE(LogTestController::getInstance().contains("key:valid-header value:value2"));
+}
+
 TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") {
   using minifi::processors::InvokeHTTP;
   TestHTTPServer http_server;
@@ -333,6 +358,7 @@ TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]")
 
   invokehttp->setProperty(InvokeHTTP::Method, "GET");
   invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
   test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});
   const auto result = test_controller.trigger();
@@ -355,6 +381,7 @@ TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]"
   invokehttp->setProperty(InvokeHTTP::Method, "GET");
   invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
   test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
   const auto result = test_controller.trigger();
@@ -366,4 +393,50 @@ TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]"
   REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
 }
 
+TEST_CASE("InvokeHTTP empty Attributes to Send means no attributes are sent", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, "");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+  http_server.trigger();
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:legit-header value:value1"));
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
+}
+
+TEST_CASE("InvokeHTTP Attributes to Send uses full string matching, not substring", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
+  invokehttp->setProperty(InvokeHTTP::AttributesToSend, "he.*er");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"header1", "value1"}, {"header", "value2"}});
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+  http_server.trigger();
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:header1 value:value1"));
+  REQUIRE(LogTestController::getInstance().contains("key:header value:value2"));
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
+}
 }  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/include/utils/OptionalUtils.h b/libminifi/include/utils/OptionalUtils.h
index 05726168d..9f0b92acd 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -113,6 +113,18 @@ auto operator|(std::optional<SourceType> o, value_or_else_wrapper<F> f) noexcept
     return std::invoke(std::forward<F>(f.function));
   }
 }
+
+// filter implementation
+template<typename SourceType, typename F>
+requires std::is_convertible_v<std::invoke_result_t<F, SourceType>, bool>
+auto operator|(std::optional<SourceType> o, filter_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
+    -> std::optional<SourceType> {
+  if (o && std::invoke(std::forward<F>(f.function), *o)) {
+    return o;
+  } else {
+    return std::nullopt;
+  }
+}
 }  // namespace detail
 }  // namespace org::apache::nifi::minifi::utils
 
diff --git a/libminifi/include/utils/RegexUtils.h b/libminifi/include/utils/RegexUtils.h
index f0b7cdf0f..ff8b95238 100644
--- a/libminifi/include/utils/RegexUtils.h
+++ b/libminifi/include/utils/RegexUtils.h
@@ -90,11 +90,11 @@ class SMatch {
       if (match.rm_so == -1) {
         return "";
       }
-      return std::string(pattern.begin() + match.rm_so, pattern.begin() + match.rm_eo);
+      return std::string(string.begin() + match.rm_so, string.begin() + match.rm_eo);
     }
 
     regmatch_t match;
-    std::string_view pattern;
+    std::string_view string;
   };
 
   struct SuffixWrapper {
@@ -112,11 +112,11 @@ class SMatch {
   void clear();
 
   std::vector<Regmatch> matches_;
-  std::string pattern_;
+  std::string string_;
 
-  friend bool regexMatch(const std::string &pattern, SMatch& match, const Regex& regex);
-  friend bool regexSearch(const std::string &pattern, SMatch& match, const Regex& regex);
-  friend utils::SMatch getLastRegexMatch(const std::string& str, const utils::Regex& pattern);
+  friend bool regexMatch(const std::string& string, SMatch& match, const Regex& regex);
+  friend bool regexSearch(const std::string& string, SMatch& match, const Regex& regex);
+  friend utils::SMatch getLastRegexMatch(const std::string& string, const utils::Regex& pattern);
 };
 #endif
 
@@ -149,25 +149,25 @@ class Regex {
   int regex_mode_;
 #endif
 
-  friend bool regexMatch(const std::string &pattern, const Regex& regex);
-  friend bool regexMatch(const std::string &pattern, SMatch& match, const Regex& regex);
-  friend bool regexSearch(const std::string &pattern, const Regex& regex);
-  friend bool regexSearch(const std::string &pattern, SMatch& match, const Regex& regex);
-  friend SMatch getLastRegexMatch(const std::string& pattern, const utils::Regex& regex);
+  friend bool regexMatch(const std::string &string, const Regex& regex);
+  friend bool regexMatch(const std::string &string, SMatch& match, const Regex& regex);
+  friend bool regexSearch(const std::string &string, const Regex& regex);
+  friend bool regexSearch(const std::string &string, SMatch& match, const Regex& regex);
+  friend SMatch getLastRegexMatch(const std::string& string, const utils::Regex& regex);
 };
 
-bool regexMatch(const std::string &pattern, const Regex& regex);
-bool regexMatch(const std::string &pattern, SMatch& match, const Regex& regex);
+bool regexMatch(const std::string &string, const Regex& regex);
+bool regexMatch(const std::string &string, SMatch& match, const Regex& regex);
 
-bool regexSearch(const std::string &pattern, const Regex& regex);
-bool regexSearch(const std::string &pattern, SMatch& match, const Regex& regex);
+bool regexSearch(const std::string &string, const Regex& regex);
+bool regexSearch(const std::string &string, SMatch& match, const Regex& regex);
 
 /**
  * Returns the last match of a regular expression within the given string
- * @param pattern incoming string
+ * @param string incoming string
  * @param regex the regex to be matched
  * @return the last valid SMatch or a default constructed SMatch (ready() != true) if no matches have been found
  */
-SMatch getLastRegexMatch(const std::string& pattern, const utils::Regex& regex);
+SMatch getLastRegexMatch(const std::string& string, const utils::Regex& regex);
 
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h b/libminifi/include/utils/detail/MonadicOperationWrappers.h
index 8950eb81e..6548ef4ed 100644
--- a/libminifi/include/utils/detail/MonadicOperationWrappers.h
+++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h
@@ -40,6 +40,10 @@ struct value_or_else_wrapper {
   T function;
 };
 
+template<typename T>
+struct filter_wrapper {
+  T function;
+};
 }  // namespace detail
 
 template<typename T>
@@ -53,4 +57,7 @@ detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return {std::forward<T>
 
 template<typename T>
 detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) noexcept { return {std::forward<T>(func)}; }
+
+template<typename T>
+detail::filter_wrapper<T&&> filter(T&& func) noexcept { return {std::forward<T>(func)}; }
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 130c4cb43..062a13b3e 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -93,7 +93,7 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt
     return true;
   }
   if (processor->isThrottledByBackpressure()) {
-    logger_->log_debug("backpressure applied because too much outgoing for %s", processor->getUUIDStr());
+    logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName());
     // need to apply backpressure
     return true;
   }
diff --git a/libminifi/src/utils/RegexUtils.cpp b/libminifi/src/utils/RegexUtils.cpp
index 0b9d9d38f..39319a2f0 100644
--- a/libminifi/src/utils/RegexUtils.cpp
+++ b/libminifi/src/utils/RegexUtils.cpp
@@ -37,10 +37,10 @@ namespace org::apache::nifi::minifi::utils {
 
 #ifndef NO_MORE_REGFREEE
 SMatch::SuffixWrapper SMatch::suffix() const {
-  if ((size_t) matches_[0].match.rm_eo >= pattern_.size()) {
+  if ((size_t) matches_[0].match.rm_eo >= string_.size()) {
     return SuffixWrapper{std::string()};
   } else {
-    return SuffixWrapper{pattern_.substr(matches_[0].match.rm_eo)};
+    return SuffixWrapper{string_.substr(matches_[0].match.rm_eo)};
   }
 }
 
@@ -73,7 +73,7 @@ std::size_t SMatch::length(std::size_t index) const {
 
 void SMatch::clear() {
   matches_.clear();
-  pattern_.clear();
+  string_.clear();
 }
 #endif
 
@@ -198,73 +198,73 @@ void Regex::compileRegex(regex_t& regex, const std::string& regex_string) const
 }
 #endif
 
-bool regexSearch(const std::string &pattern, const Regex& regex) {
+bool regexSearch(const std::string &string, const Regex& regex) {
   if (!regex.valid_) {
     return false;
   }
 #ifdef NO_MORE_REGFREEE
-  return std::regex_search(pattern, regex.compiled_regex_);
+  return std::regex_search(string, regex.compiled_regex_);
 #else
   std::vector<regmatch_t> match;
   match.resize(getMaxGroupCountOfRegex(regex.regex_str_));
-  return regexec(&regex.compiled_regex_, pattern.c_str(), match.size(), match.data(), 0) == 0;
+  return regexec(&regex.compiled_regex_, string.c_str(), match.size(), match.data(), 0) == 0;
 #endif
 }
 
-bool regexSearch(const std::string &pattern, SMatch& match, const Regex& regex) {
+bool regexSearch(const std::string &string, SMatch& match, const Regex& regex) {
   if (!regex.valid_) {
     return false;
   }
 #ifdef NO_MORE_REGFREEE
-  return std::regex_search(pattern, match, regex.compiled_regex_);
+  return std::regex_search(string, match, regex.compiled_regex_);
 #else
   match.clear();
   std::vector<regmatch_t> regmatches;
   regmatches.resize(getMaxGroupCountOfRegex(regex.regex_str_));
-  bool result = regexec(&regex.compiled_regex_, pattern.c_str(), regmatches.size(), regmatches.data(), 0) == 0;
-  match.pattern_ = pattern;
+  bool result = regexec(&regex.compiled_regex_, string.c_str(), regmatches.size(), regmatches.data(), 0) == 0;
+  match.string_ = string;
   for (const auto& regmatch : regmatches) {
-    match.matches_.push_back(SMatch::Regmatch{regmatch, match.pattern_});
+    match.matches_.push_back(SMatch::Regmatch{regmatch, match.string_});
   }
   return result;
 #endif
 }
 
-bool regexMatch(const std::string &pattern, const Regex& regex) {
+bool regexMatch(const std::string &string, const Regex& regex) {
   if (!regex.valid_) {
     return false;
   }
 #ifdef NO_MORE_REGFREEE
-  return std::regex_match(pattern, regex.compiled_regex_);
+  return std::regex_match(string, regex.compiled_regex_);
 #else
   std::vector<regmatch_t> match;
   match.resize(getMaxGroupCountOfRegex(regex.regex_str_));
-  return regexec(&regex.compiled_full_input_regex_, pattern.c_str(), match.size(), match.data(), 0) == 0;
+  return regexec(&regex.compiled_full_input_regex_, string.c_str(), match.size(), match.data(), 0) == 0;
 #endif
 }
 
-bool regexMatch(const std::string &pattern, SMatch& match, const Regex& regex) {
+bool regexMatch(const std::string &string, SMatch& match, const Regex& regex) {
   if (!regex.valid_) {
     return false;
   }
 #ifdef NO_MORE_REGFREEE
-  return std::regex_match(pattern, match, regex.compiled_regex_);
+  return std::regex_match(string, match, regex.compiled_regex_);
 #else
   match.clear();
   std::vector<regmatch_t> regmatches;
   regmatches.resize(getMaxGroupCountOfRegex(regex.regex_str_));
-  bool result = regexec(&regex.compiled_full_input_regex_, pattern.c_str(), regmatches.size(), regmatches.data(), 0) == 0;
-  match.pattern_ = pattern;
+  bool result = regexec(&regex.compiled_full_input_regex_, string.c_str(), regmatches.size(), regmatches.data(), 0) == 0;
+  match.string_ = string;
   for (const auto& regmatch : regmatches) {
-    match.matches_.push_back(SMatch::Regmatch{regmatch, match.pattern_});
+    match.matches_.push_back(SMatch::Regmatch{regmatch, match.string_});
   }
   return result;
 #endif
 }
 
-SMatch getLastRegexMatch(const std::string& pattern, const utils::Regex& regex) {
+SMatch getLastRegexMatch(const std::string& string, const utils::Regex& regex) {
 #ifdef NO_MORE_REGFREEE
-  auto matches = std::sregex_iterator(pattern.begin(), pattern.end(), regex.compiled_regex_);
+  auto matches = std::sregex_iterator(string.begin(), string.end(), regex.compiled_regex_);
   std::smatch last_match;
   while (matches != std::sregex_iterator()) {
     last_match = *matches;
@@ -274,14 +274,14 @@ SMatch getLastRegexMatch(const std::string& pattern, const utils::Regex& regex)
 #else
   SMatch search_result;
   SMatch last_match;
-  auto current_str = pattern;
+  auto current_str = string;
   while (regexSearch(current_str, search_result, regex)) {
     last_match = search_result;
     current_str = search_result.suffix();
   }
 
-  auto diff = pattern.size() - last_match.pattern_.size();
-  last_match.pattern_ = pattern;
+  auto diff = string.size() - last_match.string_.size();
+  last_match.string_ = string;
   for (auto& match : last_match.matches_) {
     if (match.match.rm_so >= 0) {
       match.match.rm_so += diff;
diff --git a/libminifi/test/unit/OptionalTest.cpp b/libminifi/test/unit/OptionalTest.cpp
index 59ebd80a9..93dd31879 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -72,3 +72,8 @@ TEST_CASE("optional valueOrElse", "[optional][valueOrElse]") {
   REQUIRE(0 == test2);
   REQUIRE_THROWS_AS(std::optional<int>{} | utils::valueOrElse([]() -> int { throw std::exception{}; }), std::exception);
 }
+
+TEST_CASE("optional filter", "[optional][filter]") {
+  REQUIRE(7 == (std::make_optional(7) | utils::filter([](int i) { return i % 2 == 1; })).value());
+  REQUIRE(std::nullopt == (std::make_optional(8) | utils::filter([](int i) { return i % 2 == 1; })));
+}