You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/24 10:33:11 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1263 - Possible fix for segfault

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new e26d391  MINIFICPP-1263 - Possible fix for segfault
e26d391 is described below

commit e26d391b20c52e17931feddf8211cc3fe7affe9f
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Jun 16 17:32:03 2020 +0200

    MINIFICPP-1263 - Possible fix for segfault
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #818
---
 extensions/coap/tests/CoapIntegrationBase.h        |  10 +-
 extensions/http-curl/tests/AbsoluteTimeoutTest.cpp |   2 +-
 extensions/http-curl/tests/HTTPHandlers.h          |   4 +-
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  14 +--
 .../http-curl/tests/HttpGetIntegrationTest.cpp     |  30 +++---
 extensions/http-curl/tests/ServerAwareHandler.h    |  23 +++--
 extensions/http-curl/tests/TestServer.h            | 108 ++++++++++++---------
 7 files changed, 105 insertions(+), 86 deletions(-)

diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index c8f8499..bd00151 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -42,7 +42,7 @@ class CoapIntegrationBase : public IntegrationBase {
   void setUrl(std::string url, CivetHandler *handler);
 
   void shutdownBeforeFlowController() override {
-    stop_webserver(server);
+    server.reset();
   }
 
   virtual void run(std::string test_file_location) override {
@@ -86,13 +86,13 @@ class CoapIntegrationBase : public IntegrationBase {
   }
 
  protected:
-  CivetServer *server;
+  std::unique_ptr<TestServer> server;
 };
 
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
-  struct mg_callbacks callback;
+  CivetCallbacks callback{};
   if (url.find("localhost") != std::string::npos) {
     if (server != nullptr) {
       server->addHandler(path, handler);
@@ -105,9 +105,9 @@ void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
       callback.init_ssl = ssl_enable;
       port += "s";
       callback.log_message = log_message;
-      server = start_webserver(port, path, handler, &callback, cert, cert);
+      server = utils::make_unique<TestServer>(port, path, handler, &callback, cert, cert);
     } else {
-      server = start_webserver(port, path, handler);
+      server = utils::make_unique<TestServer>(port, path, handler);
     }
   }
 }
diff --git a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
index 590529b..658a657 100644
--- a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
+++ b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
@@ -35,7 +35,7 @@ int main() {
     std::chrono::milliseconds(500)
   });
 
-  auto server = start_webserver(port, rootURI, &handler);
+  TestServer server(port, rootURI, &handler);
 
   auto plan = controller.createPlan();
 
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 83e70ee..71cd7e2 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -511,14 +511,14 @@ public:
  private:
   void respond(struct mg_connection *conn) {
     if (wait_times_.size() > 0 && wait_times_[0].count() > 0) {
-      std::this_thread::sleep_for(wait_times_[0]);
+      sleep_for(wait_times_[0]);
     }
     int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
     mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count);
     for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) {
       mg_printf(conn, "a");
       if (wait_times_[chunkIdx + 1].count() > 0) {
-        std::this_thread::sleep_for(wait_times_[chunkIdx + 1]);
+        sleep_for(wait_times_[chunkIdx + 1]);
       }
     }
   }
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 776c647..04f0c99 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -44,8 +44,7 @@ class CoapIntegrationBase : public IntegrationBase {
   void setUrl(const std::string& url, ServerAwareHandler *handler);
 
   void shutdownBeforeFlowController() override {
-    is_server_running = false;
-    stop_webserver(server);
+    server.reset();
   }
 
   std::string getWebPort() {
@@ -57,19 +56,16 @@ class CoapIntegrationBase : public IntegrationBase {
   }
 
  protected:
-  std::atomic_bool is_server_running;
-  CivetServer *server;
+  std::unique_ptr<TestServer> server;
 };
 
 void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *handler) {
-  handler->initServerFlag(is_server_running);
   parse_http_components(url, port, scheme, path);
-  struct mg_callbacks callback{};
+  CivetCallbacks callback{};
   if (server != nullptr) {
     server->addHandler(path, handler);
     return;
   }
-  is_server_running = true;
   if (scheme == "https" && !key_dir.empty()) {
     std::string cert = "";
     cert = key_dir + "nifi-cert.pem";
@@ -77,9 +73,9 @@ void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *han
     callback.init_ssl = ssl_enable;
     port += "s";
     callback.log_message = log_message;
-    server = start_webserver(port, path, handler, &callback, cert, cert);
+    server = utils::make_unique<TestServer>(port, path, handler, &callback, cert, cert);
   } else {
-    server = start_webserver(port, path, handler);
+    server = utils::make_unique<TestServer>(port, path, handler);
   }
   if (port == "0" || port == "0s") {
     bool secure = (port == "0s");
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index e02f22a..f149d4c 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -74,7 +74,6 @@ class HttpResponder : public CivetHandler {
 };
 
 int main(int argc, char **argv) {
-  init_webserver();
   LogTestController::getInstance().setDebug<core::Processor>();
   LogTestController::getInstance().setDebug<core::ProcessSession>();
   LogTestController::getInstance().setDebug<utils::HTTPClient>();
@@ -121,27 +120,27 @@ int main(int argc, char **argv) {
   inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
   HttpResponder h_ex;
   std::string port, scheme, path;
-  CivetServer *server = nullptr;
+  std::unique_ptr<TestServer> server;
   parse_http_components(url, port, scheme, path);
-  struct mg_callbacks callback{};
-    if (scheme == "https") {
-      std::string cert;
-      cert = key_dir + "nifi-cert.pem";
-      memset(&callback, 0, sizeof(callback));
-      callback.init_ssl = ssl_enable;
-      std::string https_port = port + "s";
-      callback.log_message = log_message;
-      server = start_webserver(https_port, path, &h_ex, &callback, cert, cert);
-    } else {
-      server = start_webserver(port, path, &h_ex);
-    }
+  CivetCallbacks callback{};
+  if (scheme == "https") {
+    std::string cert;
+    cert = key_dir + "nifi-cert.pem";
+    memset(&callback, 0, sizeof(callback));
+    callback.init_ssl = ssl_enable;
+    std::string https_port = port + "s";
+    callback.log_message = log_message;
+    server = utils::make_unique<TestServer>(https_port, path, &h_ex, &callback, cert, cert);
+  } else {
+    server = utils::make_unique<TestServer>(port, path, &h_ex);
+  }
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
   if (url.find("localhost") == std::string::npos) {
-    stop_webserver(server);
+    server.reset();
     exit(1);
   }
   std::string logs = LogTestController::getInstance().log_output.str();
@@ -152,6 +151,5 @@ int main(int argc, char **argv) {
   assert(logs.find("key:flow.id") != std::string::npos);
 
   LogTestController::getInstance().reset();
-  stop_webserver(server);
   return 0;
 }
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h b/extensions/http-curl/tests/ServerAwareHandler.h
index 63ee0ee..bb88c2f 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -21,16 +21,25 @@
 
 class ServerAwareHandler: public CivetHandler{
 protected:
-  const std::atomic_bool *is_server_running{nullptr};
-  bool isServerRunning(){
-    assert(is_server_running);
-    return *is_server_running;
+  void sleep_for(std::chrono::milliseconds time) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
   }
+
+  bool isServerRunning() const {
+    return !terminate_.load();
+  }
+
 public:
-  void initServerFlag(std::atomic_bool& is_running){
-    assert(is_server_running == nullptr);
-    is_server_running = &is_running;
+  void stop(){
+    terminate_ = true;
+    stop_signal_.notify_all();
   }
+
+ private:
+  std::mutex mutex_;
+  std::condition_variable stop_signal_;
+  std::atomic_bool terminate_{false};
 };
 
 #endif //NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 9676630..13a6cf9 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -24,65 +24,81 @@
 #include "CivetServer.h"
 #include "civetweb.h"
 #include "HTTPUtils.h"
+#include "ServerAwareHandler.h"
 
-
-/* Server context handle */
-static std::string resp_str;
-
-void init_webserver() {
-  mg_init_library(0);
-}
-
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
-  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "error_log_file",
-      "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "4", "ssl_cipher_list",
-      "ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no", 0 };
-// ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
-  std::vector<std::string> cpp_options;
-  for (size_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-
-  if (!mg_check_feature(2)) {
-      fprintf(stderr,
-              "Error: Embedded example built with SSL support, "
-              "but civetweb library build without.\n");
-      return 0;
+/**
+ * A wrapper around CivetServer which notifies Handlers before shutdown,
+ * so they wouldn't get stuck (if a handler returns after shutdown is
+ * initiated it might get stuck inside worker_thread_run > consume_socket)
+ */
+class TestServer{
+  struct CivetLibrary{
+    CivetLibrary() {
+      if (getCounter()++ == 0) {
+        mg_init_library(0);
+      }
     }
+    ~CivetLibrary() {
+      if (--getCounter() == 0) {
+        mg_exit_library();
+      }
+    }
+   private:
+    static std::atomic<int>& getCounter() {
+      static std::atomic<int> counter{0};
+      return counter;
+    }
+  };
+ public:
+  TestServer(std::string &port, std::string &rooturi, CivetHandler *handler, CivetCallbacks *callbacks, std::string &cert, std::string &ca_cert) {
 
+    if (!mg_check_feature(2)) {
+      throw std::runtime_error("Error: Embedded example built with SSL support, "
+                               "but civetweb library build without.\n");
+    }
 
-  //mg_init_library(MG_FEATURES_SSL);
-
-  CivetServer *server = new CivetServer(cpp_options, (CivetCallbacks*)callbacks);
 
-  server->addHandler(rooturi, handler);
+    //mg_init_library(MG_FEATURES_SSL);
 
-  return server;
+    // ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
+    std::vector<std::string> cpp_options{ "document_root", ".", "listening_ports", port, "error_log_file",
+                              "error.log", "ssl_certificate", ca_cert, "ssl_protocol_version", "4", "ssl_cipher_list",
+                              "ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no"};
+    server_ = utils::make_unique<CivetServer>(cpp_options, callbacks);
 
-}
+    addHandler(rooturi, handler);
+  }
 
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
-  const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
+  TestServer(std::string &port, std::string &rooturi, CivetHandler *handler) {
+    std::vector<std::string> cpp_options{"document_root", ".", "listening_ports", port};
+    server_ = utils::make_unique<CivetServer>(cpp_options);
 
-  std::vector<std::string> cpp_options;
-  for (size_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
+    addHandler(rooturi, handler);
   }
-  CivetServer *server = new CivetServer(cpp_options);
-
-  server->addHandler(rooturi, handler);
 
-  return server;
+  void addHandler(const std::string& uri, CivetHandler* handler) {
+    handlers_.push_back(handler);
+    server_->addHandler(uri, handler);
+  }
 
-}
+  std::vector<int> getListeningPorts() {
+    return server_->getListeningPorts();
+  }
 
-static void stop_webserver(CivetServer *server) {
-  if (server != nullptr)
-    delete server;
+  ~TestServer() {
+    for (auto handler : handlers_) {
+      auto serverAwareHandler = dynamic_cast<ServerAwareHandler*>(handler);
+      if (serverAwareHandler) serverAwareHandler->stop();
+    }
 
-  /* Un-initialize the library */
-  mg_exit_library();
-}
+  }
+ private:
+  // server_ depends on lib_ (the library initializer)
+  // so their order matters
+  CivetLibrary lib_;
+  std::unique_ptr<CivetServer> server_;
+
+  std::vector<CivetHandler*> handlers_;
+};
 
 #endif