You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:01 UTC
[nifi-minifi-cpp] 02/04: MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9cc12971c94d6d0958ecdd2da8a63e004aa01e8c
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:15:45 2022 +0200
MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP
Closes #1321
Signed-off-by: Marton Szasz <sz...@apache.org>
---
PROCESSORS.md | 1 +
extensions/http-curl/client/HTTPClient.cpp | 33 ++++
extensions/http-curl/client/HTTPClient.h | 4 +
extensions/http-curl/processors/InvokeHTTP.cpp | 211 +++++++++++----------
extensions/http-curl/processors/InvokeHTTP.h | 76 ++------
.../http-curl/tests/unit/HTTPClientTests.cpp | 18 ++
.../http-curl/tests/unit/InvokeHTTPTests.cpp | 92 ++++++++-
7 files changed, 269 insertions(+), 166 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index d97f8752d..88a074211 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -924,6 +924,7 @@ In the list below, the names of required properties appear in bold. Any other pr
|Follow Redirects|true||Follow HTTP redirects issued by remote server.|
|HTTP Method|GET||HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.|
|Include Date Header|true||Include an RFC-2616 Date header in the request.|
+|**Invalid HTTP Header Field Handling Strategy**|transform|transform<br/>fail<br/>drop|Indicates what should happen when an attribute's name is not a valid HTTP header field name.<br/>Options:<br/>transform - invalid characters are replaced<br/>fail - flow file is transferred to failure<br/>drop - drops invalid attributes from HTTP message|
|invokehttp-proxy-password|||Password to set when authenticating against proxy|
|invokehttp-proxy-username|||Username to set when authenticating against proxy|
|Penalize on "No Retry"|false||Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship.|
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 16298aa24..eeec4194f 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -455,6 +455,39 @@ void HTTPClient::setFollowRedirects(bool follow) {
curl_easy_setopt(http_session_, CURLOPT_FOLLOWLOCATION, follow);
}
+bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
+ if (field_name.size() == 0) {
+ return false;
+ }
+
+ // RFC822 3.1.2: The field-name must be composed of printable ASCII characters
+ // (i.e., characters that have values between 33. and 126., decimal, except colon).
+ for (auto ch : field_name) {
+ if (ch < 33 || ch > 126 || ch == ':') {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name) {
+ if (field_name.size() == 0) {
+ return "X-MiNiFi-Empty-Attribute-Name";
+ }
+
+ std::string result;
+ // RFC822 3.1.2: The field-name must be composed of printable ASCII characters
+ // (i.e., characters that have values between 33. and 126., decimal, except colon).
+ for (auto ch : field_name) {
+ if (ch < 33 || ch > 126 || ch == ':') {
+ result += '-';
+ } else {
+ result += ch;
+ }
+ }
+ return result;
+}
+
REGISTER_INTERNAL_RESOURCE(HTTPClient);
} // namespace utils
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 05a24e144..7159895c2 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -48,6 +48,7 @@
#else
#include <regex.h>
#endif
+#include <string_view>
#include "utils/ByteArrayCallback.h"
#include "controllers/SSLContextService.h"
@@ -238,6 +239,9 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
}
}
+ static bool isValidHttpHeaderField(std::string_view field_name);
+ static std::string replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name);
+
private:
static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 3a4516883..aea10f730 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -17,11 +17,7 @@
*/
#include "InvokeHTTP.h"
-#ifdef WIN32
-#include <regex>
-#else
-#include <regex.h>
-#endif
+
#include <memory>
#include <cinttypes>
#include <cstdint>
@@ -39,14 +35,10 @@
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
-const char *InvokeHTTP::ProcessorName = "InvokeHTTP";
std::string InvokeHTTP::DefaultContentType = "application/octet-stream";
core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
@@ -115,6 +107,16 @@ core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will
core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false");
+
+core::Property InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy(
+ core::PropertyBuilder::createProperty("Invalid HTTP Header Field Handling Strategy")
+ ->withDescription("Indicates what should happen when an attribute's name is not a valid HTTP header field name. "
+ "Options: transform - invalid characters are replaced, fail - flow file is transferred to failure, drop - drops invalid attributes from HTTP message")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(toString(InvalidHTTPHeaderFieldHandlingOption::TRANSFORM))
+ ->withAllowableValues<std::string>(InvalidHTTPHeaderFieldHandlingOption::values())
+ ->build());
+
const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body";
@@ -142,32 +144,29 @@ core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will
void InvokeHTTP::initialize() {
logger_->log_trace("Initializing InvokeHTTP");
-
- // Set the supported properties
- std::set<core::Property> properties;
- properties.insert(Method);
- properties.insert(URL);
- properties.insert(ConnectTimeout);
- properties.insert(ReadTimeout);
- properties.insert(DateHeader);
- properties.insert(AttributesToSend);
- properties.insert(SSLContext);
- properties.insert(ProxyHost);
- properties.insert(ProxyPort);
- properties.insert(ProxyUsername);
- properties.insert(ProxyPassword);
- properties.insert(UseChunkedEncoding);
- properties.insert(ContentType);
- properties.insert(SendBody);
- properties.insert(SendMessageBody);
- properties.insert(DisablePeerVerification);
- properties.insert(AlwaysOutputResponse);
- properties.insert(FollowRedirects);
- properties.insert(PropPutOutputAttributes);
- properties.insert(PenalizeOnNoRetry);
-
- setSupportedProperties(properties);
- // Set the supported relationships
+ setSupportedProperties({
+ Method,
+ URL,
+ ConnectTimeout,
+ ReadTimeout,
+ DateHeader,
+ AttributesToSend,
+ SSLContext,
+ ProxyHost,
+ ProxyPort,
+ ProxyUsername,
+ ProxyPassword,
+ UseChunkedEncoding,
+ ContentType,
+ SendBody,
+ SendMessageBody,
+ DisablePeerVerification,
+ AlwaysOutputResponse,
+ FollowRedirects,
+ PropPutOutputAttributes,
+ PenalizeOnNoRetry,
+ InvalidHTTPHeaderFieldHandlingStrategy
+ });
setSupportedRelationships({Success, RelResponse, RelFailure, RelRetry, RelNoRetry});
}
@@ -182,7 +181,6 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
return;
}
-
if (auto connect_timeout = context->getProperty<core::TimePeriodValue>(ConnectTimeout)) {
connect_timeout_ms_ = connect_timeout->getMilliseconds();
} else {
@@ -190,9 +188,9 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
return;
}
- std::string contentTypeStr;
- if (context->getProperty(ContentType.getName(), contentTypeStr)) {
- content_type_ = contentTypeStr;
+ std::string content_type_str;
+ if (context->getProperty(ContentType.getName(), content_type_str)) {
+ content_type_ = content_type_str;
}
if (auto read_timeout = context->getProperty<core::TimePeriodValue>(ReadTimeout)) {
@@ -201,12 +199,12 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
logger_->log_debug("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName(), ReadTimeout.getValue());
}
- std::string dateHeaderStr;
- if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
+ std::string date_header_str;
+ if (!context->getProperty(DateHeader.getName(), date_header_str)) {
logger_->log_debug("%s attribute is missing, so default value of %s will be used", DateHeader.getName(), DateHeader.getValue());
}
- date_header_include_ = utils::StringUtils::toBool(dateHeaderStr).value_or(DateHeader.getValue());
+ date_header_include_ = utils::StringUtils::toBool(date_header_str).value_or(DateHeader.getValue());
if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
logger_->log_debug("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName(), PropPutOutputAttributes.getValue());
@@ -238,15 +236,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
}
}
- std::string useChunkedEncoding = "false";
- if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) {
+ std::string use_chunked_encoding = "false";
+ if (!context->getProperty(UseChunkedEncoding.getName(), use_chunked_encoding)) {
logger_->log_debug("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName(), UseChunkedEncoding.getValue());
}
- use_chunked_encoding_ = utils::StringUtils::toBool(useChunkedEncoding).value_or(false);
+ use_chunked_encoding_ = utils::StringUtils::toBool(use_chunked_encoding).value_or(false);
- std::string disablePeerVerification;
- disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification) && utils::StringUtils::toBool(disablePeerVerification).value_or(false));
+ std::string disable_peer_verification;
+ disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disable_peer_verification) && utils::StringUtils::toBool(disable_peer_verification).value_or(false));
proxy_ = {};
context->getProperty(ProxyHost.getName(), proxy_.host);
@@ -258,27 +256,35 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
context->getProperty(ProxyPassword.getName(), proxy_.password);
context->getProperty(FollowRedirects.getName(), follow_redirects_);
context->getProperty(SendMessageBody.getName(), send_body_);
-}
-InvokeHTTP::~InvokeHTTP() = default;
+ invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<InvalidHTTPHeaderFieldHandlingOption>(*context, InvalidHTTPHeaderFieldHandlingStrategy);
+}
-std::string InvokeHTTP::generateId() {
- return utils::IdGenerator::getIdGenerator()->generate().to_string();
+bool InvokeHTTP::shouldEmitFlowFile() const {
+ return ("POST" == method_ || "PUT" == method_ || "PATCH" == method_);
}
-bool InvokeHTTP::emitFlowFile(const std::string &method) {
- return ("POST" == method || "PUT" == method || "PATCH" == method);
+std::optional<std::map<std::string, std::string>> InvokeHTTP::validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const {
+ std::map<std::string, std::string> result;
+ for (const auto& [attribute_name, attribute_value] : attributes) {
+ if (utils::HTTPClient::isValidHttpHeaderField(attribute_name)) {
+ result.emplace(attribute_name, attribute_value);
+ } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::TRANSFORM) {
+ result.emplace(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(attribute_name), attribute_value);
+ } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::FAIL) {
+ return std::nullopt;
+ }
+ }
+ return result;
}
void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- auto flowFile = session->get();
+ auto flow_file = session->get();
- std::string url = url_;
-
- if (flowFile == nullptr) {
- if (!emitFlowFile(method_)) {
+ if (flow_file == nullptr) {
+ if (!shouldEmitFlowFile()) {
logger_->log_debug("InvokeHTTP -- create flow file with %s", method_);
- flowFile = session->create();
+ flow_file = session->create();
} else {
logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", method_);
yield();
@@ -291,11 +297,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
logger_->log_debug("onTrigger InvokeHTTP with %s to %s", method_, url_);
// create a transaction id
- std::string tx_id = generateId();
+ std::string tx_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
- // Note: callback must be declared before callbackObj so that they are destructed in the correct order
- std::unique_ptr<utils::ByteInputCallback> callback = nullptr;
- std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
+ // Note: callback must be declared before callback_obj so that they are destructed in the correct order
+ std::unique_ptr<utils::ByteInputCallback> callback;
+ std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
// Client declared after the callbacks to make sure the callbacks are still available when the client is destructed
utils::HTTPClient client(url_, ssl_context_service_);
@@ -320,25 +326,25 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
client.setHTTPProxy(proxy_);
- if (emitFlowFile(method_)) {
+ if (shouldEmitFlowFile()) {
logger_->log_trace("InvokeHTTP -- reading flowfile");
- std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
+ std::shared_ptr<ResourceClaim> claim = flow_file->getResourceClaim();
if (claim) {
callback = std::make_unique<utils::ByteInputCallback>();
if (send_body_) {
- session->read(flowFile, std::ref(*callback));
+ session->read(flow_file, std::ref(*callback));
}
- callbackObj = std::make_unique<utils::HTTPUploadCallback>();
- callbackObj->ptr = callback.get();
- callbackObj->pos = 0;
+ callback_obj = std::make_unique<utils::HTTPUploadCallback>();
+ callback_obj->ptr = callback.get();
+ callback_obj->pos = 0;
logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
if (!send_body_) {
client.appendHeader("Content-Length", "0");
} else if (!use_chunked_encoding_) {
- client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
+ client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
}
- client.setUploadCallback(callbackObj.get());
- client.setSeekFunction(callbackObj.get());
+ client.setUploadCallback(callback_obj.get());
+ client.setSeekFunction(callback_obj.get());
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
@@ -348,14 +354,19 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
}
// append all headers
- client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes());
+ auto attributes_in_headers = validateAttributesAgainstHTTPHeaderRules(flow_file->getAttributes());
+ if (!attributes_in_headers) {
+ session->transfer(flow_file, RelFailure);
+ return;
+ }
+ client.build_header_list(attribute_to_send_regex_, *attributes_in_headers);
logger_->log_trace("InvokeHTTP -- curl performed");
if (client.submit()) {
logger_->log_trace("InvokeHTTP -- curl successful");
- bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
- if (putToAttribute) {
+ bool put_to_attribute = !IsNullOrEmpty(put_attribute_name_);
+ if (put_to_attribute) {
logger_->log_debug("Adding http response body to flow file attribute %s", put_attribute_name_);
}
@@ -364,21 +375,21 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
int64_t http_code = client.getResponseCode();
const char *content_type = client.getContentType();
- flowFile->addAttribute(STATUS_CODE, std::to_string(http_code));
+ flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty())
- flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0));
- flowFile->addAttribute(REQUEST_URL, url_);
- flowFile->addAttribute(TRANSACTION_ID, tx_id);
+ flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0));
+ flow_file->addAttribute(REQUEST_URL, url_);
+ flow_file->addAttribute(TRANSACTION_ID, tx_id);
- bool isSuccess = (static_cast<int32_t>(http_code / 100) == 2);
- bool output_body_to_content = isSuccess && !putToAttribute;
+ bool is_success = (static_cast<int32_t>(http_code / 100) == 2);
+ bool output_body_to_content = is_success && !put_to_attribute;
- logger_->log_debug("isSuccess: %d, response code %" PRId64, isSuccess, http_code);
+ logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
std::shared_ptr<core::FlowFile> response_flow = nullptr;
if (output_body_to_content) {
- if (flowFile != nullptr) {
- response_flow = session->create(flowFile);
+ if (flow_file != nullptr) {
+ response_flow = session->create(flow_file);
} else {
response_flow = session->create();
}
@@ -389,47 +400,47 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty())
response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0));
- response_flow->addAttribute(REQUEST_URL, url);
+ response_flow->addAttribute(REQUEST_URL, url_);
response_flow->addAttribute(TRANSACTION_ID, tx_id);
io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
// need an import from the data stream.
session->importFrom(stream, response_flow);
}
- route(flowFile, response_flow, session, context, isSuccess, http_code);
+ route(flow_file, response_flow, session, context, is_success, http_code);
} else {
- session->penalize(flowFile);
- session->transfer(flowFile, RelFailure);
+ session->penalize(flow_file);
+ session->transfer(flow_file, RelFailure);
}
}
void InvokeHTTP::route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
- const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode) {
+ const std::shared_ptr<core::ProcessContext> &context, bool is_success, int64_t status_code) {
// check if we should yield the processor
- if (!isSuccess && request == nullptr) {
+ if (!is_success && request == nullptr) {
context->yield();
}
// If the property to output the response flowfile regardless of status code is set then transfer it
- bool responseSent = false;
+ bool response_sent = false;
if (always_output_response_ && response != nullptr) {
logger_->log_debug("Outputting success and response");
session->transfer(response, RelResponse);
- responseSent = true;
+ response_sent = true;
}
// transfer to the correct relationship
// 2xx -> SUCCESS
- if (isSuccess) {
+ if (is_success) {
// we have two flowfiles to transfer
if (request != nullptr) {
session->transfer(request, Success);
}
- if (response != nullptr && !responseSent) {
+ if (response != nullptr && !response_sent) {
logger_->log_debug("Outputting success and response");
session->transfer(response, RelResponse);
}
// 5xx -> RETRY
- } else if (statusCode / 100 == 5) {
+ } else if (status_code / 100 == 5) {
if (request != nullptr) {
session->penalize(request);
session->transfer(request, RelRetry);
@@ -449,8 +460,4 @@ REGISTER_RESOURCE(InvokeHTTP, "An HTTP client processor which can interact with
"The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the "
"FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).");
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 08c89c604..bff7895b9 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -21,6 +21,7 @@
#include <curl/curl.h>
#include <memory>
#include <string>
+#include <map>
#include "FlowFileRecord.h"
#include "core/Processor.h"
@@ -31,30 +32,24 @@
#include "utils/Id.h"
#include "../client/HTTPClient.h"
#include "utils/Export.h"
+#include "utils/Enum.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
-// InvokeHTTP Class
class InvokeHTTP : public core::Processor {
public:
- // Constructor
- /*!
- * Create a new processor
- */
+ SMART_ENUM(InvalidHTTPHeaderFieldHandlingOption,
+ (FAIL, "fail"),
+ (TRANSFORM, "transform"),
+ (DROP, "drop")
+ )
+
explicit InvokeHTTP(const std::string& name, const utils::Identifier& uuid = {})
: Processor(name, uuid) {
setTriggerWhenEmpty(true);
}
- // Destructor
- virtual ~InvokeHTTP();
- // Processor Name
- EXTENSIONAPI static const char *ProcessorName;
EXTENSIONAPI static std::string DefaultContentType;
- // Supported Properties
+
EXTENSIONAPI static core::Property Method;
EXTENSIONAPI static core::Property URL;
EXTENSIONAPI static core::Property ConnectTimeout;
@@ -73,10 +68,9 @@ class InvokeHTTP : public core::Processor {
EXTENSIONAPI static core::Property UseChunkedEncoding;
EXTENSIONAPI static core::Property DisablePeerVerification;
EXTENSIONAPI static core::Property PropPutOutputAttributes;
-
EXTENSIONAPI static core::Property AlwaysOutputResponse;
-
EXTENSIONAPI static core::Property PenalizeOnNoRetry;
+ EXTENSIONAPI static core::Property InvalidHTTPHeaderFieldHandlingStrategy;
EXTENSIONAPI static const char* STATUS_CODE;
EXTENSIONAPI static const char* STATUS_MESSAGE;
@@ -86,7 +80,7 @@ class InvokeHTTP : public core::Processor {
EXTENSIONAPI static const char* REMOTE_DN;
EXTENSIONAPI static const char* EXCEPTION_CLASS;
EXTENSIONAPI static const char* EXCEPTION_MESSAGE;
- // Supported Relationships
+
EXTENSIONAPI static core::Relationship Success;
EXTENSIONAPI static core::Relationship RelResponse;
EXTENSIONAPI static core::Relationship RelRetry;
@@ -96,20 +90,8 @@ class InvokeHTTP : public core::Processor {
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
void initialize() override;
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- /**
- * Provides a reference to the URL.
- */
- const std::string &getUrl() {
- return url_;
- }
-
- protected:
- /**
- * Generate a transaction ID
- * @return transaction ID string.
- */
- std::string generateId();
+ private:
/**
* Routes the flowfile to the proper destination
* @param request request flow file record
@@ -121,49 +103,29 @@ class InvokeHTTP : public core::Processor {
*/
void route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode);
- /**
- * Determine if we should emit a new flowfile based on our activity
- * @param method method type
- * @return result of the evaluation.
- */
- bool emitFlowFile(const std::string &method);
-
- std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_{nullptr};
+ bool shouldEmitFlowFile() const;
+ std::optional<std::map<std::string, std::string>> validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const;
- // http method
+ std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
std::string method_;
- // url
std::string url_;
- // include date in the header
bool date_header_include_{true};
- // attribute to send regex
std::string attribute_to_send_regex_;
- // connection timeout
std::chrono::milliseconds connect_timeout_ms_{20000};
- // read timeout.
std::chrono::milliseconds read_timeout_ms_{20000};
// attribute in which response body will be added
std::string put_attribute_name_;
- // determine if we always output a response.
bool always_output_response_{false};
- // content type.
std::string content_type_;
- // use chunked encoding.
bool use_chunked_encoding_{false};
- // penalize on no retry
bool penalize_no_retry_{false};
- // disable peer verification ( makes susceptible for MITM attacks )
+ // disabling peer verification makes the connection susceptible to MITM attacks
bool disable_peer_verification_{false};
utils::HTTPProxy proxy_;
bool follow_redirects_{true};
bool send_body_{true};
-
- private:
+ InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger()};
};
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/tests/unit/HTTPClientTests.cpp b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
index ac6d78ffe..1101b4ef4 100644
--- a/extensions/http-curl/tests/unit/HTTPClientTests.cpp
+++ b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
@@ -95,3 +95,21 @@ TEST_CASE("HTTPClient escape test") {
CHECK(client.escape("Hello Günter") == "Hello%20G%C3%BCnter");
CHECK(client.escape("шеллы") == "%D1%88%D0%B5%D0%BB%D0%BB%D1%8B");
}
+
+TEST_CASE("HTTPClient isValidHttpHeaderField test") {
+ CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(""));
+ CHECK(utils::HTTPClient::isValidHttpHeaderField("valid"));
+ CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(" "));
+ CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(std::string("invalid") + static_cast<char>(11) + "character"));
+ CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(std::string("invalid") + static_cast<char>(128) + "character"));
+ CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField("contains:invalid"));
+}
+
+TEST_CASE("HTTPClient replaceInvalidCharactersInHttpHeaderFieldName test") {
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("") == "X-MiNiFi-Empty-Attribute-Name");
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("valid") == "valid");
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(" ") == "-");
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string("invalid") + static_cast<char>(11) + "character") == "invalid-character");
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string("invalid") + static_cast<char>(128) + "character") == "invalid-character");
+ CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("contains:invalid") == "contains-invalid");
+}
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index b8213c53b..032e5cee7 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -34,27 +34,41 @@
#include "core/ProcessorNode.h"
#include "processors/LogAttribute.h"
#include "utils/gsl.h"
-#include "processors/GenerateFlowFile.h"
+#include "SingleProcessorTestController.h"
+
+namespace org::apache::nifi::minifi::test {
-namespace {
class TestHTTPServer {
public:
TestHTTPServer();
static constexpr const char* PROCESSOR_NAME = "my_http_server";
static constexpr const char* URL = "http://localhost:8681/testytesttest";
+ void trigger() {
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+ test_plan_->reset();
+ test_controller_.runSession(test_plan_);
+ }
+
private:
TestController test_controller_;
+ std::shared_ptr<core::Processor> listen_http_;
+ std::shared_ptr<core::Processor> log_attribute_;
std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
};
TestHTTPServer::TestHTTPServer() {
- std::shared_ptr<core::Processor> listen_http = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
- test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest");
- test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+
+ listen_http_ = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
+ log_attribute_ = test_plan_->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true);
+ test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "testytesttest");
+ test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+ test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex.getName(), ".*");
test_controller_.runSession(test_plan_);
}
-} // namespace
TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
TestController testController;
@@ -251,7 +265,7 @@ TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
- plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+ plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), "http://localhost:8681/invalid");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
constexpr const char* PENALIZE_LOG_PATTERN = "Penalizing [0-9a-f-]+ for [0-9]+ms at invokehttp";
@@ -289,3 +303,67 @@ TEST_CASE("HTTPTestsPutResponseBodyinAttribute", "[httptest1]") {
REQUIRE(LogTestController::getInstance().contains("Adding http response body to flow file attribute http.type"));
}
+
+TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in HTTP headers", "[httptest1]") {
+ using minifi::processors::InvokeHTTP;
+ TestHTTPServer http_server;
+
+ LogTestController::getInstance().setDebug<InvokeHTTP>();
+ auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+ test::SingleProcessorTestController test_controller{invokehttp};
+
+ invokehttp->setProperty(InvokeHTTP::Method, "GET");
+ invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+ invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
+ invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+ test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});
+ const auto result = test_controller.trigger();
+ auto file_contents = result.at(InvokeHTTP::RelFailure);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+}
+
+TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") {
+ using minifi::processors::InvokeHTTP;
+ TestHTTPServer http_server;
+
+ auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+ test::SingleProcessorTestController test_controller{invokehttp};
+ LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+ invokehttp->setProperty(InvokeHTTP::Method, "GET");
+ invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+ invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+ test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});
+ const auto result = test_controller.trigger();
+ auto file_contents = result.at(InvokeHTTP::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+ http_server.trigger();
+ REQUIRE(LogTestController::getInstance().contains("key:invalid-header value:value"));
+ REQUIRE(LogTestController::getInstance().contains("key:X-MiNiFi-Empty-Attribute-Name value:value2"));
+}
+
+TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]") {
+ using minifi::processors::InvokeHTTP;
+ TestHTTPServer http_server;
+
+ auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+ test::SingleProcessorTestController test_controller{invokehttp};
+ LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+ invokehttp->setProperty(InvokeHTTP::Method, "GET");
+ invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+ invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
+ invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+ test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
+ const auto result = test_controller.trigger();
+ auto file_contents = result.at(InvokeHTTP::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+ http_server.trigger();
+ REQUIRE(LogTestController::getInstance().contains("key:legit-header value:value1"));
+ REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
+}
+
+} // namespace org::apache::nifi::minifi::test