You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/18 11:20:24 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1217 - RPG should configure http client with reasonable timeouts
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 8c4392d MINIFICPP-1217 - RPG should configure http client with reasonable timeouts
8c4392d is described below
commit 8c4392db987dd993c5b87f2c2a50227df79984e5
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Wed May 13 11:07:57 2020 +0200
MINIFICPP-1217 - RPG should configure http client with reasonable timeouts
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #782
---
extensions/http-curl/client/HTTPClient.cpp | 28 ++-
extensions/http-curl/client/HTTPClient.h | 15 ++
extensions/http-curl/sitetosite/HTTPProtocol.h | 1 +
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/HTTPHandlers.h | 37 +++-
extensions/http-curl/tests/HTTPSiteToSiteTests.cpp | 24 ++-
extensions/http-curl/tests/HTTPUtils.h | 52 +++++
extensions/http-curl/tests/TestServer.h | 86 +-------
.../http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp | 216 +++++++++++++++++++++
.../http-curl/tests/VerifyInvokeHTTPTest.cpp | 4 +-
libminifi/include/RemoteProcessorGroupPort.h | 3 +
libminifi/include/sitetosite/SiteToSite.h | 10 +
libminifi/include/sitetosite/SiteToSiteClient.h | 10 +
libminifi/include/sitetosite/SiteToSiteFactory.h | 1 +
libminifi/src/RemoteProcessorGroupPort.cpp | 24 ++-
libminifi/test/resources/TestHTTPSiteToSite.yml | 2 +-
libminifi/test/resources/TestInvokeHTTPPost.yml | 2 +-
.../test/resources/TestInvokeHTTPPostSecure.yml | 2 +-
.../test/resources/TestTimeoutHTTPSiteToSite.yml | 60 ++++++
19 files changed, 469 insertions(+), 109 deletions(-)
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index e35a21f..a975feb 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -249,7 +249,12 @@ bool HTTPClient::submit() {
}
if (read_timeout_ms_.count() > 0) {
- curl_easy_setopt(http_session_, CURLOPT_TIMEOUT_MS, read_timeout_ms_.count());
+ progress_.reset();
+ curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 0);
+ curl_easy_setopt(http_session_, CURLOPT_XFERINFOFUNCTION, onProgress);
+ curl_easy_setopt(http_session_, CURLOPT_XFERINFODATA, (void*)this);
+ }else{
+ curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 1);
}
if (headers_ != nullptr) {
headers_ = curl_slist_append(headers_, "Expect:");
@@ -330,6 +335,27 @@ void HTTPClient::set_request_method(const std::string method) {
}
}
+int HTTPClient::onProgress(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow){
+ HTTPClient& client = *(HTTPClient*)(clientp);
+ auto now = std::chrono::steady_clock::now();
+ auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - client.progress_.last_transferred_);
+ if(dlnow != client.progress_.downloaded_data_ || ulnow != client.progress_.uploaded_data_){
+ // did transfer data
+ client.progress_.last_transferred_ = now;
+ client.progress_.downloaded_data_ = dlnow;
+ client.progress_.uploaded_data_ = ulnow;
+ return 0;
+ }
+ // did not transfer data
+ if(elapsed.count() > client.read_timeout_ms_.count()){
+ // timeout
+ client.logger_->log_error("HTTP operation has been idle for %dms, limit (%dms) reached, terminating connection\n",
+ (int)elapsed.count(), (int)client.read_timeout_ms_.count());
+ return 1;
+ }
+ return 0;
+}
+
bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
if (sregex == ".*")
return true;
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index e6e8fab..8e069e5 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -222,6 +222,21 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
}
}
}
+ private:
+ static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
+
+ struct Progress{
+ std::chrono::steady_clock::time_point last_transferred_;
+ curl_off_t uploaded_data_;
+ curl_off_t downloaded_data_;
+ void reset(){
+ last_transferred_ = std::chrono::steady_clock::now();
+ uploaded_data_ = 0;
+ downloaded_data_ = 0;
+ }
+ };
+
+ Progress progress_;
protected:
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 25dc180..d25814f 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -175,6 +175,7 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient {
logger_->log_info("HTTP Site2Site setup http proxy host %s", this->peer_->getHTTPProxy().host);
http_client_->setHTTPProxy(this->peer_->getHTTPProxy());
}
+ http_client_->setReadTimeout(idle_timeout_);
return http_client_;
}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index d619a75..e9f5d3d 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -89,6 +89,7 @@ endif()
add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" )
add_test(NAME HTTPSiteToSiteTests COMMAND HTTPSiteToSiteTests "${TEST_RESOURCES}/TestHTTPSiteToSite.yml" "${TEST_RESOURCES}/" "http://localhost:8099/nifi-api")
+add_test(NAME TimeoutHTTPSiteToSiteTests COMMAND TimeoutHTTPSiteToSiteTests "${TEST_RESOURCES}/TestTimeoutHTTPSiteToSite.yml" "${TEST_RESOURCES}/" "http://localhost:8098/nifi-api")
add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 5abbc60..0a36387 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -26,6 +26,7 @@
#include "rapidjson/document.h"
#include <cinttypes>
#include <utility>
+#include "HTTPUtils.h"
static std::atomic<int> transaction_id;
static std::atomic<int> transaction_id_output;
@@ -74,8 +75,9 @@ class SiteToSiteLocationResponder : public CivetHandler {
class PeerResponder : public CivetHandler {
public:
- explicit PeerResponder(std::string base_url)
- : base_url(std::move(base_url)) {
+ explicit PeerResponder(std::string base_url) {
+ std::string scheme;
+ assert(parse_http_components(base_url, port, scheme, path));
}
bool handleGet(CivetServer *server, struct mg_connection *conn) {
@@ -85,7 +87,7 @@ class PeerResponder : public CivetHandler {
#else
std::string hostname = "localhost";
#endif
- std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": 8099, \"secure\": false, \"flowFileCount\" : 0 }] }";
+ std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": " + port + ", \"secure\": false, \"flowFileCount\" : 0 }] }";
std::stringstream headers;
headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
mg_printf(conn, "%s", headers.str().c_str());
@@ -95,6 +97,8 @@ class PeerResponder : public CivetHandler {
protected:
std::string base_url;
+ std::string port;
+ std::string path;
};
class SiteToSiteBaseResponder : public CivetHandler {
@@ -121,7 +125,7 @@ class SiteToSiteBaseResponder : public CivetHandler {
class TransactionResponder : public CivetHandler {
public:
- explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
+ explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri = false, bool empty_transaction_uri = false)
: base_url(std::move(base_url)),
wrong_uri(wrong_uri),
empty_transaction_uri(empty_transaction_uri),
@@ -183,7 +187,7 @@ class TransactionResponder : public CivetHandler {
class FlowFileResponder : public CivetHandler {
public:
- explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
+ explicit FlowFileResponder(bool input_port, bool wrong_uri = false, bool invalid_checksum = false)
: wrong_uri(wrong_uri),
input_port(input_port),
invalid_checksum(invalid_checksum),
@@ -471,16 +475,31 @@ public:
}
};
-class InvokeHTTPResponseTimeoutHandler : public CivetHandler {
+class TimeoutingHTTPHandler : public CivetHandler {
public:
- InvokeHTTPResponseTimeoutHandler(std::chrono::milliseconds wait_ms)
- : wait_(wait_ms) {
- }
+ TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
+ : wait_(wait_ms) {
+ }
bool handlePost(CivetServer *, struct mg_connection *conn) {
std::this_thread::sleep_for(wait_);
mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
+ bool handleGet(CivetServer *, struct mg_connection *conn) {
+ std::this_thread::sleep_for(wait_);
+ mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ return true;
+ }
+ bool handleDelete(CivetServer *, struct mg_connection *conn) {
+ std::this_thread::sleep_for(wait_);
+ mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ return true;
+ }
+ bool handlePut(CivetServer *, struct mg_connection *conn) {
+ std::this_thread::sleep_for(wait_);
+ mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+ return true;
+ }
protected:
std::chrono::milliseconds wait_;
};
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index 39ef776..8078171 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -52,8 +52,8 @@
class SiteToSiteTestHarness : public CoapIntegrationBase {
public:
- explicit SiteToSiteTestHarness(bool isSecure)
- : CoapIntegrationBase(2000), isSecure(isSecure) {
+ explicit SiteToSiteTestHarness(bool isSecure, std::chrono::milliseconds waitTime = std::chrono::milliseconds{2000})
+ : CoapIntegrationBase(waitTime.count()), isSecure(isSecure) {
char format[] = "/tmp/ssth.XXXXXX";
dir = testController.createTempDirectory(format);
}
@@ -100,7 +100,8 @@ struct test_profile {
}
bool allFalse() const {
- return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum;
+ return !flow_url_broken && !transaction_url_broken &&
+ !empty_transaction_url && !no_delete && !invalid_checksum;
}
// tests for a broken flow file url
bool flow_url_broken;
@@ -117,9 +118,13 @@ struct test_profile {
void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
SiteToSiteTestHarness harness(isSecure);
+ std::string in_port = "471deef6-2a6e-4a7d-912a-81cc17e3a204";
+ std::string out_port = "471deef6-2a6e-4a7d-912a-81cc17e3a203";
+
SiteToSiteLocationResponder *responder = new SiteToSiteLocationResponder(isSecure);
- TransactionResponder *transaction_response = new TransactionResponder(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+ TransactionResponder *transaction_response = new TransactionResponder(url, in_port,
+ true, profile.transaction_url_broken, profile.empty_transaction_url);
std::string transaction_id = transaction_response->getTransactionId();
@@ -134,11 +139,11 @@ void run_variance(std::string test_file_location, bool isSecure, std::string url
harness.setUrl(controller_loc, responder);
- std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
- std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+ std::string transaction_url = url + "/data-transfer/input-ports/" + in_port + "/transactions";
+ std::string action_url = url + "/site-to-site/input-ports/" + in_port + "/transactions";
- std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
- std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+ std::string transaction_output_url = url + "/data-transfer/output-ports/" + out_port + "/transactions";
+ std::string action_output_url = url + "/site-to-site/output-ports/" + out_port + "/transactions";
harness.setUrl(transaction_url, transaction_response);
@@ -154,7 +159,8 @@ void run_variance(std::string test_file_location, bool isSecure, std::string url
flowResponder->setFlowUrl(flow_url);
auto producedFlows = flowResponder->getFlows();
- TransactionResponder *transaction_response_output = new TransactionResponder(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+ TransactionResponder *transaction_response_output = new TransactionResponder(url, out_port,
+ false, profile.transaction_url_broken, profile.empty_transaction_url);
std::string transaction_output_id = transaction_response_output->getTransactionId();
transaction_response_output->setFeed(producedFlows);
diff --git a/extensions/http-curl/tests/HTTPUtils.h b/extensions/http-curl/tests/HTTPUtils.h
new file mode 100644
index 0000000..f8932d7
--- /dev/null
+++ b/extensions/http-curl/tests/HTTPUtils.h
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_HTTPUTILS_H
+#define NIFI_MINIFI_CPP_HTTPUTILS_H
+
+#include "RegexUtils.h"
+
+/**
+This function, unfortunately, assumes that we're parsing http components of a local host. On windows this is problematic
+so we convert localhost to our local hostname.
+ */
+inline bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+#ifdef WIN32
+ auto hostname = (url.find(org::apache::nifi::minifi::io::Socket::getMyHostName()) != std::string::npos ? org::apache::nifi::minifi::io::Socket::getMyHostName() : "localhost");
+ std::string regexstr = "^(http|https)://(" + hostname + ":)([0-9]+)?(/.*)$";
+#else
+ std::string regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+#endif
+
+ using namespace org::apache::nifi::minifi::utils;
+
+ auto rgx = Regex(regexstr, {Regex::Mode::ICASE});
+
+ if (rgx.match(url)) {
+ auto result = rgx.getResult();
+ if(result.size() >= 5){
+ scheme = result[1];
+ port = result[3];
+ path = result[4];
+ return true;
+ }
+ }
+ return false;
+}
+
+#endif //NIFI_MINIFI_CPP_HTTPUTILS_H
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 2b667fc..9676630 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -17,16 +17,13 @@
*/
#ifndef LIBMINIFI_TEST_TESTSERVER_H_
#define LIBMINIFI_TEST_TESTSERVER_H_
-#if defined(_WIN32) || __cplusplus > 201103L
-#include <regex>
-#else
-#include <regex.h>
-#endif
+
#include <string>
#include <iostream>
#include "civetweb.h"
#include "CivetServer.h"
#include "civetweb.h"
+#include "HTTPUtils.h"
/* Server context handle */
@@ -80,85 +77,6 @@ CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHand
}
-/**
-This funciton, unfortunately, assumes that we're parsing http components of a local host. On windows this is problematic
-so we convert localhost to our local hostname.
- */
-bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
-
-#if (__cplusplus > 201103L) || defined(_WIN32)
-#ifdef WIN32
- auto hostname = (url.find(org::apache::nifi::minifi::io::Socket::getMyHostName()) != std::string::npos ? org::apache::nifi::minifi::io::Socket::getMyHostName() : "localhost");
- std::string regexstr = "^(http|https)://(" + hostname + ":)([0-9]+)?(/.*)$";
-#else
- std::string regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
-#endif
- std::regex rgx;
- std::regex_constants::syntax_option_type regex_mode = std::regex_constants::icase;
-
- rgx = std::regex(regexstr, regex_mode);
-
- std::smatch matches;
- std::string scratch = url;
- if (std::regex_search(scratch, matches, rgx)) {
- for (int i = 1; i < matches.size(); i++) {
- auto str = matches[i].str();
- switch (i) {
- case 1:
- scheme = str;
- break;
- case 3:
- port = str;
- break;
- case 4:
- path = str;
- break;
- default:
- break;
- }
- }
- }
-#else
- const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
- regex_t regex;
-
- int ret = regcomp(&regex, regexstr, REG_EXTENDED);
- if (ret) {
- return false;
- }
-
- size_t potentialGroups = regex.re_nsub + 1;
- regmatch_t groups[potentialGroups];
- if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) {
- for (size_t i = 0; i < potentialGroups; i++) {
- if (groups[i].rm_so == -1)
- break;
-
- std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
- switch (i) {
- case 1:
- scheme = str;
- break;
- case 3:
- port = str;
- break;
- case 4:
- path = str;
- break;
- default:
- break;
- }
- }
- }
- if (path.empty() || scheme.empty() || port.empty())
- return false;
-
- regfree(&regex);
-#endif
- return true;
-
-}
-
static void stop_webserver(CivetServer *server) {
if (server != nullptr)
delete server;
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..41619a8
--- /dev/null
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+class SiteToSiteTestHarness : public CoapIntegrationBase {
+public:
+ explicit SiteToSiteTestHarness(bool isSecure, std::chrono::milliseconds waitTime = std::chrono::milliseconds{1000})
+ : CoapIntegrationBase(waitTime.count()), isSecure(isSecure) {
+ char format[] = "/tmp/ssth.XXXXXX";
+ dir = testController.createTempDirectory(format);
+ }
+
+ void testSetup() override {
+ LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
+ LogTestController::getInstance().setTrace<minifi::sitetosite::HttpSiteToSiteClient>();
+ LogTestController::getInstance().setTrace<minifi::sitetosite::SiteToSiteClient>();
+ LogTestController::getInstance().setTrace<utils::HTTPClient>();
+ LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+ LogTestController::getInstance().setTrace<utils::HttpStreamingCallback>();
+
+ std::fstream file;
+ ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ configuration->set("nifi.c2.enable", "false");
+ configuration->set("nifi.remote.input.http.enabled", "true");
+ configuration->set("nifi.remote.input.socket.port", "8099");
+ }
+
+ void cleanup() override {}
+
+ void runAssertions() override {}
+
+protected:
+ bool isSecure;
+ std::string dir;
+ std::stringstream ss;
+ TestController testController;
+};
+
+struct defaulted_handler{
+ CivetHandler* handler = nullptr;
+ CivetHandler* get(CivetHandler *def) const {
+ if(handler)return handler;
+ return def;
+ }
+ void set(std::chrono::milliseconds timeout) {
+ handler = new TimeoutingHTTPHandler(timeout);
+ }
+};
+
+/**
+ * Determines which responders will timeout
+ */
+struct timeout_test_profile{
+ defaulted_handler base_;
+ defaulted_handler transaction_;
+ defaulted_handler flow_;
+ defaulted_handler peer_;
+ defaulted_handler delete_;
+};
+
+void run_timeout_variance(std::string test_file_location, bool isSecure, std::string url, const timeout_test_profile &profile) {
+ SiteToSiteTestHarness harness(isSecure);
+
+ std::string in_port = "471deef6-2a6e-4a7d-912a-81cc17e3a204";
+
+ TransactionResponder *transaction_response = new TransactionResponder(url, in_port, true);
+
+ std::string transaction_id = transaction_response->getTransactionId();
+
+ harness.setKeyDir("");
+
+ std::string baseUrl = url + "/site-to-site";
+ SiteToSiteBaseResponder *base = new SiteToSiteBaseResponder(baseUrl);
+
+ harness.setUrl(baseUrl, profile.base_.get(base));
+
+ std::string transaction_url = url + "/data-transfer/input-ports/" + in_port + "/transactions";
+ std::string action_url = url + "/site-to-site/input-ports/" + in_port + "/transactions";
+
+ harness.setUrl(transaction_url, profile.transaction_.get(transaction_response));
+
+ std::string peer_url = url + "/site-to-site/peers";
+
+ PeerResponder *peer_response = new PeerResponder(url);
+
+ harness.setUrl(peer_url, profile.peer_.get(peer_response));
+
+ std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+ FlowFileResponder *flowResponder = new FlowFileResponder(true);
+ flowResponder->setFlowUrl(flow_url);
+
+ harness.setUrl(flow_url, profile.flow_.get(flowResponder));
+
+ std::string delete_url = transaction_url + "/" + transaction_id;
+ DeleteTransactionResponder *deleteResponse = new DeleteTransactionResponder(delete_url, "201 OK", 12);
+ harness.setUrl(delete_url, profile.delete_.get(deleteResponse));
+
+ harness.run(test_file_location);
+
+ assert(LogTestController::getInstance().contains("limit (200ms) reached, terminating connection") == true);
+
+ LogTestController::getInstance().reset();
+}
+
+int main(int argc, char **argv) {
+ transaction_id = 0;
+ transaction_id_output = 0;
+ std::string key_dir, test_file_location, url;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ key_dir = argv[2];
+ url = argv[3];
+ }
+
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
+
+#ifdef WIN32
+ if (url.find("localhost") != std::string::npos) {
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ url = scheme + "://" + org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + port + path;
+ }
+#endif
+
+ const auto timeout = std::chrono::milliseconds{500};
+
+ {
+ timeout_test_profile profile;
+ profile.base_.set(timeout);
+ run_timeout_variance(test_file_location, isSecure, url, profile);
+ }
+
+ {
+ timeout_test_profile profile;
+ profile.flow_.set(timeout);
+ run_timeout_variance(test_file_location, isSecure, url, profile);
+ }
+
+ {
+ timeout_test_profile profile;
+ profile.transaction_.set(timeout);
+ run_timeout_variance(test_file_location, isSecure, url, profile);
+ }
+
+ {
+ timeout_test_profile profile;
+ profile.delete_.set(timeout);
+ run_timeout_variance(test_file_location, isSecure, url, profile);
+ }
+
+ {
+ timeout_test_profile profile;
+ profile.peer_.set(timeout);
+ run_timeout_variance(test_file_location, isSecure, url, profile);
+ }
+
+ return 0;
+}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 66c8525..bbc1ce2 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -133,7 +133,7 @@ class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
public:
virtual void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
- assert(LogTestController::getInstance().contains("failed Timeout was reached"));
+ assert(LogTestController::getInstance().contains("limit (1000ms) reached, terminating connection"));
}
};
@@ -192,7 +192,7 @@ int main(int argc, char ** argv) {
}
{
- InvokeHTTPResponseTimeoutHandler handler(std::chrono::milliseconds(4000));
+ TimeoutingHTTPHandler handler(std::chrono::milliseconds(4000));
VerifyRWTimeoutInvokeHTTP harness;
run(harness, url, test_file_location, key_dir, &handler);
}
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index b28ee1f..8978624 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -108,6 +108,7 @@ class RemoteProcessorGroupPort : public core::Processor {
static core::Property SSLContext;
static core::Property port;
static core::Property portUUID;
+ static core::Property idleTimeout;
// Supported Relationships
static core::Relationship relation;
public:
@@ -211,6 +212,8 @@ class RemoteProcessorGroupPort : public core::Processor {
utils::Identifier protocol_uuid_;
+ std::chrono::milliseconds idle_timeout_{};
+
// rest API end point info
std::vector<struct RPG> nifi_instances_;
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 6ba2dbb..fda87b0 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -361,6 +361,14 @@ class SiteToSiteClientConfiguration {
return stream_factory_;
}
+ void setIdleTimeout(std::chrono::milliseconds timeout) {
+ idle_timeout_ = timeout;
+ }
+
+ std::chrono::milliseconds getIdleTimeout() const {
+ return idle_timeout_;
+ }
+
// setInterface
void setInterface(std::string &ifc) {
local_network_interface_ = ifc;
@@ -385,6 +393,8 @@ class SiteToSiteClientConfiguration {
std::string local_network_interface_;
+ std::chrono::milliseconds idle_timeout_{};
+
// secore comms
std::shared_ptr<controllers::SSLContextService> ssl_service_;
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index e1fffc3..132bb99 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -151,6 +151,13 @@ class SiteToSiteClient : public core::Connectable {
}
/**
+ * Sets the idle timeout.
+ */
+ void setIdleTimeout(std::chrono::milliseconds timeout) {
+ idle_timeout_ = timeout;
+ }
+
+ /**
* Sets the base peer for this interface.
*/
virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
@@ -246,6 +253,9 @@ class SiteToSiteClient : public core::Connectable {
// portId
utils::Identifier port_id_;
+ // idleTimeout
+ std::chrono::milliseconds idle_timeout_{};
+
// Peer Connection
std::unique_ptr<SiteToSitePeer> peer_;
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 2d1dd74..29d49e1 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -93,6 +93,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
ptr->setPortId(uuid);
ptr->setPeer(std::move(peer));
+ ptr->setIdleTimeout(client_configuration.getIdleTimeout());
return ptr;
}
return nullptr;
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index f110e89..8d2c5ce 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -58,6 +58,9 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name
core::Property RemoteProcessorGroupPort::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "");
core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", "");
+core::Property RemoteProcessorGroupPort::idleTimeout(
+ core::PropertyBuilder::createProperty("Idle Timeout")->withDescription("Max idle time for remote service")->isRequired(false)
+ ->withDefaultValue<core::TimePeriodValue>("15 s")->build());
core::Relationship RemoteProcessorGroupPort::relation;
std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextProtocol(bool create = true) {
@@ -77,6 +80,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host, rpg.port_, ssl_service != nullptr), this->getInterface(),
client_type_);
config.setHTTPProxy(this->proxy_);
+ config.setIdleTimeout(idle_timeout_);
nextProtocol = sitetosite::createClient(config);
}
} else if (peer_index_ >= 0) {
@@ -89,6 +93,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
peer_index_ = 0;
}
config.setHTTPProxy(this->proxy_);
+ config.setIdleTimeout(idle_timeout_);
nextProtocol = sitetosite::createClient(config);
} else {
logger_->log_debug("Refreshing the peer list since there are none configured.");
@@ -120,6 +125,7 @@ void RemoteProcessorGroupPort::initialize() {
properties.insert(port);
properties.insert(SSLContext);
properties.insert(portUUID);
+ properties.insert(idleTimeout);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -154,11 +160,23 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
} else {
std::string secureStr;
bool is_secure = false;
- if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) {
+ if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) {
ssl_service = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_);
ssl_service->onEnable();
}
}
+ {
+   // Resolve the "Idle Timeout" property into idle_timeout_ (milliseconds). If the property is
+   // missing or cannot be parsed as a time period, fall back to the property's default value.
+   uint64_t idleTimeoutVal = 0;
+   std::string idleTimeoutStr;
+   if (!context->getProperty(idleTimeout.getName(), idleTimeoutStr)
+       || !core::Property::getTimeMSFromString(idleTimeoutStr, idleTimeoutVal)) {
+     logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(),
+         idleTimeout.getValue());
+     // The fallback parse must not live inside assert(): when NDEBUG is defined the asserted
+     // expression is never evaluated, which would leave idleTimeoutVal unset in release builds.
+     if (!core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal)) {
+       assert(false);  // the built-in default value is expected to always be parseable
+       idleTimeoutVal = 15000;  // last-resort fallback mirroring the declared "15 s" default
+     }
+   }
+   idle_timeout_ = std::chrono::milliseconds(idleTimeoutVal);
+ }
std::lock_guard<std::mutex> lock(peer_mutex_);
if (!nifi_instances_.empty()) {
@@ -200,6 +218,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
}
logger_->log_trace("Creating client");
config.setHTTPProxy(this->proxy_);
+ config.setIdleTimeout(idle_timeout_);
nextProtocol = sitetosite::createClient(config);
logger_->log_trace("Created client, moving into available protocols");
returnProtocol(std::move(nextProtocol));
@@ -306,6 +325,7 @@ std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo
// use a connection timeout. if this times out we will simply attempt re-connection
// so no need for configuration parameter that isn't already defined in Processor
client->setConnectionTimeout(std::chrono::milliseconds(10000));
+ client->setReadTimeout(idle_timeout_);
token = utils::get_token(client.get(), this->rest_user_name_, this->rest_password_);
logger_->log_debug("Token from NiFi REST Api endpoint %s, %s", loginUrl.str(), token);
@@ -324,6 +344,7 @@ std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo
// use a connection timeout. if this times out we will simply attempt re-connection
// so no need for configuration parameter that isn't already defined in Processor
client->setConnectionTimeout(std::chrono::milliseconds(10000));
+ client->setReadTimeout(idle_timeout_);
if (!proxy_.host.empty()) {
client->setHTTPProxy(proxy_);
}
@@ -384,6 +405,7 @@ void RemoteProcessorGroupPort::refreshPeerList() {
this->getInterface(), client_type_);
config.setSecurityContext(ssl_service);
config.setHTTPProxy(this->proxy_);
+ config.setIdleTimeout(idle_timeout_);
protocol = sitetosite::createClient(config);
if (protocol)
diff --git a/libminifi/test/resources/TestHTTPSiteToSite.yml b/libminifi/test/resources/TestHTTPSiteToSite.yml
index 2bdee30..339d8c6 100644
--- a/libminifi/test/resources/TestHTTPSiteToSite.yml
+++ b/libminifi/test/resources/TestHTTPSiteToSite.yml
@@ -23,7 +23,7 @@ Processors:
class: org.apache.nifi.processors.standard.GenerateFlowFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
- scheduling period: 1 sec
+ scheduling period: 5 sec
penalization period: 30 sec
yield period: 10 sec
run duration nanos: 0
diff --git a/libminifi/test/resources/TestInvokeHTTPPost.yml b/libminifi/test/resources/TestInvokeHTTPPost.yml
index 9450edf..cb8eb8e 100644
--- a/libminifi/test/resources/TestInvokeHTTPPost.yml
+++ b/libminifi/test/resources/TestInvokeHTTPPost.yml
@@ -75,7 +75,7 @@ Processors:
Disable Peer Verification: 'false'
HTTP Method: POST
Include Date Header: 'true'
- Read Timeout: 4 s
+ Read Timeout: 1 s
Remote URL: http://localhost:0/minifi
Use Chunked Encoding: 'false'
send-message-body: 'true'
diff --git a/libminifi/test/resources/TestInvokeHTTPPostSecure.yml b/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
index 2fbcd1b..6e9bd9a 100644
--- a/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
+++ b/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
@@ -74,7 +74,7 @@ Processors:
Content-type: application/octet-stream
Disable Peer Verification: 'true'
HTTP Method: POST
- Read Timeout: 3 s
+ Read Timeout: 1 s
Remote URL: https://localhost:0/minifi
Use Chunked Encoding: 'false'
send-message-body: 'true'
diff --git a/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml b/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml
new file mode 100644
index 0000000..d49314f
--- /dev/null
+++ b/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+ name: MiNiFi Flow
+
+Processors:
+ - name: GetFile
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 2 sec
+ penalization period: 30 sec
+ yield period: 10 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+
+
+Connections:
+ - name: GenerateFlowFileS2S
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+ source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+ source relationship name: success
+ destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+ - name: NiFi Flow
+ id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+ url: http://localhost:8098/nifi
+ timeout: 30 secs
+ yield period: 1 sec
+ transport protocol: HTTP
+ Input Ports:
+ - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ name: From Node A
+ max concurrent tasks: 1
+ use compression: false
+ Properties: # Deviates from spec and will later be removed when this is autonegotiated
+ Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+ Port: 8082
+ Host Name: 127.0.0.1
+ Idle Timeout: 200 ms
\ No newline at end of file