You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/05/18 11:20:24 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1217 - RPG should configure http client with reasonable timeouts

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4392d  MINIFICPP-1217 - RPG should configure http client with reasonable timeouts
8c4392d is described below

commit 8c4392db987dd993c5b87f2c2a50227df79984e5
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Wed May 13 11:07:57 2020 +0200

    MINIFICPP-1217 - RPG should configure http client with reasonable timeouts
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #782
---
 extensions/http-curl/client/HTTPClient.cpp         |  28 ++-
 extensions/http-curl/client/HTTPClient.h           |  15 ++
 extensions/http-curl/sitetosite/HTTPProtocol.h     |   1 +
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          |  37 +++-
 extensions/http-curl/tests/HTTPSiteToSiteTests.cpp |  24 ++-
 extensions/http-curl/tests/HTTPUtils.h             |  52 +++++
 extensions/http-curl/tests/TestServer.h            |  86 +-------
 .../http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp | 216 +++++++++++++++++++++
 .../http-curl/tests/VerifyInvokeHTTPTest.cpp       |   4 +-
 libminifi/include/RemoteProcessorGroupPort.h       |   3 +
 libminifi/include/sitetosite/SiteToSite.h          |  10 +
 libminifi/include/sitetosite/SiteToSiteClient.h    |  10 +
 libminifi/include/sitetosite/SiteToSiteFactory.h   |   1 +
 libminifi/src/RemoteProcessorGroupPort.cpp         |  24 ++-
 libminifi/test/resources/TestHTTPSiteToSite.yml    |   2 +-
 libminifi/test/resources/TestInvokeHTTPPost.yml    |   2 +-
 .../test/resources/TestInvokeHTTPPostSecure.yml    |   2 +-
 .../test/resources/TestTimeoutHTTPSiteToSite.yml   |  60 ++++++
 19 files changed, 469 insertions(+), 109 deletions(-)

diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index e35a21f..a975feb 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -249,7 +249,12 @@ bool HTTPClient::submit() {
   }
 
   if (read_timeout_ms_.count() > 0) {
-    curl_easy_setopt(http_session_, CURLOPT_TIMEOUT_MS, read_timeout_ms_.count());
+    progress_.reset();
+    curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 0);
+    curl_easy_setopt(http_session_, CURLOPT_XFERINFOFUNCTION, onProgress);
+    curl_easy_setopt(http_session_, CURLOPT_XFERINFODATA, (void*)this);
+  }else{
+    curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 1);
   }
   if (headers_ != nullptr) {
     headers_ = curl_slist_append(headers_, "Expect:");
@@ -330,6 +335,27 @@ void HTTPClient::set_request_method(const std::string method) {
   }
 }
 
+int HTTPClient::onProgress(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow){
+  HTTPClient& client = *(HTTPClient*)(clientp);
+  auto now = std::chrono::steady_clock::now();
+  auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - client.progress_.last_transferred_);
+  if(dlnow != client.progress_.downloaded_data_ || ulnow != client.progress_.uploaded_data_){
+    // did transfer data
+    client.progress_.last_transferred_ = now;
+    client.progress_.downloaded_data_ = dlnow;
+    client.progress_.uploaded_data_ = ulnow;
+    return 0;
+  }
+  // did not transfer data
+  if(elapsed.count() > client.read_timeout_ms_.count()){
+    // timeout
+    client.logger_->log_error("HTTP operation has been idle for %dms, limit (%dms) reached, terminating connection\n",
+      (int)elapsed.count(), (int)client.read_timeout_ms_.count());
+    return 1;
+  }
+  return 0;
+}
+
 bool HTTPClient::matches(const std::string &value, const std::string &sregex) {
   if (sregex == ".*")
     return true;
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index e6e8fab..8e069e5 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -222,6 +222,21 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
       }
     }
   }
+ private:
+  static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
+
+  struct Progress{
+    std::chrono::steady_clock::time_point last_transferred_;
+    curl_off_t uploaded_data_;
+    curl_off_t downloaded_data_;
+    void reset(){
+      last_transferred_ = std::chrono::steady_clock::now();
+      uploaded_data_ = 0;
+      downloaded_data_ = 0;
+    }
+  };
+
+  Progress progress_;
 
  protected:
 
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 25dc180..d25814f 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -175,6 +175,7 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient {
       logger_->log_info("HTTP Site2Site setup http proxy host %s", this->peer_->getHTTPProxy().host);
       http_client_->setHTTPProxy(this->peer_->getHTTPProxy());
     }
+    http_client_->setReadTimeout(idle_timeout_);
     return http_client_;
   }
 
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index d619a75..e9f5d3d 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -89,6 +89,7 @@ endif()
 add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
 add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" )
 add_test(NAME HTTPSiteToSiteTests COMMAND HTTPSiteToSiteTests "${TEST_RESOURCES}/TestHTTPSiteToSite.yml" "${TEST_RESOURCES}/" "http://localhost:8099/nifi-api")
+add_test(NAME TimeoutHTTPSiteToSiteTests COMMAND TimeoutHTTPSiteToSiteTests "${TEST_RESOURCES}/TestTimeoutHTTPSiteToSite.yml" "${TEST_RESOURCES}/" "http://localhost:8098/nifi-api")
 add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
 add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
 add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 5abbc60..0a36387 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -26,6 +26,7 @@
 #include "rapidjson/document.h"
 #include <cinttypes>
 #include <utility>
+#include "HTTPUtils.h"
 
 static std::atomic<int> transaction_id;
 static std::atomic<int> transaction_id_output;
@@ -74,8 +75,9 @@ class SiteToSiteLocationResponder : public CivetHandler {
 class PeerResponder : public CivetHandler {
  public:
 
-  explicit PeerResponder(std::string base_url)
-      : base_url(std::move(base_url)) {
+  explicit PeerResponder(std::string base_url) {
+    std::string scheme;
+    assert(parse_http_components(base_url, port, scheme, path));
   }
 
   bool handleGet(CivetServer *server, struct mg_connection *conn) {
@@ -85,7 +87,7 @@ class PeerResponder : public CivetHandler {
 #else
 	  std::string hostname = "localhost";
 #endif
-    std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": 8099,  \"secure\": false, \"flowFileCount\" : 0 }] }";
+    std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": " + port + ",  \"secure\": false, \"flowFileCount\" : 0 }] }";
     std::stringstream headers;
     headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
     mg_printf(conn, "%s", headers.str().c_str());
@@ -95,6 +97,8 @@ class PeerResponder : public CivetHandler {
 
  protected:
   std::string base_url;
+  std::string port;
+  std::string path;
 };
 
 class SiteToSiteBaseResponder : public CivetHandler {
@@ -121,7 +125,7 @@ class SiteToSiteBaseResponder : public CivetHandler {
 class TransactionResponder : public CivetHandler {
  public:
 
-  explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
+  explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri = false, bool empty_transaction_uri = false)
       : base_url(std::move(base_url)),
         wrong_uri(wrong_uri),
         empty_transaction_uri(empty_transaction_uri),
@@ -183,7 +187,7 @@ class TransactionResponder : public CivetHandler {
 class FlowFileResponder : public CivetHandler {
  public:
 
-  explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum)
+  explicit FlowFileResponder(bool input_port, bool wrong_uri = false, bool invalid_checksum = false)
       : wrong_uri(wrong_uri),
         input_port(input_port),
         invalid_checksum(invalid_checksum),
@@ -471,16 +475,31 @@ public:
   }
 };
 
-class InvokeHTTPResponseTimeoutHandler : public CivetHandler {
+class TimeoutingHTTPHandler : public CivetHandler {
 public:
-    InvokeHTTPResponseTimeoutHandler(std::chrono::milliseconds wait_ms)
-        : wait_(wait_ms) {
-    }
+  TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
+      : wait_(wait_ms) {
+  }
   bool handlePost(CivetServer *, struct mg_connection *conn) {
     std::this_thread::sleep_for(wait_);
     mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
     return true;
   }
+  bool handleGet(CivetServer *, struct mg_connection *conn) {
+    std::this_thread::sleep_for(wait_);
+    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+  bool handleDelete(CivetServer *, struct mg_connection *conn) {
+    std::this_thread::sleep_for(wait_);
+    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+  bool handlePut(CivetServer *, struct mg_connection *conn) {
+    std::this_thread::sleep_for(wait_);
+    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
 protected:
   std::chrono::milliseconds wait_;
 };
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index 39ef776..8078171 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -52,8 +52,8 @@
 
 class SiteToSiteTestHarness : public CoapIntegrationBase {
  public:
-  explicit SiteToSiteTestHarness(bool isSecure)
-      : CoapIntegrationBase(2000), isSecure(isSecure) {
+  explicit SiteToSiteTestHarness(bool isSecure, std::chrono::milliseconds waitTime = std::chrono::milliseconds{2000})
+      : CoapIntegrationBase(waitTime.count()), isSecure(isSecure) {
     char format[] = "/tmp/ssth.XXXXXX";
     dir = testController.createTempDirectory(format);
   }
@@ -100,7 +100,8 @@ struct test_profile {
   }
 
   bool allFalse() const {
-    return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum;
+    return !flow_url_broken && !transaction_url_broken &&
+      !empty_transaction_url && !no_delete && !invalid_checksum;
   }
   // tests for a broken flow file url
   bool flow_url_broken;
@@ -117,9 +118,13 @@ struct test_profile {
 void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) {
   SiteToSiteTestHarness harness(isSecure);
 
+  std::string in_port = "471deef6-2a6e-4a7d-912a-81cc17e3a204";
+  std::string out_port = "471deef6-2a6e-4a7d-912a-81cc17e3a203";
+
   SiteToSiteLocationResponder *responder = new SiteToSiteLocationResponder(isSecure);
 
-  TransactionResponder *transaction_response = new TransactionResponder(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url);
+  TransactionResponder *transaction_response = new TransactionResponder(url, in_port,
+      true, profile.transaction_url_broken, profile.empty_transaction_url);
 
   std::string transaction_id = transaction_response->getTransactionId();
 
@@ -134,11 +139,11 @@ void run_variance(std::string test_file_location, bool isSecure, std::string url
 
   harness.setUrl(controller_loc, responder);
 
-  std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
-  std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions";
+  std::string transaction_url = url + "/data-transfer/input-ports/" + in_port + "/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/" + in_port + "/transactions";
 
-  std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
-  std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions";
+  std::string transaction_output_url = url + "/data-transfer/output-ports/" + out_port + "/transactions";
+  std::string action_output_url = url + "/site-to-site/output-ports/" + out_port + "/transactions";
 
   harness.setUrl(transaction_url, transaction_response);
 
@@ -154,7 +159,8 @@ void run_variance(std::string test_file_location, bool isSecure, std::string url
   flowResponder->setFlowUrl(flow_url);
   auto producedFlows = flowResponder->getFlows();
 
-  TransactionResponder *transaction_response_output = new TransactionResponder(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url);
+  TransactionResponder *transaction_response_output = new TransactionResponder(url, out_port,
+      false, profile.transaction_url_broken, profile.empty_transaction_url);
   std::string transaction_output_id = transaction_response_output->getTransactionId();
   transaction_response_output->setFeed(producedFlows);
 
diff --git a/extensions/http-curl/tests/HTTPUtils.h b/extensions/http-curl/tests/HTTPUtils.h
new file mode 100644
index 0000000..f8932d7
--- /dev/null
+++ b/extensions/http-curl/tests/HTTPUtils.h
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_HTTPUTILS_H
+#define NIFI_MINIFI_CPP_HTTPUTILS_H
+
+#include "RegexUtils.h"
+
+/**
+This function, unfortunately, assumes that we're parsing http components of a local host. On windows this is problematic
+so we convert localhost to our local hostname.
+  */
+inline bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
+#ifdef WIN32
+	auto hostname = (url.find(org::apache::nifi::minifi::io::Socket::getMyHostName()) != std::string::npos ? org::apache::nifi::minifi::io::Socket::getMyHostName() : "localhost");
+	std::string regexstr = "^(http|https)://(" + hostname + ":)([0-9]+)?(/.*)$";
+#else
+	std::string regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
+#endif
+
+	using namespace org::apache::nifi::minifi::utils;
+
+  auto rgx = Regex(regexstr, {Regex::Mode::ICASE});
+
+  if (rgx.match(url)) {
+    auto result = rgx.getResult();
+    if(result.size() >= 5){
+      scheme = result[1];
+      port = result[3];
+      path = result[4];
+      return true;
+    }
+  }
+  return false;
+}
+
+#endif //NIFI_MINIFI_CPP_HTTPUTILS_H
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 2b667fc..9676630 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -17,16 +17,13 @@
  */
 #ifndef LIBMINIFI_TEST_TESTSERVER_H_
 #define LIBMINIFI_TEST_TESTSERVER_H_
-#if defined(_WIN32) || __cplusplus > 201103L
-#include <regex>
-#else
-#include <regex.h>
-#endif
+
 #include <string>
 #include <iostream>
 #include "civetweb.h"
 #include "CivetServer.h"
 #include "civetweb.h"
+#include "HTTPUtils.h"
 
 
 /* Server context handle */
@@ -80,85 +77,6 @@ CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHand
 
 }
 
-/**
-This funciton, unfortunately, assumes that we're parsing http components of a local host. On windows this is problematic
-so we convert localhost to our local hostname. 
-  */
-bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) {
-
-#if (__cplusplus > 201103L) || defined(_WIN32)
-#ifdef WIN32
-	auto hostname = (url.find(org::apache::nifi::minifi::io::Socket::getMyHostName()) != std::string::npos ? org::apache::nifi::minifi::io::Socket::getMyHostName() : "localhost");
-	std::string regexstr = "^(http|https)://(" + hostname + ":)([0-9]+)?(/.*)$";
-#else
-	std::string regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
-#endif
-  std::regex rgx;
-  std::regex_constants::syntax_option_type regex_mode = std::regex_constants::icase;
-	
-  rgx = std::regex(regexstr, regex_mode);
- 
-  std::smatch matches;
-  std::string scratch = url;
-  if (std::regex_search(scratch, matches, rgx)) {
-	  for (int i = 1; i < matches.size(); i++) {
-		  auto str = matches[i].str();
-		  switch (i) {
-		  case 1:
-			  scheme = str;
-			  break;
-		  case 3:
-			  port = str;
-			  break;
-		  case 4:
-			  path = str;
-			  break;
-		  default:
-			  break;
-		  }
-	  }
-  }
-#else
-	const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$";
-  regex_t regex;
-
-  int ret = regcomp(&regex, regexstr, REG_EXTENDED);
-  if (ret) {
-    return false;
-  }
-
-  size_t potentialGroups = regex.re_nsub + 1;
-  regmatch_t groups[potentialGroups];
-  if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) {
-    for (size_t i = 0; i < potentialGroups; i++) {
-      if (groups[i].rm_so == -1)
-        break;
-
-      std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so);
-      switch (i) {
-        case 1:
-          scheme = str;
-          break;
-        case 3:
-          port = str;
-          break;
-        case 4:
-          path = str;
-          break;
-        default:
-          break;
-      }
-    }
-  }
-  if (path.empty() || scheme.empty() || port.empty())
-    return false;
-
-  regfree(&regex);
-#endif
-  return true;
-
-}
-
 static void stop_webserver(CivetServer *server) {
   if (server != nullptr)
     delete server;
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
new file mode 100644
index 0000000..41619a8
--- /dev/null
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "CivetServer.h"
+#include "sitetosite/HTTPProtocol.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "io/StreamFactory.h"
+#include "RemoteProcessorGroupPort.h"
+#include "core/ConfigurableComponent.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "client/HTTPStream.h"
+
+class SiteToSiteTestHarness : public CoapIntegrationBase {
+public:
+  explicit SiteToSiteTestHarness(bool isSecure, std::chrono::milliseconds waitTime = std::chrono::milliseconds{1000})
+      : CoapIntegrationBase(waitTime.count()), isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
+    LogTestController::getInstance().setTrace<minifi::sitetosite::HttpSiteToSiteClient>();
+    LogTestController::getInstance().setTrace<minifi::sitetosite::SiteToSiteClient>();
+    LogTestController::getInstance().setTrace<utils::HTTPClient>();
+    LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<core::ConfigurableComponent>();
+    LogTestController::getInstance().setTrace<utils::HttpStreamingCallback>();
+
+    std::fstream file;
+    ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    configuration->set("nifi.c2.enable", "false");
+    configuration->set("nifi.remote.input.http.enabled", "true");
+    configuration->set("nifi.remote.input.socket.port", "8099");
+  }
+
+  void cleanup() override {}
+
+  void runAssertions() override {}
+
+protected:
+  bool isSecure;
+  std::string dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+struct defaulted_handler{
+  CivetHandler* handler = nullptr;
+  CivetHandler* get(CivetHandler *def) const {
+    if(handler)return handler;
+    return def;
+  }
+  void set(std::chrono::milliseconds timeout) {
+    handler = new TimeoutingHTTPHandler(timeout);
+  }
+};
+
+/**
+ * Determines which responders will timeout
+ */
+struct timeout_test_profile{
+  defaulted_handler base_;
+  defaulted_handler transaction_;
+  defaulted_handler flow_;
+  defaulted_handler peer_;
+  defaulted_handler delete_;
+};
+
+void run_timeout_variance(std::string test_file_location, bool isSecure, std::string url, const timeout_test_profile &profile) {
+  SiteToSiteTestHarness harness(isSecure);
+
+  std::string in_port = "471deef6-2a6e-4a7d-912a-81cc17e3a204";
+
+  TransactionResponder *transaction_response = new TransactionResponder(url, in_port, true);
+
+  std::string transaction_id = transaction_response->getTransactionId();
+
+  harness.setKeyDir("");
+
+  std::string baseUrl = url + "/site-to-site";
+  SiteToSiteBaseResponder *base = new SiteToSiteBaseResponder(baseUrl);
+
+  harness.setUrl(baseUrl, profile.base_.get(base));
+
+  std::string transaction_url = url + "/data-transfer/input-ports/" + in_port + "/transactions";
+  std::string action_url = url + "/site-to-site/input-ports/" + in_port + "/transactions";
+
+  harness.setUrl(transaction_url, profile.transaction_.get(transaction_response));
+
+  std::string peer_url = url + "/site-to-site/peers";
+
+  PeerResponder *peer_response = new PeerResponder(url);
+
+  harness.setUrl(peer_url, profile.peer_.get(peer_response));
+
+  std::string flow_url = action_url + "/" + transaction_id + "/flow-files";
+
+  FlowFileResponder *flowResponder = new FlowFileResponder(true);
+  flowResponder->setFlowUrl(flow_url);
+
+  harness.setUrl(flow_url, profile.flow_.get(flowResponder));
+
+  std::string delete_url = transaction_url + "/" + transaction_id;
+  DeleteTransactionResponder *deleteResponse = new DeleteTransactionResponder(delete_url, "201 OK", 12);
+  harness.setUrl(delete_url, profile.delete_.get(deleteResponse));
+
+  harness.run(test_file_location);
+
+  assert(LogTestController::getInstance().contains("limit (200ms) reached, terminating connection") == true);
+
+  LogTestController::getInstance().reset();
+}
+
+int main(int argc, char **argv) {
+  transaction_id = 0;
+  transaction_id_output = 0;
+  std::string key_dir, test_file_location, url;
+  if (argc > 1) {
+    test_file_location = argv[1];
+    key_dir = argv[2];
+    url = argv[3];
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+#ifdef WIN32
+  if (url.find("localhost") != std::string::npos) {
+	  std::string port, scheme, path;
+	  parse_http_components(url, port, scheme, path);
+	  url = scheme + "://" + org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + port +  path;
+  }
+#endif
+
+  const auto timeout = std::chrono::milliseconds{500};
+
+  {
+    timeout_test_profile profile;
+    profile.base_.set(timeout);
+    run_timeout_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    timeout_test_profile profile;
+    profile.flow_.set(timeout);
+    run_timeout_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    timeout_test_profile profile;
+    profile.transaction_.set(timeout);
+    run_timeout_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    timeout_test_profile profile;
+    profile.delete_.set(timeout);
+    run_timeout_variance(test_file_location, isSecure, url, profile);
+  }
+
+  {
+    timeout_test_profile profile;
+    profile.peer_.set(timeout);
+    run_timeout_variance(test_file_location, isSecure, url, profile);
+  }
+
+  return 0;
+}
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 66c8525..bbc1ce2 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -133,7 +133,7 @@ class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
 public:
   virtual void runAssertions() override {
     assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
-    assert(LogTestController::getInstance().contains("failed Timeout was reached"));
+    assert(LogTestController::getInstance().contains("limit (1000ms) reached, terminating connection"));
   }
 };
 
@@ -192,7 +192,7 @@ int main(int argc, char ** argv) {
   }
 
   {
-    InvokeHTTPResponseTimeoutHandler handler(std::chrono::milliseconds(4000));
+    TimeoutingHTTPHandler handler(std::chrono::milliseconds(4000));
     VerifyRWTimeoutInvokeHTTP harness;
     run(harness, url, test_file_location, key_dir, &handler);
   }
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index b28ee1f..8978624 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -108,6 +108,7 @@ class RemoteProcessorGroupPort : public core::Processor {
   static core::Property SSLContext;
   static core::Property port;
   static core::Property portUUID;
+  static core::Property idleTimeout;
   // Supported Relationships
   static core::Relationship relation;
  public:
@@ -211,6 +212,8 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   utils::Identifier protocol_uuid_;
 
+  std::chrono::milliseconds idle_timeout_{};
+
   // rest API end point info
   std::vector<struct RPG> nifi_instances_;
 
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 6ba2dbb..fda87b0 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -361,6 +361,14 @@ class SiteToSiteClientConfiguration {
     return stream_factory_;
   }
 
+  void setIdleTimeout(std::chrono::milliseconds timeout) {
+    idle_timeout_ = timeout;
+  }
+
+  std::chrono::milliseconds getIdleTimeout() const {
+    return idle_timeout_;
+  }
+
   // setInterface
   void setInterface(std::string &ifc) {
     local_network_interface_ = ifc;
@@ -385,6 +393,8 @@ class SiteToSiteClientConfiguration {
 
   std::string local_network_interface_;
 
+  std::chrono::milliseconds idle_timeout_{};
+
   // secore comms
 
   std::shared_ptr<controllers::SSLContextService> ssl_service_;
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index e1fffc3..132bb99 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -151,6 +151,13 @@ class SiteToSiteClient : public core::Connectable {
   }
 
   /**
+   * Sets the idle timeout.
+   */
+   void setIdleTimeout(std::chrono::milliseconds timeout) {
+     idle_timeout_ = timeout;
+   }
+
+  /**
    * Sets the base peer for this interface.
    */
   virtual void setPeer(std::unique_ptr<SiteToSitePeer> peer) {
@@ -246,6 +253,9 @@ class SiteToSiteClient : public core::Connectable {
   // portId
   utils::Identifier port_id_;
 
+  // idleTimeout
+  std::chrono::milliseconds idle_timeout_{};
+
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
 
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 2d1dd74..29d49e1 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -93,6 +93,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
 
         ptr->setPortId(uuid);
         ptr->setPeer(std::move(peer));
+        ptr->setIdleTimeout(client_configuration.getIdleTimeout());
         return ptr;
       }
       return nullptr;
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index f110e89..8d2c5ce 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -58,6 +58,9 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name
 core::Property RemoteProcessorGroupPort::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "");
 core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", "");
+core::Property RemoteProcessorGroupPort::idleTimeout(
+            core::PropertyBuilder::createProperty("Idle Timeout")->withDescription("Max idle time for remote service")->isRequired(false)
+                    ->withDefaultValue<core::TimePeriodValue>("15 s")->build());
 core::Relationship RemoteProcessorGroupPort::relation;
 
 std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextProtocol(bool create = true) {
@@ -77,6 +80,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
           sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host, rpg.port_, ssl_service != nullptr), this->getInterface(),
                                                            client_type_);
           config.setHTTPProxy(this->proxy_);
+          config.setIdleTimeout(idle_timeout_);
           nextProtocol = sitetosite::createClient(config);
         }
       } else if (peer_index_ >= 0) {
@@ -89,6 +93,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
           peer_index_ = 0;
         }
         config.setHTTPProxy(this->proxy_);
+        config.setIdleTimeout(idle_timeout_);
         nextProtocol = sitetosite::createClient(config);
       } else {
         logger_->log_debug("Refreshing the peer list since there are none configured.");
@@ -120,6 +125,7 @@ void RemoteProcessorGroupPort::initialize() {
   properties.insert(port);
   properties.insert(SSLContext);
   properties.insert(portUUID);
+  properties.insert(idleTimeout);
   setSupportedProperties(properties);
 // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -154,11 +160,23 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
   } else {
     std::string secureStr;
     bool is_secure = false;
-    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) {
+    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) &&
+        org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) {
       ssl_service = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_);
       ssl_service->onEnable();
     }
   }
+  {
+    uint64_t idleTimeoutVal;
+    std::string idleTimeoutStr;
+    if (!context->getProperty(idleTimeout.getName(), idleTimeoutStr)
+        || !core::Property::getTimeMSFromString(idleTimeoutStr, idleTimeoutVal)) {
+      logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(),
+                         idleTimeout.getValue());
+      assert(core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal));
+    }
+    idle_timeout_ = std::chrono::milliseconds(idleTimeoutVal);
+  }
 
   std::lock_guard<std::mutex> lock(peer_mutex_);
   if (!nifi_instances_.empty()) {
@@ -200,6 +218,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
       }
       logger_->log_trace("Creating client");
       config.setHTTPProxy(this->proxy_);
+      config.setIdleTimeout(idle_timeout_);
       nextProtocol = sitetosite::createClient(config);
       logger_->log_trace("Created client, moving into available protocols");
       returnProtocol(std::move(nextProtocol));
@@ -306,6 +325,7 @@ std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo
       // use a connection timeout. if this times out we will simply attempt re-connection
       // so no need for configuration parameter that isn't already defined in Processor
       client->setConnectionTimeout(std::chrono::milliseconds(10000));
+      client->setReadTimeout(idle_timeout_);
 
       token = utils::get_token(client.get(), this->rest_user_name_, this->rest_password_);
       logger_->log_debug("Token from NiFi REST Api endpoint %s,  %s", loginUrl.str(), token);
@@ -324,6 +344,7 @@ std::pair<std::string, int> RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo
     // use a connection timeout. if this times out we will simply attempt re-connection
     // so no need for configuration parameter that isn't already defined in Processor
     client->setConnectionTimeout(std::chrono::milliseconds(10000));
+    client->setReadTimeout(idle_timeout_);
     if (!proxy_.host.empty()) {
       client->setHTTPProxy(proxy_);
     }
@@ -384,6 +405,7 @@ void RemoteProcessorGroupPort::refreshPeerList() {
                                                    this->getInterface(), client_type_);
   config.setSecurityContext(ssl_service);
   config.setHTTPProxy(this->proxy_);
+  config.setIdleTimeout(idle_timeout_);
   protocol = sitetosite::createClient(config);
 
   if (protocol)
diff --git a/libminifi/test/resources/TestHTTPSiteToSite.yml b/libminifi/test/resources/TestHTTPSiteToSite.yml
index 2bdee30..339d8c6 100644
--- a/libminifi/test/resources/TestHTTPSiteToSite.yml
+++ b/libminifi/test/resources/TestHTTPSiteToSite.yml
@@ -23,7 +23,7 @@ Processors:
       class: org.apache.nifi.processors.standard.GenerateFlowFile
       max concurrent tasks: 1
       scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling period: 5 sec
       penalization period: 30 sec
       yield period: 10 sec
       run duration nanos: 0
diff --git a/libminifi/test/resources/TestInvokeHTTPPost.yml b/libminifi/test/resources/TestInvokeHTTPPost.yml
index 9450edf..cb8eb8e 100644
--- a/libminifi/test/resources/TestInvokeHTTPPost.yml
+++ b/libminifi/test/resources/TestInvokeHTTPPost.yml
@@ -75,7 +75,7 @@ Processors:
     Disable Peer Verification: 'false'
     HTTP Method: POST
     Include Date Header: 'true'
-    Read Timeout: 4 s
+    Read Timeout: 1 s
     Remote URL: http://localhost:0/minifi
     Use Chunked Encoding: 'false'
     send-message-body: 'true'
diff --git a/libminifi/test/resources/TestInvokeHTTPPostSecure.yml b/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
index 2fbcd1b..6e9bd9a 100644
--- a/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
+++ b/libminifi/test/resources/TestInvokeHTTPPostSecure.yml
@@ -74,7 +74,7 @@ Processors:
     Content-type: application/octet-stream
     Disable Peer Verification: 'true'
     HTTP Method: POST
-    Read Timeout: 3 s
+    Read Timeout: 1 s
     Remote URL: https://localhost:0/minifi
     Use Chunked Encoding: 'false'
     send-message-body: 'true'
diff --git a/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml b/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml
new file mode 100644
index 0000000..d49314f
--- /dev/null
+++ b/libminifi/test/resources/TestTimeoutHTTPSiteToSite.yml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+  id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+  name: MiNiFi Flow
+
+Processors:
+  - name: GetFile
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 2 sec
+    penalization period: 30 sec
+    yield period: 10 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+
+
+Connections:
+  - name: GenerateFlowFileS2S
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+    source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
+    source relationship name: success
+    destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Groups:
+  - name: NiFi Flow
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a208
+    url: http://localhost:8098/nifi
+    timeout: 30 secs
+    yield period: 1 sec
+    transport protocol: HTTP
+    Input Ports:
+      - id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+        name: From Node A
+        max concurrent tasks: 1
+        use compression: false
+        Properties: # Deviates from spec and will later be removed when this is autonegotiated
+          Port UUID: 471deef6-2a6e-4a7d-912a-81cc17e3a204
+          Port: 8082
+          Host Name: 127.0.0.1
+          Idle Timeout: 200 ms
\ No newline at end of file