You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2020/09/07 17:34:40 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1357 Implement and test proxy handling in InvokeHTTP

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8318305  MINIFICPP-1357 Implement and test proxy handling in InvokeHTTP
8318305 is described below

commit 831830516b63737076a036b422232f68e33335d7
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Sep 7 19:33:22 2020 +0200

    MINIFICPP-1357 Implement and test proxy handling in InvokeHTTP
    
    Signed-off-by: Marton Szasz <sz...@gmail.com>
    
    This closes #897
---
 docker/test/integration/minifi/__init__.py      |  35 +++++++-
 docker/test/integration/minifi/test/__init__.py |  11 +++
 docker/test/integration/test_http.py            |  25 +++++-
 extensions/http-curl/client/HTTPClient.cpp      |   5 ++
 extensions/http-curl/client/HTTPClient.h        |   2 +
 extensions/http-curl/processors/InvokeHTTP.cpp  |  15 +++-
 extensions/http-curl/processors/InvokeHTTP.h    |   2 +
 libminifi/include/utils/ByteArrayCallback.h     |   3 +-
 libminifi/include/utils/HTTPClient.h            | 106 +++++++++++-------------
 9 files changed, 141 insertions(+), 63 deletions(-)

diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index cfb2eb5..3e55d5b 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -112,6 +112,8 @@ class SingleNodeDockerCluster(Cluster):
             self.deploy_minifi_cpp_flow(flow, name, vols)
         elif engine == 'kafka-broker':
             self.deploy_kafka_broker(name)
+        elif engine == 'http-proxy':
+            self.deploy_http_proxy()
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
@@ -241,6 +243,28 @@ class SingleNodeDockerCluster(Cluster):
         self.containers[consumer.name] = consumer
         self.containers[broker.name] = broker
 
+    def deploy_http_proxy(self):
+        # Build a squid image whose config only allows authenticated clients, then start it
+        # as the 'http-proxy' container and register it in self.containers.
+        logging.info('Creating and running http-proxy docker container...')
+        dockerfile = dedent("""FROM {base_image}
+                RUN apt update && apt install -y apache2-utils
+                RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
+                RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users'  > /etc/squid/squid.conf && \
+                    echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
+                    echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
+                    echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
+                    echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
+                ENTRYPOINT ["/sbin/entrypoint.sh"]
+                """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
+        image = self.build_image(dockerfile, [])[0]
+        run_options = dict(detach=True,
+                           name='http-proxy',
+                           network=self.network.name,
+                           ports={'3128/tcp': 3128})
+        proxy = self.client.containers.run(image, **run_options)
+        self.containers[proxy.name] = proxy
+
     def build_image(self, dockerfile, context_files):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
@@ -432,8 +456,17 @@ class Processor(Connectable):
 class InvokeHTTP(Processor):
     def __init__(self, url,
                  method='GET',
+                 proxy_host='',
+                 proxy_port='',
+                 proxy_username='',
+                 proxy_password='',
                  ssl_context_service=None):
-        properties = {'Remote URL': url, 'HTTP Method': method}
+        properties = {'Remote URL': url,
+                      'HTTP Method': method,
+                      'Proxy Host': proxy_host,
+                      'Proxy Port': proxy_port,
+                      'invokehttp-proxy-username': proxy_username,
+                      'invokehttp-proxy-password': proxy_password}
 
         controller_services = []
 
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 71fd096..2a4218f 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -17,6 +17,8 @@ import logging
 import shutil
 import uuid
 import tarfile
+import subprocess
+import sys
 from io import BytesIO
 from threading import Event
 
@@ -177,6 +179,15 @@ class DockerTestCluster(SingleNodeDockerCluster):
         if isinstance(self.output_validator, FileOutputValidator):
             return self.output_validator.validate(dir=kwargs.get('dir', ''))
         return self.output_validator.validate()
+
+    def check_http_proxy_access(self):
+        """True when squid's access log mentions the listener URL, has at least one TCP_DENIED/407 entry and equally many TCP_MISS/200 entries."""
+        # The log comes from the container, so decode it with a fixed codec; sys.stdout.encoding is unrelated to it and may be None.
+        output = subprocess.check_output(["docker", "exec", "http-proxy", "cat", "/var/log/squid/access.log"]).decode("utf-8", "replace")
+        return "http://minifi-listen:8080/contentListener" in output and \
+            output.count("TCP_DENIED/407") != 0 and \
+            output.count("TCP_MISS/200") == output.count("TCP_DENIED/407")
+
     def rm_out_child(self, dir):
         logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
         shutil.rmtree(self.tmp_test_output_dir + dir)
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
index 6b6a9f5..faa4a5a 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -21,7 +21,6 @@ def test_invoke_listen():
     """
     Verify sending using InvokeHTTP to a receiver using ListenHTTP.
     """
-
     invoke_flow = (GetFile('/tmp/input')
                    >> LogAttribute()
                    >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
@@ -34,3 +33,27 @@ def test_invoke_listen():
         cluster.deploy_flow(invoke_flow, name='minifi-invoke')
 
         assert cluster.check_output()
+
+def test_invoke_listen_with_proxy():
+    """
+    Check that InvokeHTTP can deliver to a ListenHTTP receiver when routed through the http-proxy container.
+    """
+    receiver_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
+    proxy_settings = dict(proxy_host='http-proxy',
+                          proxy_port='3128',
+                          proxy_username='admin',
+                          proxy_password='test101')
+    sender_flow = (GetFile('/tmp/input')
+                   >> LogAttribute()
+                   >> InvokeHTTP('http://minifi-listen:8080/contentListener',
+                                 method='POST',
+                                 **proxy_settings))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='http-proxy')
+        cluster.deploy_flow(receiver_flow, name='minifi-listen')
+        cluster.deploy_flow(sender_flow, name='minifi-invoke')
+
+        assert cluster.check_output()
+        assert cluster.check_http_proxy_access()
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 9c36693..5d48a9a 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -190,6 +190,11 @@ void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
   curl_easy_setopt(http_session_, CURLOPT_READDATA, static_cast<void*>(callbackObj));
 }
 
+void HTTPClient::setSeekFunction(HTTPUploadCallback *callbackObj) {
+  curl_easy_setopt(http_session_, CURLOPT_SEEKFUNCTION, &utils::HTTPRequestResponse::seek_callback);
+  curl_easy_setopt(http_session_, CURLOPT_SEEKDATA, static_cast<void*>(callbackObj));
+}
+
 struct curl_slist *HTTPClient::build_header_list(std::string regex, const std::map<std::string, std::string> &attributes) {
   if (http_session_) {
     for (auto attribute : attributes) {
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 3b1941a..1add721 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -106,6 +106,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   void setUploadCallback(HTTPUploadCallback *callbackObj) override;
 
+  void setSeekFunction(HTTPUploadCallback *callbackObj) override;
+
   virtual void setReadCallback(HTTPReadCallback *callbackObj);
 
   struct curl_slist *build_header_list(std::string regex, const std::map<std::string, std::string> &attributes);
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 70a4ed2..4dc4744 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -83,7 +83,9 @@ core::Property InvokeHTTP::SSLContext(
                                                                                   "information for TLS/SSL (https) connections.")->isRequired(false)->withExclusiveProperty("Remote URL", "^http:.*$")
         ->asType<minifi::controllers::SSLContextService>()->build());
 core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", "");
-core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", "");
+core::Property InvokeHTTP::ProxyPort(
+    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
+        ->isRequired(false)->build());
 core::Property InvokeHTTP::ProxyUsername(
     core::PropertyBuilder::createProperty("invokehttp-proxy-username", "Proxy Username")->withDescription("Username to set when authenticating against proxy")->isRequired(false)->build());
 core::Property InvokeHTTP::ProxyPassword(
@@ -145,8 +147,8 @@ void InvokeHTTP::initialize() {
   properties.insert(ProxyHost);
   properties.insert(ProxyPort);
   properties.insert(ProxyUsername);
-  properties.insert(UseChunkedEncoding);
   properties.insert(ProxyPassword);
+  properties.insert(UseChunkedEncoding);
   properties.insert(ContentType);
   properties.insert(SendBody);
   properties.insert(DisablePeerVerification);
@@ -246,6 +248,12 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
   if (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification)) {
     utils::StringUtils::StringToBool(disablePeerVerification, disable_peer_verification_);
   }
+
+  proxy_ = {};
+  context->getProperty(ProxyHost.getName(), proxy_.host);
+  context->getProperty(ProxyPort.getName(), proxy_.port);
+  context->getProperty(ProxyUsername.getName(), proxy_.username);
+  context->getProperty(ProxyPassword.getName(), proxy_.password);
 }
 
 InvokeHTTP::~InvokeHTTP() = default;
@@ -307,6 +315,8 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
     client.setDisablePeerVerification();
   }
 
+  client.setHTTPProxy(proxy_);
+
   if (emitFlowFile(method_)) {
     logger_->log_trace("InvokeHTTP -- reading flowfile");
     std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
@@ -321,6 +331,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
         client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
       }
       client.setUploadCallback(callbackObj.get());
+      client.setSeekFunction(callbackObj.get());
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index b0bb770..498ba09 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -157,6 +157,8 @@ class InvokeHTTP : public core::Processor {
   bool penalize_no_retry_{false};
   // disable peer verification ( makes susceptible for MITM attacks )
   bool disable_peer_verification_{false};
+  utils::HTTPProxy proxy_;
+
  private:
   std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<InvokeHTTP>::getLogger()};
 };
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 826957b..1595158 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -57,6 +57,7 @@ class ByteInputCallBack : public InputStreamCallback {
   }
 
   virtual void seek(size_t pos) {
+    ptr = &vec[pos];
   }
 
   virtual void write(std::string content) {
@@ -65,7 +66,7 @@ class ByteInputCallBack : public InputStreamCallback {
   }
 
   virtual char *getBuffer(size_t pos) {
-    return ptr + pos;
+    return &vec[pos];
   }
 
   virtual const size_t getRemaining(size_t pos) {
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 1069546..a03d248 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -46,7 +46,7 @@ struct HTTPProxy {
   std::string host;
   std::string username;
   std::string password;
-  int port;
+  int port = 0;
 };
 
 struct HTTPUploadCallback {
@@ -178,6 +178,8 @@ class HTTPRequestResponse {
 
  public:
   static const size_t CALLBACK_ABORT = 0x10000000;
+  static const int SEEKFUNC_OK = 0;
+  static const int SEEKFUNC_FAIL = 1;
 
   const std::vector<char> &getData() {
     return data;
@@ -250,6 +252,26 @@ class HTTPRequestResponse {
     }
   }
 
+  // Seek hook installed via CURLOPT_SEEKFUNCTION; moves the upload position to offset (the origin argument is ignored).
+  static int seek_callback(void *p, int64_t offset, int) {
+    try {
+      auto *callback = static_cast<HTTPUploadCallback*>(p);
+      // Dereferencing a null buffer would be undefined behaviour, which catch (...) cannot intercept.
+      if (callback == nullptr || callback->ptr == nullptr || callback->stop) {
+        return SEEKFUNC_FAIL;
+      }
+      // Negative offsets are rejected explicitly instead of through unsigned wrap-around.
+      if (offset < 0 || callback->ptr->getBufferSize() <= static_cast<size_t>(offset)) {
+        return SEEKFUNC_FAIL;
+      }
+      callback->pos = offset;
+      callback->ptr->seek(callback->getPos());
+      return SEEKFUNC_OK;
+    } catch (...) {
+      return SEEKFUNC_FAIL;
+    }
+  }
+
   int read_data(uint8_t *buf, size_t size) {
     size_t size_to_read = size;
     if (size_to_read > data.size()) {
@@ -271,86 +293,57 @@ class HTTPRequestResponse {
 
 class BaseHTTPClient {
  public:
-  explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) : response_code(-1) {
-  }
-
-  BaseHTTPClient() : response_code(-1) {
-  }
+  BaseHTTPClient() = default;
 
   virtual ~BaseHTTPClient() = default;
 
-  virtual void setVerbose(bool use_stderr = false) {
-  }
+  virtual void setVerbose(bool use_stderr = false) = 0;
 
-  virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
-  }
+  virtual void initialize(const std::string &method, const std::string url = "", const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) = 0;
 
-  DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setConnectionTimeout(int64_t timeout) {
-  }
+  DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setConnectionTimeout(int64_t timeout) = 0;
 
-  DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setReadTimeout(int64_t timeout) {
-  }
+  DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) virtual void setReadTimeout(int64_t timeout) = 0;
 
-  virtual void setConnectionTimeout(std::chrono::milliseconds timeout) {
-  }
+  virtual void setConnectionTimeout(std::chrono::milliseconds timeout) = 0;
 
-  virtual void setReadTimeout(std::chrono::milliseconds timeout) {
-  }
+  virtual void setReadTimeout(std::chrono::milliseconds timeout) = 0;
 
-  virtual void setUploadCallback(HTTPUploadCallback *callbackObj) {
-  }
+  virtual void setUploadCallback(HTTPUploadCallback *callbackObj) = 0;
 
-  virtual void setContentType(std::string content_type) {
-  }
+  virtual void setSeekFunction(HTTPUploadCallback *callbackObj) = 0;
 
-  virtual std::string escape(std::string string_to_escape) {
-    return "";
-  }
+  virtual void setContentType(std::string content_type) = 0;
 
-  virtual void setPostFields(const std::string& input) {
-  }
+  virtual std::string escape(std::string string_to_escape) = 0;
 
-  virtual bool submit() {
-    return false;
-  }
+  virtual void setPostFields(const std::string& input) = 0;
 
-  virtual int64_t getResponseCode() const {
-    return response_code;
-  }
+  virtual bool submit() = 0;
 
-  virtual const char *getContentType() {
-    return "";
-  }
+  virtual int64_t getResponseCode() const = 0;
+
+  virtual const char *getContentType() = 0;
 
   virtual const std::vector<char> &getResponseBody() {
     return response_body_;
   }
 
-  virtual void appendHeader(const std::string &new_header) {
-  }
+  virtual void appendHeader(const std::string &new_header) = 0;
 
-  virtual void set_request_method(const std::string method) {
-  }
+  virtual void set_request_method(const std::string method) = 0;
 
-  virtual void setUseChunkedEncoding() {
-  }
+  virtual void setUseChunkedEncoding() = 0;
 
-  virtual void setDisablePeerVerification() {
-  }
+  virtual void setDisablePeerVerification() = 0;
 
-  virtual void setHTTPProxy(const utils::HTTPProxy &proxy) {
-  }
+  virtual void setHTTPProxy(const utils::HTTPProxy &proxy) = 0;
 
-  virtual void setDisableHostVerification() {
-  }
+  virtual void setDisableHostVerification() = 0;
 
-  virtual bool setSpecificSSLVersion(SSLVersion specific_version) {
-    return false;
-  }
+  virtual bool setSpecificSSLVersion(SSLVersion specific_version) = 0;
 
-  virtual bool setMinimumSSLVersion(SSLVersion minimum_version) {
-    return false;
-  }
+  virtual bool setMinimumSSLVersion(SSLVersion minimum_version) = 0;
 
   virtual const std::vector<std::string> &getHeaders() {
     return headers_;
@@ -361,14 +354,11 @@ class BaseHTTPClient {
   }
 
  protected:
-  int64_t response_code;
   std::vector<char> response_body_;
   std::vector<std::string> headers_;
   std::map<std::string, std::string> header_mapping_;
 
-  virtual inline bool matches(const std::string &value, const std::string &sregex) {
-    return false;
-  }
+  virtual inline bool matches(const std::string &value, const std::string &sregex) = 0;
 };
 
 extern std::string get_token(utils::BaseHTTPClient *client, std::string username, std::string password);