You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/24 10:33:11 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1263 - Possible
fix for segfault
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new e26d391 MINIFICPP-1263 - Possible fix for segfault
e26d391 is described below
commit e26d391b20c52e17931feddf8211cc3fe7affe9f
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Jun 16 17:32:03 2020 +0200
MINIFICPP-1263 - Possible fix for segfault
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #818
---
extensions/coap/tests/CoapIntegrationBase.h | 10 +-
extensions/http-curl/tests/AbsoluteTimeoutTest.cpp | 2 +-
extensions/http-curl/tests/HTTPHandlers.h | 4 +-
extensions/http-curl/tests/HTTPIntegrationBase.h | 14 +--
.../http-curl/tests/HttpGetIntegrationTest.cpp | 30 +++---
extensions/http-curl/tests/ServerAwareHandler.h | 23 +++--
extensions/http-curl/tests/TestServer.h | 108 ++++++++++++---------
7 files changed, 105 insertions(+), 86 deletions(-)
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index c8f8499..bd00151 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -42,7 +42,7 @@ class CoapIntegrationBase : public IntegrationBase {
void setUrl(std::string url, CivetHandler *handler);
void shutdownBeforeFlowController() override {
- stop_webserver(server);
+ server.reset();
}
virtual void run(std::string test_file_location) override {
@@ -86,13 +86,13 @@ class CoapIntegrationBase : public IntegrationBase {
}
protected:
- CivetServer *server;
+ std::unique_ptr<TestServer> server;
};
void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
- struct mg_callbacks callback;
+ CivetCallbacks callback{};
if (url.find("localhost") != std::string::npos) {
if (server != nullptr) {
server->addHandler(path, handler);
@@ -105,9 +105,9 @@ void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
callback.init_ssl = ssl_enable;
port += "s";
callback.log_message = log_message;
- server = start_webserver(port, path, handler, &callback, cert, cert);
+ server = utils::make_unique<TestServer>(port, path, handler, &callback, cert, cert);
} else {
- server = start_webserver(port, path, handler);
+ server = utils::make_unique<TestServer>(port, path, handler);
}
}
}
diff --git a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
index 590529b..658a657 100644
--- a/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
+++ b/extensions/http-curl/tests/AbsoluteTimeoutTest.cpp
@@ -35,7 +35,7 @@ int main() {
std::chrono::milliseconds(500)
});
- auto server = start_webserver(port, rootURI, &handler);
+ TestServer server(port, rootURI, &handler);
auto plan = controller.createPlan();
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 83e70ee..71cd7e2 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -511,14 +511,14 @@ public:
private:
void respond(struct mg_connection *conn) {
if (wait_times_.size() > 0 && wait_times_[0].count() > 0) {
- std::this_thread::sleep_for(wait_times_[0]);
+ sleep_for(wait_times_[0]);
}
int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count);
for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) {
mg_printf(conn, "a");
if (wait_times_[chunkIdx + 1].count() > 0) {
- std::this_thread::sleep_for(wait_times_[chunkIdx + 1]);
+ sleep_for(wait_times_[chunkIdx + 1]);
}
}
}
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 776c647..04f0c99 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -44,8 +44,7 @@ class CoapIntegrationBase : public IntegrationBase {
void setUrl(const std::string& url, ServerAwareHandler *handler);
void shutdownBeforeFlowController() override {
- is_server_running = false;
- stop_webserver(server);
+ server.reset();
}
std::string getWebPort() {
@@ -57,19 +56,16 @@ class CoapIntegrationBase : public IntegrationBase {
}
protected:
- std::atomic_bool is_server_running;
- CivetServer *server;
+ std::unique_ptr<TestServer> server;
};
void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *handler) {
- handler->initServerFlag(is_server_running);
parse_http_components(url, port, scheme, path);
- struct mg_callbacks callback{};
+ CivetCallbacks callback{};
if (server != nullptr) {
server->addHandler(path, handler);
return;
}
- is_server_running = true;
if (scheme == "https" && !key_dir.empty()) {
std::string cert = "";
cert = key_dir + "nifi-cert.pem";
@@ -77,9 +73,9 @@ void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *han
callback.init_ssl = ssl_enable;
port += "s";
callback.log_message = log_message;
- server = start_webserver(port, path, handler, &callback, cert, cert);
+ server = utils::make_unique<TestServer>(port, path, handler, &callback, cert, cert);
} else {
- server = start_webserver(port, path, handler);
+ server = utils::make_unique<TestServer>(port, path, handler);
}
if (port == "0" || port == "0s") {
bool secure = (port == "0s");
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index e02f22a..f149d4c 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -74,7 +74,6 @@ class HttpResponder : public CivetHandler {
};
int main(int argc, char **argv) {
- init_webserver();
LogTestController::getInstance().setDebug<core::Processor>();
LogTestController::getInstance().setDebug<core::ProcessSession>();
LogTestController::getInstance().setDebug<utils::HTTPClient>();
@@ -121,27 +120,27 @@ int main(int argc, char **argv) {
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
HttpResponder h_ex;
std::string port, scheme, path;
- CivetServer *server = nullptr;
+ std::unique_ptr<TestServer> server;
parse_http_components(url, port, scheme, path);
- struct mg_callbacks callback{};
- if (scheme == "https") {
- std::string cert;
- cert = key_dir + "nifi-cert.pem";
- memset(&callback, 0, sizeof(callback));
- callback.init_ssl = ssl_enable;
- std::string https_port = port + "s";
- callback.log_message = log_message;
- server = start_webserver(https_port, path, &h_ex, &callback, cert, cert);
- } else {
- server = start_webserver(port, path, &h_ex);
- }
+ CivetCallbacks callback{};
+ if (scheme == "https") {
+ std::string cert;
+ cert = key_dir + "nifi-cert.pem";
+ memset(&callback, 0, sizeof(callback));
+ callback.init_ssl = ssl_enable;
+ std::string https_port = port + "s";
+ callback.log_message = log_message;
+ server = utils::make_unique<TestServer>(https_port, path, &h_ex, &callback, cert, cert);
+ } else {
+ server = utils::make_unique<TestServer>(port, path, &h_ex);
+ }
controller->load();
controller->start();
waitToVerifyProcessor();
controller->waitUnload(60000);
if (url.find("localhost") == std::string::npos) {
- stop_webserver(server);
+ server.reset();
exit(1);
}
std::string logs = LogTestController::getInstance().log_output.str();
@@ -152,6 +151,5 @@ int main(int argc, char **argv) {
assert(logs.find("key:flow.id") != std::string::npos);
LogTestController::getInstance().reset();
- stop_webserver(server);
return 0;
}
diff --git a/extensions/http-curl/tests/ServerAwareHandler.h b/extensions/http-curl/tests/ServerAwareHandler.h
index 63ee0ee..bb88c2f 100644
--- a/extensions/http-curl/tests/ServerAwareHandler.h
+++ b/extensions/http-curl/tests/ServerAwareHandler.h
@@ -21,16 +21,25 @@
class ServerAwareHandler: public CivetHandler{
protected:
- const std::atomic_bool *is_server_running{nullptr};
- bool isServerRunning(){
- assert(is_server_running);
- return *is_server_running;
+ void sleep_for(std::chrono::milliseconds time) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ stop_signal_.wait_for(lock, time, [&] {return terminate_.load();});
}
+
+ bool isServerRunning() const {
+ return !terminate_.load();
+ }
+
public:
- void initServerFlag(std::atomic_bool& is_running){
- assert(is_server_running == nullptr);
- is_server_running = &is_running;
+ void stop(){
+ terminate_ = true;
+ stop_signal_.notify_all();
}
+
+ private:
+ std::mutex mutex_;
+ std::condition_variable stop_signal_;
+ std::atomic_bool terminate_{false};
};
#endif //NIFI_MINIFI_CPP_SERVERAWAREHANDLER_H
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 9676630..13a6cf9 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -24,65 +24,81 @@
#include "CivetServer.h"
#include "civetweb.h"
#include "HTTPUtils.h"
+#include "ServerAwareHandler.h"
-
-/* Server context handle */
-static std::string resp_str;
-
-void init_webserver() {
- mg_init_library(0);
-}
-
-
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) {
- const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "error_log_file",
- "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "4", "ssl_cipher_list",
- "ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no", 0 };
-// ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
- std::vector<std::string> cpp_options;
- for (size_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
-
- if (!mg_check_feature(2)) {
- fprintf(stderr,
- "Error: Embedded example built with SSL support, "
- "but civetweb library build without.\n");
- return 0;
+/**
+ * A wrapper around CivetServer which notifies Handlers before shutdown,
+ * so they wouldn't get stuck (if a handler returns after shutdown is
+ * initiated it might get stuck inside worker_thread_run > consume_socket)
+ */
+class TestServer{
+ struct CivetLibrary{
+ CivetLibrary() {
+ if (getCounter()++ == 0) {
+ mg_init_library(0);
+ }
}
+ ~CivetLibrary() {
+ if (--getCounter() == 0) {
+ mg_exit_library();
+ }
+ }
+ private:
+ static std::atomic<int>& getCounter() {
+ static std::atomic<int> counter{0};
+ return counter;
+ }
+ };
+ public:
+ TestServer(std::string &port, std::string &rooturi, CivetHandler *handler, CivetCallbacks *callbacks, std::string &cert, std::string &ca_cert) {
+ if (!mg_check_feature(2)) {
+ throw std::runtime_error("Error: Embedded example built with SSL support, "
+ "but civetweb library build without.\n");
+ }
- //mg_init_library(MG_FEATURES_SSL);
-
- CivetServer *server = new CivetServer(cpp_options, (CivetCallbacks*)callbacks);
- server->addHandler(rooturi, handler);
+ //mg_init_library(MG_FEATURES_SSL);
- return server;
+ // ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
+ std::vector<std::string> cpp_options{ "document_root", ".", "listening_ports", port, "error_log_file",
+ "error.log", "ssl_certificate", ca_cert, "ssl_protocol_version", "4", "ssl_cipher_list",
+ "ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no"};
+ server_ = utils::make_unique<CivetServer>(cpp_options, callbacks);
-}
+ addHandler(rooturi, handler);
+ }
-CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) {
- const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 };
+ TestServer(std::string &port, std::string &rooturi, CivetHandler *handler) {
+ std::vector<std::string> cpp_options{"document_root", ".", "listening_ports", port};
+ server_ = utils::make_unique<CivetServer>(cpp_options);
- std::vector<std::string> cpp_options;
- for (size_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
+ addHandler(rooturi, handler);
}
- CivetServer *server = new CivetServer(cpp_options);
-
- server->addHandler(rooturi, handler);
- return server;
+ void addHandler(const std::string& uri, CivetHandler* handler) {
+ handlers_.push_back(handler);
+ server_->addHandler(uri, handler);
+ }
-}
+ std::vector<int> getListeningPorts() {
+ return server_->getListeningPorts();
+ }
-static void stop_webserver(CivetServer *server) {
- if (server != nullptr)
- delete server;
+ ~TestServer() {
+ for (auto handler : handlers_) {
+ auto serverAwareHandler = dynamic_cast<ServerAwareHandler*>(handler);
+ if (serverAwareHandler) serverAwareHandler->stop();
+ }
- /* Un-initialize the library */
- mg_exit_library();
-}
+ }
+ private:
+ // server_ depends on lib_ (the library initializer)
+ // so their order matters
+ CivetLibrary lib_;
+ std::unique_ptr<CivetServer> server_;
+
+ std::vector<CivetHandler*> handlers_;
+};
#endif