You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/15 20:22:23 UTC

[nifi-minifi-cpp] 03/03: MINIFICPP-1255 - Set absolute timeout and also make sure that no timeout is initialized to zero.

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8e8d2832e48ac264eb1c4118df80a0da58a5ed7d
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Fri Jun 12 15:00:44 2020 +0200

    MINIFICPP-1255 - Set absolute timeout and also make sure that no timeout is initialized to zero.
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #810
---
 extensions/http-curl/client/HTTPClient.cpp         | 13 ++++--
 extensions/http-curl/client/HTTPClient.h           |  4 +-
 extensions/http-curl/tests/AbsoluteTimeoutTest.cpp | 50 ++++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |  1 +
 extensions/http-curl/tests/HTTPHandlers.h          | 33 ++++++++------
 .../http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp | 12 +++---
 .../http-curl/tests/VerifyInvokeHTTPTest.cpp       |  2 +-
 libminifi/include/RemoteProcessorGroupPort.h       |  2 +-
 libminifi/include/sitetosite/SiteToSite.h          |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |  2 +-
 libminifi/src/RemoteProcessorGroupPort.cpp         |  6 ++-
 11 files changed, 98 insertions(+), 29 deletions(-)

diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 837f3b7..518da4a 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -241,10 +241,13 @@ void HTTPClient::setUseChunkedEncoding() {
 bool HTTPClient::submit() {
   if (IsNullOrEmpty(url_))
     return false;
+
+  int absoluteTimeout = std::max(0, 3 * static_cast<int>(read_timeout_ms_.count()));
+
   curl_easy_setopt(http_session_, CURLOPT_NOSIGNAL, 1);
-  if (connect_timeout_ms_.count() > 0) {
-    curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT_MS, connect_timeout_ms_.count());
-  }
+  // setting it to 0 will result in the default 300 second timeout
+  curl_easy_setopt(http_session_, CURLOPT_CONNECTTIMEOUT_MS, std::max(0, static_cast<int>(connect_timeout_ms_.count())));
+  curl_easy_setopt(http_session_, CURLOPT_TIMEOUT_MS, absoluteTimeout);
 
   if (read_timeout_ms_.count() > 0) {
     progress_.reset();
@@ -252,6 +255,7 @@ bool HTTPClient::submit() {
     curl_easy_setopt(http_session_, CURLOPT_XFERINFOFUNCTION, onProgress);
     curl_easy_setopt(http_session_, CURLOPT_XFERINFODATA, (void*)this);
   }else{
+    // the user explicitly set it to 0
     curl_easy_setopt(http_session_, CURLOPT_NOPROGRESS, 1);
   }
   if (headers_ != nullptr) {
@@ -286,6 +290,9 @@ bool HTTPClient::submit() {
   }
   curl_easy_getinfo(http_session_, CURLINFO_RESPONSE_CODE, &http_code);
   curl_easy_getinfo(http_session_, CURLINFO_CONTENT_TYPE, &content_type_str_);
+  if (res == CURLE_OPERATION_TIMEDOUT) {
+    logger_->log_error("HTTP operation timed out, with absolute timeout %dms\n", absoluteTimeout);
+  }
   if (res != CURLE_OK) {
     logger_->log_error("curl_easy_perform() failed %s on %s, error code %d\n", curl_easy_strerror(res), url_, res);
     return false;
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 6a4b4a2..8cc38ab 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -261,9 +261,9 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
   std::string url_;
-  std::chrono::milliseconds connect_timeout_ms_{0};
+  std::chrono::milliseconds connect_timeout_ms_{30000};
   // read timeout.
-  std::chrono::milliseconds read_timeout_ms_{0};
+  std::chrono::milliseconds read_timeout_ms_{30000};
   char *content_type_str_{nullptr};
   std::string content_type_;
   struct curl_slist *headers_{nullptr};
diff --git a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
new file mode 100644
index 0000000..590529b
--- /dev/null
+++ b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "tests/TestServer.h"
+#include "HTTPHandlers.h"
+
+int main() {
+  TestController controller;
+
+  std::string port = "12324";
+  std::string rootURI =  "/";
+  TimeoutingHTTPHandler handler({
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500),
+    std::chrono::milliseconds(500)
+  });
+
+  auto server = start_webserver(port, rootURI, &handler);
+
+  auto plan = controller.createPlan();
+
+  auto processor = plan->addProcessor("InvokeHTTP", "InvokeHTTP");
+  processor->setProperty("Read Timeout", "1 s");
+  processor->setProperty("Remote URL", "http://localhost:" + port);
+  processor->setAutoTerminatedRelationships({{"failure", "d"}});
+
+  plan->runNextProcessor();
+
+  assert(LogTestController::getInstance().contains("HTTP operation timed out, with absolute timeout 3000ms"));
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index e9f5d3d..8ba76a2 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -94,3 +94,4 @@ add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/T
 add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
 add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
 add_test(NAME VerifyInvokeHTTPTest COMMAND VerifyInvokeHTTPTest "${TEST_RESOURCES}/TestInvokeHTTPPost.yml")
+add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest)
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index eda7a0e..83e70ee 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -489,30 +489,39 @@ public:
 
 class TimeoutingHTTPHandler : public ServerAwareHandler {
 public:
-  TimeoutingHTTPHandler(std::chrono::milliseconds wait_ms)
-      : wait_(wait_ms) {
+  TimeoutingHTTPHandler(std::vector<std::chrono::milliseconds> wait_times)
+      : wait_times_(wait_times) {
   }
   bool handlePost(CivetServer *, struct mg_connection *conn) {
-    std::this_thread::sleep_for(wait_);
-    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    respond(conn);
     return true;
   }
   bool handleGet(CivetServer *, struct mg_connection *conn) {
-    std::this_thread::sleep_for(wait_);
-    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    respond(conn);
     return true;
   }
   bool handleDelete(CivetServer *, struct mg_connection *conn) {
-    std::this_thread::sleep_for(wait_);
-    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    respond(conn);
     return true;
   }
   bool handlePut(CivetServer *, struct mg_connection *conn) {
-    std::this_thread::sleep_for(wait_);
-    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    respond(conn);
     return true;
   }
-protected:
-  std::chrono::milliseconds wait_;
+ private:
+  void respond(struct mg_connection *conn) {
+    if (wait_times_.size() > 0 && wait_times_[0].count() > 0) {
+      std::this_thread::sleep_for(wait_times_[0]);
+    }
+    int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
+    mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count);
+    for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) {
+      mg_printf(conn, "a");
+      if (wait_times_[chunkIdx + 1].count() > 0) {
+        std::this_thread::sleep_for(wait_times_[chunkIdx + 1]);
+      }
+    }
+  }
+  std::vector<std::chrono::milliseconds> wait_times_;
 };
 #endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index 91a9a77..1e9c59e 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -96,7 +96,7 @@ struct defaulted_handler{
     if(handler)return handler;
     return def;
   }
-  void set(std::chrono::milliseconds timeout) {
+  void set(std::vector<std::chrono::milliseconds> timeout) {
     handler = new TimeoutingHTTPHandler(timeout);
   }
 };
@@ -184,31 +184,31 @@ int main(int argc, char **argv) {
 
   {
     timeout_test_profile profile;
-    profile.base_.set(timeout);
+    profile.base_.set({timeout});
     run_timeout_variance(test_file_location, isSecure, url, profile);
   }
 
   {
     timeout_test_profile profile;
-    profile.flow_.set(timeout);
+    profile.flow_.set({timeout});
     run_timeout_variance(test_file_location, isSecure, url, profile);
   }
 
   {
     timeout_test_profile profile;
-    profile.transaction_.set(timeout);
+    profile.transaction_.set({timeout});
     run_timeout_variance(test_file_location, isSecure, url, profile);
   }
 
   {
     timeout_test_profile profile;
-    profile.delete_.set(timeout);
+    profile.delete_.set({timeout});
     run_timeout_variance(test_file_location, isSecure, url, profile);
   }
 
   {
     timeout_test_profile profile;
-    profile.peer_.set(timeout);
+    profile.peer_.set({timeout});
     run_timeout_variance(test_file_location, isSecure, url, profile);
   }
 
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 7d31165..fae79b5 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -192,7 +192,7 @@ int main(int argc, char ** argv) {
   }
 
   {
-    TimeoutingHTTPHandler handler(std::chrono::milliseconds(4000));
+    TimeoutingHTTPHandler handler({std::chrono::milliseconds(4000)});
     VerifyRWTimeoutInvokeHTTP harness;
     run(harness, url, test_file_location, key_dir, &handler);
   }
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 5fb6742..651b002 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -214,7 +214,7 @@ class RemoteProcessorGroupPort : public core::Processor {
 
   utils::Identifier protocol_uuid_;
 
-  std::chrono::milliseconds idle_timeout_{};
+  std::chrono::milliseconds idle_timeout_{15000};
 
   // rest API end point info
   std::vector<struct RPG> nifi_instances_;
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 896faac..feb1ed4 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -395,7 +395,7 @@ class SiteToSiteClientConfiguration {
 
   std::string local_network_interface_;
 
-  std::chrono::milliseconds idle_timeout_{};
+  std::chrono::milliseconds idle_timeout_{15000};
 
   // secore comms
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 6e9b333..fdfe58d 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -254,7 +254,7 @@ class SiteToSiteClient : public core::Connectable {
   utils::Identifier port_id_;
 
   // idleTimeout
-  std::chrono::milliseconds idle_timeout_{};
+  std::chrono::milliseconds idle_timeout_{15000};
 
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 8d2c5ce..b945396 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -167,13 +167,15 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
     }
   }
   {
-    uint64_t idleTimeoutVal;
+    uint64_t idleTimeoutVal = 15000;
     std::string idleTimeoutStr;
     if (!context->getProperty(idleTimeout.getName(), idleTimeoutStr)
         || !core::Property::getTimeMSFromString(idleTimeoutStr, idleTimeoutVal)) {
       logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(),
                          idleTimeout.getValue());
-      assert(core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal));
+      if (!core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal)) {
+        assert(false);  // Couldn't parse our default value
+      }
     }
     idle_timeout_ = std::chrono::milliseconds(idleTimeoutVal);
   }