You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2020/09/07 17:34:40 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1357 Implement and
test proxy handling in InvokeHTTP
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8318305 MINIFICPP-1357 Implement and test proxy handling in InvokeHTTP
8318305 is described below
commit 831830516b63737076a036b422232f68e33335d7
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Sep 7 19:33:22 2020 +0200
MINIFICPP-1357 Implement and test proxy handling in InvokeHTTP
Signed-off-by: Marton Szasz <sz...@gmail.com>
This closes #897
---
docker/test/integration/minifi/__init__.py | 35 +++++++-
docker/test/integration/minifi/test/__init__.py | 11 +++
docker/test/integration/test_http.py | 25 +++++-
extensions/http-curl/client/HTTPClient.cpp | 5 ++
extensions/http-curl/client/HTTPClient.h | 2 +
extensions/http-curl/processors/InvokeHTTP.cpp | 15 +++-
extensions/http-curl/processors/InvokeHTTP.h | 2 +
libminifi/include/utils/ByteArrayCallback.h | 3 +-
libminifi/include/utils/HTTPClient.h | 106 +++++++++++-------------
9 files changed, 141 insertions(+), 63 deletions(-)
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index cfb2eb5..3e55d5b 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -112,6 +112,8 @@ class SingleNodeDockerCluster(Cluster):
self.deploy_minifi_cpp_flow(flow, name, vols)
elif engine == 'kafka-broker':
self.deploy_kafka_broker(name)
+ elif engine == 'http-proxy':
+ self.deploy_http_proxy()
else:
raise Exception('invalid flow engine: \'%s\'' % engine)
@@ -241,6 +243,28 @@ class SingleNodeDockerCluster(Cluster):
self.containers[consumer.name] = consumer
self.containers[broker.name] = broker
+ def deploy_http_proxy(self):
+     """
+     Build an image on top of a squid base image and start it as the
+     'http-proxy' container attached to the cluster network.
+
+     The squid.conf written by the Dockerfile allows access only to
+     clients matching the 'proxy_auth REQUIRED' acl, using the htpasswd
+     file created in the same image.
+     """
+     logging.info('Creating and running http-proxy docker container...')
+     # Values substituted into the Dockerfile template below.
+     image_settings = dict(base_image='sameersbn/squid:3.5.27-2',
+                           proxy_username='admin',
+                           proxy_password='test101',
+                           proxy_port='3128')
+     dockerfile_text = """FROM {base_image}
+ RUN apt update && apt install -y apache2-utils
+ RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
+ RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
+ echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
+ echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
+ echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
+ echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
+ ENTRYPOINT ["/sbin/entrypoint.sh"]
+ """.format(**image_settings)
+     proxy_image = self.build_image(dedent(dockerfile_text), [])
+     proxy_container = self.client.containers.run(
+         proxy_image[0],
+         detach=True,
+         name='http-proxy',
+         network=self.network.name,
+         ports={'3128/tcp': 3128})
+     # Register the container under its name, like the other deployed containers.
+     self.containers[proxy_container.name] = proxy_container
+
def build_image(self, dockerfile, context_files):
conf_dockerfile_buffer = BytesIO()
docker_context_buffer = BytesIO()
@@ -432,8 +456,17 @@ class Processor(Connectable):
class InvokeHTTP(Processor):
def __init__(self, url,
method='GET',
+ proxy_host='',
+ proxy_port='',
+ proxy_username='',
+ proxy_password='',
ssl_context_service=None):
- properties = {'Remote URL': url, 'HTTP Method': method}
+ properties = {'Remote URL': url,
+ 'HTTP Method': method,
+ 'Proxy Host': proxy_host,
+ 'Proxy Port': proxy_port,
+ 'invokehttp-proxy-username': proxy_username,
+ 'invokehttp-proxy-password': proxy_password}
controller_services = []
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 71fd096..2a4218f 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -17,6 +17,8 @@ import logging
import shutil
import uuid
import tarfile
+import subprocess
+import sys
from io import BytesIO
from threading import Event
@@ -177,6 +179,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
if isinstance(self.output_validator, FileOutputValidator):
return self.output_validator.validate(dir=kwargs.get('dir', ''))
return self.output_validator.validate()
+
+ def check_http_proxy_access(self):
+     """
+     Check the squid access log of the 'http-proxy' container.
+
+     Returns True only when all of the following hold:
+       * the ListenHTTP URL appears in the log,
+       * at least one request was logged as TCP_DENIED/407,
+       * the number of TCP_MISS/200 entries equals the number of
+         TCP_DENIED/407 entries (presumably every request is first
+         challenged for credentials and then repeated successfully --
+         confirm against squid's behaviour).
+     Raises subprocess.CalledProcessError if the docker command fails.
+     """
+     # The log is written inside the container, so decode it with a fixed
+     # codec instead of the host's sys.stdout.encoding, which can be None
+     # when stdout is redirected or too narrow for the bytes in the log.
+     access_log = subprocess.check_output(
+         ["docker", "exec", "http-proxy", "cat", "/var/log/squid/access.log"]).decode('utf-8', 'replace')
+     denied_count = access_log.count("TCP_DENIED/407")
+     served_count = access_log.count("TCP_MISS/200")
+     return ("http://minifi-listen:8080/contentListener" in access_log
+             and denied_count != 0
+             and served_count == denied_count)
+
def rm_out_child(self, dir):
logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
shutil.rmtree(self.tmp_test_output_dir + dir)
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
index 6b6a9f5..faa4a5a 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -21,7 +21,6 @@ def test_invoke_listen():
"""
Verify sending using InvokeHTTP to a receiver using ListenHTTP.
"""
-
invoke_flow = (GetFile('/tmp/input')
>> LogAttribute()
>> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
@@ -34,3 +33,27 @@ def test_invoke_listen():
cluster.deploy_flow(invoke_flow, name='minifi-invoke')
assert cluster.check_output()
+
+def test_invoke_listen_with_proxy():
+    """
+    Verify that InvokeHTTP can send through a proxy, using the configured
+    proxy credentials, to a receiver that uses ListenHTTP.
+    """
+    proxy_settings = {'proxy_host': 'http-proxy',
+                      'proxy_port': '3128',
+                      'proxy_username': 'admin',
+                      'proxy_password': 'test101'}
+    sender = GetFile('/tmp/input') >> LogAttribute()
+    invoke_flow = sender >> InvokeHTTP('http://minifi-listen:8080/contentListener',
+                                       method='POST',
+                                       **proxy_settings)
+
+    receiver = ListenHTTP(8080) >> LogAttribute()
+    listen_flow = receiver >> PutFile('/tmp/output')
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        # The proxy engine is deployed without a flow object.
+        cluster.deploy_flow(None, engine='http-proxy')
+        cluster.deploy_flow(listen_flow, name='minifi-listen')
+        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
+
+        # The data must arrive, and it must have passed through the proxy.
+        assert cluster.check_output()
+        assert cluster.check_http_proxy_access()
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 9c36693..5d48a9a 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -190,6 +190,11 @@ void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
}
+/**
+ * Installs the seek callback for the upload body on the curl session.
+ * callbackObj is registered as the seek data, i.e. the pointer that is passed
+ * to utils::HTTPRequestResponse::seek_callback as its first argument.
+ */
+void HTTPClient::setSeekFunction(HTTPUploadCallback *callbackObj) {
+  void *seek_data = static_cast<void*>(callbackObj);
+  curl_easy_setopt(http_session_, CURLOPT_SEEKFUNCTION, &utils::HTTPRequestResponse::seek_callback);
+  curl_easy_setopt(http_session_, CURLOPT_SEEKDATA, seek_data);
+}
+
struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
if (http_session_) {
for (auto attribute : attributes) {
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 3b1941a..1add721 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -106,6 +106,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
void setUploadCallback(HTTPUploadCallback *callbackObj) override;
+ void setSeekFunction(HTTPUploadCallback *callbackObj) override;
+
virtual void setReadCallback(HTTPReadCallback *callbackObj);
struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 70a4ed2..4dc4744 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -83,7 +83,9 @@ core::Property InvokeHTTP::SSLContext(
"information for TLS/SSL (https) connections.")->isRequired(false)->withExclusiveProperty("Remote URL", "^http:.*$")
->asType<minifi::controllers::SSLContextService>()->build());
core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", "");
-core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", "");
+core::Property InvokeHTTP::ProxyPort(
+ core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
+ ->isRequired(false)->build());
core::Property InvokeHTTP::ProxyUsername(
core::PropertyBuilder::createProperty("invokehttp-proxy-username", "Proxy Username")->withDescription("Username to set when authenticating against proxy")->isRequired(false)->build());
core::Property InvokeHTTP::ProxyPassword(
@@ -145,8 +147,8 @@ void InvokeHTTP::initialize() {
properties.insert(ProxyHost);
properties.insert(ProxyPort);
properties.insert(ProxyUsername);
- properties.insert(UseChunkedEncoding);
properties.insert(ProxyPassword);
+ properties.insert(UseChunkedEncoding);
properties.insert(ContentType);
properties.insert(SendBody);
properties.insert(DisablePeerVerification);
@@ -246,6 +248,12 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
if (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification)) {
utils::StringUtils::StringToBool(disablePeerVerification, disable_peer_verification_);
}
+
+ proxy_ = {};
+ context->getProperty(ProxyHost.getName(), proxy_.host);
+ context->getProperty(ProxyPort.getName(), proxy_.port);
+ context->getProperty(ProxyUsername.getName(), proxy_.username);
+ context->getProperty(ProxyPassword.getName(), proxy_.password);
}
InvokeHTTP::~InvokeHTTP() = default;
@@ -307,6 +315,8 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
client.setDisablePeerVerification();
}
+ client.setHTTPProxy(proxy_);
+
if (emitFlowFile(method_)) {
logger_->log_trace("InvokeHTTP -- reading flowfile");
std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
@@ -321,6 +331,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
}
client.setUploadCallback(callbackObj.get());
+ client.setSeekFunction(callbackObj.get());
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index b0bb770..498ba09 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -157,6 +157,8 @@ class InvokeHTTP : public core::Processor {
bool penalize_no_retry_{false};
// disable peer verification ( makes susceptible for MITM attacks )
bool disable_peer_verification_{false};
+ utils::HTTPProxy proxy_;
+
private:
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<InvokeHTTP>::getLogger()};
};
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 826957b..1595158 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -57,6 +57,7 @@ class ByteInputCallBack : public InputStreamCallback {
}
virtual void seek(size_t pos) {
+ ptr = &vec[pos];
}
virtual void write(std::string content) {
@@ -65,7 +66,7 @@ class ByteInputCallBack : public InputStreamCallback {
}
virtual char *getBuffer(size_t pos) {
- return ptr + pos;
+ return &vec[pos];
}
virtual const size_t getRemaining(size_t pos) {
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 1069546..a03d248 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -46,7 +46,7 @@ struct HTTPProxy {
std::string host;
std::string username;
std::string password;
- int port;
+ int port = 0;
};
struct HTTPUploadCallback {
@@ -178,6 +178,8 @@ class HTTPRequestResponse {
public:
static const size_t CALLBACK_ABORT = 0x10000000;
+ static const int SEEKFUNC_OK = 0;
+ static const int SEEKFUNC_FAIL = 1;
const std::vector<char> &getData() {
return data;
@@ -250,6 +252,26 @@ class HTTPRequestResponse {
}
}
+ /**
+  * Seek callback for the upload stream (registered through CURLOPT_SEEKFUNCTION).
+  * Moves the upload position of the HTTPUploadCallback to an absolute offset.
+  * The third parameter (the seek origin) is ignored.
+  * @param p pointer to the HTTPUploadCallback registered as seek data
+  * @param offset position to seek to; must be non-negative and smaller than the buffer size
+  * @return SEEKFUNC_OK on success; SEEKFUNC_FAIL if p is null, the upload was stopped,
+  *         the offset is out of range, or an exception was thrown
+  */
+ static int seek_callback(void *p, int64_t offset, int) {
+   try {
+     if (p == nullptr) {
+       return SEEKFUNC_FAIL;
+     }
+     auto *callback = static_cast<HTTPUploadCallback*>(p);
+     if (callback->stop) {
+       return SEEKFUNC_FAIL;
+     }
+     // Reject negative offsets explicitly and compare in 64 bits: narrowing the
+     // offset to size_t first could wrap to an in-range value where size_t is 32 bits.
+     if (offset < 0 || static_cast<uint64_t>(offset) >= static_cast<uint64_t>(callback->ptr->getBufferSize())) {
+       return SEEKFUNC_FAIL;
+     }
+     callback->pos = offset;
+     callback->ptr->seek(callback->getPos());
+     return SEEKFUNC_OK;
+   } catch (...) {
+     // Exceptions must not propagate out of the callback; report a failed seek instead.
+     return SEEKFUNC_FAIL;
+   }
+ }
+
int read_data(uint8_t *buf, size_t size) {
size_t size_to_read = size;
if (size_to_read > data.size()) {
@@ -271,86 +293,57 @@ class HTTPRequestResponse {
class BaseHTTPClient {
public:
- explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) : response_code(-1) {
- }
-
- BaseHTTPClient() : response_code(-1) {
- }
+ BaseHTTPClient() = default;
virtual ~BaseHTTPClient() = default;
- virtual void setVerbose(bool use_stderr = false) {
- }
+ virtual void setVerbose(bool use_stderr = false) = 0;
- virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
- }
+ virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) = 0;
- DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setConnectionTimeout(int64_t timeout) {
- }
+ DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setConnectionTimeout(int64_t timeout) = 0;
- DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setReadTimeout(int64_t timeout) {
- }
+ DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setReadTimeout(int64_t timeout) = 0;
- virtual void setConnectionTimeout(std::chrono::milliseconds timeout) {
- }
+ virtual void setConnectionTimeout(std::chrono::milliseconds timeout) = 0;
- virtual void setReadTimeout(std::chrono::milliseconds timeout) {
- }
+ virtual void setReadTimeout(std::chrono::milliseconds timeout) = 0;
- virtual void setUploadCallback(HTTPUploadCallback *callbackObj) {
- }
+ virtual void setUploadCallback(HTTPUploadCallback *callbackObj) = 0;
- virtual void setContentType(std::string content_type) {
- }
+ virtual void setSeekFunction(HTTPUploadCallback *callbackObj) = 0;
- virtual std::string escape(std::string string_to_escape) {
- return "";
- }
+ virtual void setContentType(std::string content_type) = 0;
- virtual void setPostFields(const std::string& input) {
- }
+ virtual std::string escape(std::string string_to_escape) = 0;
- virtual bool submit() {
- return false;
- }
+ virtual void setPostFields(const std::string& input) = 0;
- virtual int64_t getResponseCode() const {
- return response_code;
- }
+ virtual bool submit() = 0;
- virtual const char *getContentType() {
- return "";
- }
+ virtual int64_t getResponseCode() const = 0;
+
+ virtual const char *getContentType() = 0;
virtual const std::vector<char> &getResponseBody() {
return response_body_;
}
- virtual void appendHeader(const std::string &new_header) {
- }
+ virtual void appendHeader(const std::string &new_header) = 0;
- virtual void set_request_method(const std::string method) {
- }
+ virtual void set_request_method(const std::string method) = 0;
- virtual void setUseChunkedEncoding() {
- }
+ virtual void setUseChunkedEncoding() = 0;
- virtual void setDisablePeerVerification() {
- }
+ virtual void setDisablePeerVerification() = 0;
- virtual void setHTTPProxy(const utils::HTTPProxy &proxy) {
- }
+ virtual void setHTTPProxy(const utils::HTTPProxy &proxy) = 0;
- virtual void setDisableHostVerification() {
- }
+ virtual void setDisableHostVerification() = 0;
- virtual bool setSpecificSSLVersion(SSLVersion specific_version) {
- return false;
- }
+ virtual bool setSpecificSSLVersion(SSLVersion specific_version) = 0;
- virtual bool setMinimumSSLVersion(SSLVersion minimum_version) {
- return false;
- }
+ virtual bool setMinimumSSLVersion(SSLVersion minimum_version) = 0;
virtual const std::vector<std::string> &getHeaders() {
return headers_;
@@ -361,14 +354,11 @@ class BaseHTTPClient {
}
protected:
- int64_t response_code;
std::vector<char> response_body_;
std::vector<std::string> headers_;
std::map<std::string, std::string> header_mapping_;
- virtual inline bool matches(const std::string &value, const std::string &sregex) {
- return false;
- }
+ virtual inline bool matches(const std::string &value, const std::string &sregex) = 0;
};
extern std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password);