You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/22 14:37:02 UTC
nifi-minifi-cpp git commit: MINIFICPP-623: Add trace capabilities to
controller and agent
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 8e258f545 -> 6a7f98904
MINIFICPP-623: Add trace capabilities to controller and agent
MINIFICPP-623: Change test port and controller sizes for OSX
MINIFICPP-623: avoid ifdef checks that may or may not exist on platforms -- use cmake check
This closes #424.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6a7f9890
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6a7f9890
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6a7f9890
Branch: refs/heads/master
Commit: 6a7f98904bc09b8b34eadfacaf4a0088f0d5bb22
Parents: 8e258f5
Author: Marc Parisi <ph...@apache.org>
Authored: Thu Oct 18 20:13:15 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Mon Oct 22 10:33:58 2018 -0400
----------------------------------------------------------------------
CMakeLists.txt | 8 +
OPS.md | 97 +++++++++++
README.md | 62 +------
controller/Controller.h | 60 +++++--
controller/MiNiFiController.cpp | 21 ++-
extensions/http-curl/tests/C2JstackTest.cpp | 164 ++++++++++++++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
libminifi/include/FlowController.h | 2 +
libminifi/include/SchedulingAgent.h | 13 +-
libminifi/include/core/state/UpdateController.h | 14 +-
libminifi/include/utils/BackTrace.h | 169 +++++++++++++++++++
libminifi/include/utils/ThreadPool.h | 47 +++++-
libminifi/src/FlowController.cpp | 9 +
libminifi/src/Properties.cpp | 6 +-
libminifi/src/c2/C2Agent.cpp | 58 +++++--
libminifi/src/c2/ControllerSocketProtocol.cpp | 16 ++
libminifi/src/utils/BackTrace.cpp | 132 +++++++++++++++
libminifi/test/unit/BackTraceTests.cpp | 116 +++++++++++++
libminifi/test/unit/ControllerTests.cpp | 4 +
19 files changed, 893 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4b0f7a3..1c92446 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -27,6 +27,7 @@ option(SKIP_TESTS "Skips building all tests." OFF)
option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON)
option(USE_SYSTEM_OPENSSL "Instructs the build system to search for and use an SSL library available in the host system" ON)
option(OPENSSL_OFF "Disables OpenSSL" OFF)
+option(ENABLE_OPS "Enable Operations Tools" ON)
option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF)
option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON)
if (WIN32)
@@ -39,6 +40,7 @@ option(USE_SYSTEM_BZIP2 "Instructs the build system to search for and use a bzip
option(BUILD_ROCKSDB "Instructs the build system to use RocksDB from the third party directory" ON)
option(FORCE_WINDOWS "Instructs the build system to force Windows builds when WIN32 is specified" OFF)
+include(CheckIncludeFile)
include(FeatureSummary)
include(ExternalProject)
@@ -73,6 +75,12 @@ if(CCACHE_FOUND)
message("-- Found ccache: ${CCACHE_FOUND}")
endif(CCACHE_FOUND)
+# check for exec info before we enable the backtrace features.
+CHECK_INCLUDE_FILE("execinfo.h" HAS_EXECINFO)
+if (ENABLE_OPS AND HAS_EXECINFO AND NOT WIN32)
+ add_definitions("-DHAS_EXECINFO=1")
+endif()
+
#### Establish Project Configuration ####
# Enable usage of the VERSION specifier
include(CheckCXXCompilerFlag)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/OPS.md
----------------------------------------------------------------------
diff --git a/OPS.md b/OPS.md
new file mode 100644
index 0000000..c993785
--- /dev/null
+++ b/OPS.md
@@ -0,0 +1,97 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Apache NiFi - MiNiFi - Operations Readme.
+
+
+This readme defines operational commands for managing instances.
+
+## Table of Contents
+
+- [Description](#description)
+- [Managing](#managing-minifi)
+ - [Commands](#commands)
+
+## Description
+
+Apache NiFi MiNiFi C++ can be managed through our [C2 protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal)
+or through a local interface called the MiNiFi Controller
+
+## Managing MiNiFi
+
+The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy.
+
+The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required.
+The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows
+you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only
+minificontroller on the local host can control the agent:
+
+ $ controller.socket.host=localhost
+ $ controller.socket.port=9998
+ $ controller.socket.local.any.interface=true/false (default: false)
+
+These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller
+will be disabled in your deployment.
+
+ The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below.
+ Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware.
+
+### Debug
+
+ Agents have the ability to return a list of stacks of currently running threads. The Jstack command provides a list of call stacks
+ for threads within the agent. This may allow users and maintainers to view stacks of running threads to diagnose issues. The name
+ is an homage to the jstack command used by Java developers. The design is fundamentally the same as that of Java -- signal handlers
+ notify signals to interrupt and provide traces. This feature is currently not built into Windows builds.
+
+### Commands
+ #### Specifying connecting information
+
+ ./minificontroller --host "host name" --port "port"
+
+ * By default these options use those defined in minifi.properties and are not required
+
+ #### Start Command
+
+ ./minificontroller --start "component name"
+
+ #### Stack command
+ ./minificontroller --jstack
+
+ #### Stop command
+ ./minificontroller --stop "component name"
+
+ #### List connections command
+ ./minificontroller --list connections
+
+ #### List components command
+ ./minificontroller --list components
+
+ #### Clear connection command
+ ./minificontroller --clear "connection name"
+
+ #### GetSize command
+ ./minificontroller --getsize "connection name"
+
+ * Returns the size of the connection. The current size along with the max will be reported
+
+ #### Update flow
+ ./minificontroller --updateflow "config yml"
+
+ * Updates the flow file reference and performs a warm re-deploy.
+
+ #### Get full connection command
+ ./minificontroller --getfull
+
+ * Provides a list of full connections, if any.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ead8dba..542dd6c 100644
--- a/README.md
+++ b/README.md
@@ -23,11 +23,12 @@ MiNiFi is a child project effort of Apache NiFi. This repository is for a nativ
- [Getting Started](#getting-started)
- [System Requirements](#system-requirements)
- [Bootstrapping](#bootstrapping)
- - [Building](#building)
- [Cleaning](#cleaning)
- [Configuring](#configuring)
- [Running](#running)
- [Deploying](#deploying)
+ - [Extensions](#extensions)
+- [Operations](#operations)
- [Issue Tracking](#issue-tracking)
- [Documentation](#documentation)
- [License](#license)
@@ -801,66 +802,13 @@ created within the build directory that contains a manifest of build artifacts.
The build identifier will be carried with the deployed binary for the configuration you specify. By default all extensions will be built.
-### Managing MiNiFi C++ through the MiNiFi Controller
-
-The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy.
-
-The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required.
-The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows
-you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only
-minificontroller on the local host can control the agent:
-
- $ controller.socket.host=localhost
- $ controller.socket.port=9998
- $ controller.socket.local.any.interface=true/false ( default false)
-
-These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller
-will be disabled in your deployment.
-
- The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below.
- Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware.
-
- #### Specifying connecting information
-
- ./minificontroller --host "host name" --port "port"
-
- * By default these options use those defined in minifi.properties and are not required
-
- #### Start Command
-
- ./minificontroller --start "component name"
-
- #### Stop command
- ./minificontroller --stop "component name"
-
- #### List connections command
- ./minificontroller --list connections
-
- #### List components command
- ./minificontroller --list components
-
- #### Clear connection command
- ./minificontroller --clear "connection name"
-
- #### GetSize command
- ./minificontroller --getsize "connection name"
-
- * Returns the size of the connection. The current size along with the max will be reported
-
- #### Update flow
- ./minificontroller --updateflow "config yml"
-
- *Updates the flow file reference and performs a warm re-deploy.
-
- #### Get full connection command
- ./minificontroller --getfull
-
- *Provides a list of full connections, if any.
-
### Extensions
Please see [Extensions.md](Extensions.md) on how to build and run conditionally built dependencies and extensions.
+## Operations
+See our [operations documentation for additional inforomation on how to manage instances](OPS.md)
+
## Issue Tracking
See https://issues.apache.org/jira/projects/MINIFICPP/issues for the issue tracker.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/controller/Controller.h
----------------------------------------------------------------------
diff --git a/controller/Controller.h b/controller/Controller.h
index 312b922..bbd099b 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -125,6 +125,40 @@ int getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream
return 0;
}
+int getJstacks(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) {
+ socket->initialize();
+ std::vector<uint8_t> data;
+ uint8_t op = minifi::c2::Operation::DESCRIBE;
+ minifi::io::BaseStream stream;
+ stream.writeData(&op, 1);
+ stream.writeUTF("jstack");
+ if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) {
+ return -1;
+ }
+ // read the response
+ uint8_t resp = 0;
+ socket->readData(&resp, 1);
+ if (resp == minifi::c2::Operation::DESCRIBE) {
+
+ uint64_t size = 0;
+ socket->read(size);
+
+ for (int i = 0; i < size; i++) {
+ std::string name;
+ uint64_t lines;
+ socket->readUTF(name);
+ socket->read(lines);
+ for (int j = 0; j < lines; j++) {
+ std::string line;
+ socket->readUTF(line);
+ out << name << " -- " << line << std::endl;
+ }
+
+ }
+ }
+ return 0;
+}
+
/**
* Prints the connection size for the provided connection.
* @param socket socket ptr
@@ -168,7 +202,7 @@ int listComponents(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out
out << "Components:" << std::endl;
for (int i = 0; i < responses; i++) {
- std::string name,status;
+ std::string name, status;
socket->readUTF(name, false);
socket->readUTF(status, false);
out << name << ", running: " << status << std::endl;
@@ -244,7 +278,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
return service;
}
- void printManifest(const std::shared_ptr<minifi::Configure> &configuration) {
+void printManifest(const std::shared_ptr<minifi::Configure> &configuration) {
std::string prov_repo_class = "volatileprovenancerepository";
std::string flow_repo_class = "volatileflowfilerepository";
@@ -252,12 +286,12 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
std::string content_repo_class = "volatilecontentrepository";
std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>();
- log_properties->setHome("./");
- log_properties->set("appender.stdout","stdout");
- log_properties->set("logger.org::apache::nifi::minifi","OFF,stdout");
- logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
+ log_properties->setHome("./");
+ log_properties->set("appender.stdout", "stdout");
+ log_properties->set("logger.org::apache::nifi::minifi", "OFF,stdout");
+ logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
- configuration->set(minifi::Configure::nifi_flow_configuration_file,"../conf/config.yml");
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, "../conf/config.yml");
configuration->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class);
// Create repos for flow record and provenance
std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance");
@@ -280,11 +314,11 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
minifi::setDefaultDirectory(content_repo_path);
}
- configuration->set("c2.agent.heartbeat.period","25");
- configuration->set("nifi.c2.root.classes","AgentInformation");
- configuration->set("nifi.c2.enable","true");
- configuration->set("c2.agent.listen","true");
- configuration->set("c2.agent.heartbeat.reporter.classes","AgentPrinter");
+ configuration->set("c2.agent.heartbeat.period", "25");
+ configuration->set("nifi.c2.root.classes", "AgentInformation");
+ configuration->set("nifi.c2.enable", "true");
+ configuration->set("c2.agent.listen", "true");
+ configuration->set("c2.agent.heartbeat.reporter.classes", "AgentPrinter");
configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name);
@@ -293,7 +327,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const
std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>(
- new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo,"manifest",false));
+ new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo, "manifest", false));
controller->load();
controller->start();
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/controller/MiNiFiController.cpp
----------------------------------------------------------------------
diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
index b385685..06bba2c 100644
--- a/controller/MiNiFiController.cpp
+++ b/controller/MiNiFiController.cpp
@@ -129,6 +129,7 @@ int main(int argc, char **argv) {
("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>()) //NOLINT
("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>()) //NOLINT
("getfull", "Reports a list of full connections") //NOLINT
+ ("jstack", "Returns backtraces from the agent") //NOLINT
("manifest", "Generates a manifest for the current binary") //NOLINT
("noheaders", "Removes headers from output streams");
@@ -191,13 +192,12 @@ int main(int argc, char **argv) {
auto& components = result["c"].as<std::vector<std::string>>();
for (const auto& connection : components) {
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
- if (clearConnection(std::move(socket), connection)){
+ if (clearConnection(std::move(socket), connection)) {
std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl;
socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
if (getConnectionSize(std::move(socket), std::cout, connection) < 0)
- std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
- }
- else
+ std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+ } else
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
}
@@ -231,6 +231,12 @@ int main(int argc, char **argv) {
std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
}
+ if (result.count("jstack") > 0) {
+ auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
+ if (getJstacks(std::move(socket), std::cout) < 0)
+ std::cout << "Could not connect to remote host " << host << ":" << port << std::endl;
+ }
+
if (result.count("updateflow") > 0) {
auto& flow_file = result["updateflow"].as<std::string>();
auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port);
@@ -241,10 +247,9 @@ int main(int argc, char **argv) {
if (result.count("manifest") > 0) {
printManifest(configuration);
}
- }catch (const std::exception &exc)
- {
- // catch anything thrown within try block that derives from std::exception
- std::cerr << exc.what() << std::endl;
+ } catch (const std::exception &exc) {
+ // catch anything thrown within try block that derives from std::exception
+ std::cerr << exc.what() << std::endl;
} catch (...) {
std::cout << options.help( { "", "Group" }) << std::endl;
exit(0);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/extensions/http-curl/tests/C2JstackTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
new file mode 100644
index 0000000..4141312
--- /dev/null
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -0,0 +1,164 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+
+void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+}
+
+
+class ConfigHandler : public CivetHandler {
+ public:
+ ConfigHandler() {
+ calls_ = 0;
+ }
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ calls_++;
+ std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"describe\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"jstack\""
+ "}]}";
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ heartbeat_response.length());
+ mg_printf(conn, "%s", heartbeat_response.c_str());
+
+
+ return true;
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ std::ifstream myfile(test_file_location_.c_str());
+
+ if (myfile.is_open()) {
+ std::stringstream buffer;
+ buffer << myfile.rdbuf();
+ std::string str = buffer.str();
+ myfile.close();
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ str.length());
+ mg_printf(conn, "%s", str.c_str());
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+ std::string test_file_location_;
+ std::atomic<size_t> calls_;
+};
+
+int main(int argc, char **argv) {
+ mg_init_library(0);
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+
+ const char *options[] = { "document_root", ".", "listening_ports", "8727", 0 };
+ std::vector<std::string> cpp_options;
+ for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
+ cpp_options.push_back(options[i]);
+ }
+
+ CivetServer server(cpp_options);
+ ConfigHandler h_ex;
+ server.addHandler("/update", h_ex);
+ std::string key_dir, test_file_location;
+ if (argc > 1) {
+ h_ex.test_file_location_ = test_file_location = argv[1];
+ key_dir = argv[2];
+ }
+
+
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+
+ configuration->set("c2.rest.url", "http://localhost:8727/update");
+ configuration->set("c2.agent.heartbeat.period", "1000");
+ mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+ true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+
+ std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
+ ptr.release();
+ auto start = std::chrono::system_clock::now();
+
+ controller->load();
+ controller->start();
+ waitToVerifyProcessor();
+
+ controller->waitUnload(60000);
+ auto then = std::chrono::system_clock::now();
+
+ auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
+ std::string logs = LogTestController::getInstance().log_output.str();
+ #ifndef WIN32
+ assert(logs.find("SchedulingAgent") != std::string::npos);
+ #endif
+ LogTestController::getInstance().reset();
+ rmdir("./content_repository");
+ assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/extensions/http-curl/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 721c221..b8d6c69 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -71,6 +71,7 @@ message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test fi
add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 0466546..9309b4f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -319,6 +319,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
virtual uint64_t getUptime();
+ virtual std::vector<BackTrace> getTraces();
+
void initializeC2();
protected:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 682f6ec..925efdb 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -29,6 +29,7 @@
#include <thread>
#include "utils/TimeUtil.h"
#include "utils/ThreadPool.h"
+#include "utils/BackTrace.h"
#include "core/Core.h"
#include "core/logging/LoggerConfiguration.h"
#include "properties/Configure.h"
@@ -90,7 +91,7 @@ class SingleRunMonitor : public TimerAwareMonitor {
: TimerAwareMonitor(run_monitor) {
}
explicit SingleRunMonitor(TimerAwareMonitor &&other)
- : TimerAwareMonitor(std::move(other)){
+ : TimerAwareMonitor(std::move(other)) {
}
virtual bool isFinished(const uint64_t &result) {
if (result == 0) {
@@ -123,7 +124,11 @@ class SchedulingAgent {
running_ = false;
repo_ = repo;
flow_repo_ = flow_repo;
- auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true, controller_service_provider);
+ /**
+ * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path
+ * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases.
+ */
+ auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), false, controller_service_provider, "SchedulingAgent");
thread_pool_ = std::move(pool);
thread_pool_.start();
}
@@ -148,6 +153,10 @@ class SchedulingAgent {
thread_pool_.shutdown();
}
+ std::vector<BackTrace> getTraces() {
+ return thread_pool_.getTraces();
+ }
+
public:
virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/core/state/UpdateController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index fec3cc1..7cd61c4 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -20,6 +20,7 @@
#include <string>
#include "utils/ThreadPool.h"
+#include "utils/BackTrace.h"
namespace org {
namespace apache {
@@ -69,9 +70,9 @@ class UpdateStatus {
class Update {
public:
- Update()
- : status_(UpdateStatus(UpdateState::INITIATE, 0)) {
- }
+ Update()
+ : status_(UpdateStatus(UpdateState::INITIATE, 0)) {
+ }
Update(UpdateStatus status)
: status_(status) {
@@ -235,6 +236,13 @@ class StateMonitor : public StateController {
*/
virtual uint64_t getUptime() = 0;
+ /**
+ * Returns a vector of backtraces
+ * @return backtraces from the state monitor.
+ */
+ virtual std::vector<BackTrace> getTraces() = 0;
+
+
protected:
std::atomic<bool> controller_running_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/utils/BackTrace.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/BackTrace.h b/libminifi/include/utils/BackTrace.h
new file mode 100644
index 0000000..5c7bb80
--- /dev/null
+++ b/libminifi/include/utils/BackTrace.h
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_
+#define LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_
+
+#ifdef HAS_EXECINFO
+#include <execinfo.h>
+#include <signal.h>
+#endif
+#include <thread>
+#include <vector>
+#include <mutex>
+#include <iostream>
+#include <sstream>
+
+#define TRACE_BUFFER_SIZE 128
+
+/**
+ * Forward declaration allows us to tightly couple TraceResolver
+ * with BackTrace.
+ */
+class TraceResolver;
+
+/**
+ * Purpose: Backtrace is a movable vector of trace lines.
+ *
+ */
+class BackTrace {
+ public:
+ BackTrace() {
+ }
+ BackTrace(const std::string &name)
+ : name_(name) {
+ }
+ BackTrace(BackTrace &&) = default;
+ BackTrace(BackTrace &) = delete;
+
+ std::vector<std::string> getTraces() const {
+ return trace_;
+ }
+
+ BackTrace &operator=(BackTrace &&other) = default;
+
+ /**
+ * Return thread name of f this caller
+ * @returns name ;
+ */
+ std::string getName() const {
+ return name_;
+ }
+
+ protected:
+ void addLine(const std::string &symbol_line) {
+ trace_.emplace_back(symbol_line);
+ }
+
+ private:
+ std::string name_;
+ std::vector<std::string> trace_;
+ friend class TraceResolver;
+};
+
+/**
+ * Pulls the trace and places it onto the TraceResolver instance.
+ */
+void pull_trace(const uint8_t frames_to_skip = 1);
+
+#ifdef HAS_EXECINFO
+/**
+ * Signal handler that will run via TraceResolver
+ */
+void handler(int signr, siginfo_t *info, void *secret);
+#endif
+/**
+ * Emplaces a signal handler for SIGUSR2
+ */
+void emplace_handler();
+
+/**
+ * Purpose: Provides a singular instance to grab the call stack for thread(s).
+ * Design: is a singleton to avoid multiple signal handlers.
+ */
+class TraceResolver {
+ public:
+
+ /**
+ * Retrieves the backtrace for the provided thread reference
+ * @return BackTrace instance
+ */
+ BackTrace &&getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread);
+
+ /**
+ * Retrieves the backtrace for the calling thread
+ * @returns BackTrace instance
+ */
+ BackTrace &&getBackTrace(const std::string &thread_name) {
+#ifdef WIN32
+ // currrently not supported in windows
+ return BackTrace(thread_name);
+#else
+ return std::move(getBackTrace(thread_name, pthread_self()));
+#endif
+ }
+
+ /**
+ * Returns a static instance of the thread resolver.
+ */
+ static TraceResolver &getResolver() {
+ static TraceResolver resolver;
+ return resolver;
+ }
+
+ /**
+ * Adds a trace line with an optional function
+ * @param symbol_line symbol line that was produced
+ * @param func function name
+ */
+ void addTraceLine(const char *symbol_line, const char *func = nullptr) {
+ std::stringstream line;
+ line << symbol_line;
+ if (nullptr != func) {
+ line << " @" << func;
+ }
+ trace_.addLine(line.str());
+ }
+
+ /**
+ * Returns the thread handle reference in the native format.
+ */
+ const std::thread::native_handle_type getThreadHandle() {
+ return thread_handle_;
+ }
+
+ /**
+ * Returns the caller handle refernce in the native format.
+ */
+ const std::thread::native_handle_type getCallerHandle() {
+ return caller_handle_;
+ }
+
+ private:
+ TraceResolver() // can't use = default due to handle_types not defaulting.
+ : thread_handle_(0),
+ caller_handle_(0) {
+ ;
+ }
+
+ BackTrace trace_;
+ std::thread::native_handle_type thread_handle_;
+ std::thread::native_handle_type caller_handle_;
+ std::mutex mutex_;
+};
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ */
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 9fc47f5..ffb28fe 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -18,6 +18,7 @@
#define LIBMINIFI_INCLUDE_THREAD_POOL_H
#include <chrono>
+#include <sstream>
#include <iostream>
#include <atomic>
#include <mutex>
@@ -28,6 +29,7 @@
#include <thread>
#include <functional>
+#include "BackTrace.h"
#include "capi/expect.h"
#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
@@ -189,17 +191,20 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
class WorkerThread {
public:
- explicit WorkerThread(std::thread thread)
+ explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker")
: is_running_(false),
- thread_(std::move(thread)) {
+ thread_(std::move(thread)),
+ name_(name) {
}
- WorkerThread()
- : is_running_(false) {
+ WorkerThread(const std::string &name = "NamelessWorker")
+ : is_running_(false),
+ name_(name) {
}
std::atomic<bool> is_running_;
std::thread thread_;
+ std::string name_;
};
/**
@@ -212,13 +217,15 @@ template<typename T>
class ThreadPool {
public:
- ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr)
+ ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr,
+ const std::string &name = "NamelessPool")
: daemon_threads_(daemon_threads),
thread_reduction_count_(0),
max_worker_threads_(max_worker_threads),
adjust_threads_(false),
running_(false),
- controller_service_provider_(controller_service_provider) {
+ controller_service_provider_(controller_service_provider),
+ name_(name) {
current_workers_ = 0;
task_count_ = 0;
thread_manager_ = nullptr;
@@ -231,7 +238,8 @@ class ThreadPool {
adjust_threads_(false),
running_(false),
controller_service_provider_(std::move(other.controller_service_provider_)),
- thread_manager_(std::move(other.thread_manager_)) {
+ thread_manager_(std::move(other.thread_manager_)),
+ name_(std::move(other.name_)) {
current_workers_ = 0;
task_count_ = 0;
}
@@ -264,6 +272,22 @@ class ThreadPool {
return task_status_[identifier] == true;
}
+ std::vector<BackTrace> getTraces() {
+ std::vector<BackTrace> traces;
+ std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+ std::unique_lock<std::mutex> wlock(worker_queue_mutex_);
+ // while we may be checking if running, we don't want to
+ // use the threads outside of the manager mutex's lock -- therefore we will
+ // obtain a lock so we can keep the threads in memory
+ if (running_) {
+ for (const auto &worker : thread_queue_) {
+ if (worker->is_running_)
+ traces.emplace_back(TraceResolver::getResolver().getBackTrace(worker->name_, worker->thread_.native_handle()));
+ }
+ }
+ return traces;
+ }
+
/**
* Starts the Thread Pool
*/
@@ -315,6 +339,8 @@ class ThreadPool {
if (!running_) {
start();
}
+
+ name_ = other.name_;
return *this;
}
@@ -367,6 +393,8 @@ class ThreadPool {
std::recursive_mutex manager_mutex_;
// work queue mutex
std::mutex worker_queue_mutex_;
+ // thread pool name
+ std::string name_;
/**
* Call for the manager to start worker threads
@@ -404,7 +432,9 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
template<typename T>
void ThreadPool<T>::manageWorkers() {
for (int i = 0; i < max_worker_threads_; i++) {
- auto worker_thread = std::make_shared<WorkerThread>();
+ std::stringstream thread_name;
+ thread_name << name_ << " #" << i;
+ auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
thread_queue_.push_back(worker_thread);
current_workers_++;
@@ -461,6 +491,7 @@ void ThreadPool<T>::manageWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ thread->is_running_ = true;
uint64_t wait_decay_ = 0;
uint64_t yield_backoff = 10; // start at 10 ms
while (running_.load()) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 9206f41..25b4fc3 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -910,6 +910,15 @@ uint64_t FlowController::getUptime() {
return time_since;
}
+std::vector<BackTrace> FlowController::getTraces() {
+ std::vector<BackTrace> traces;
+ auto timer_driven = timer_scheduler_->getTraces();
+ traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end()));
+ auto event_driven = event_scheduler_->getTraces();
+ traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end()));
+ return traces;
+}
+
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index e64a92f..c8cb341 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -26,7 +26,7 @@ namespace apache {
namespace nifi {
namespace minifi {
-#define BUFFER_SIZE 512
+#define TRACE_BUFFER_SIZE 512
Properties::Properties()
: logger_(logging::LoggerFactory<Properties>::getLogger()) {
@@ -138,8 +138,8 @@ void Properties::loadConfigureFile(const char *fileName) {
}
this->clear();
- char buf[BUFFER_SIZE];
- for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
+ char buf[TRACE_BUFFER_SIZE];
+ for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) {
parseConfigureFileLine(buf);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index ebb4549..8168c77 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -173,21 +173,25 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
if (allow_updates_) {
if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) {
char cwd[1024];
- getcwd(cwd, sizeof(cwd));
+ if (getcwd(cwd, sizeof(cwd)) == nullptr) {
+ logger_->log_error("Could not set update command, reason %s", std::strerror(errno));
- std::stringstream command;
- command << cwd << "/minifi.sh update";
- update_command_ = command.str();
+ } else {
+ std::stringstream command;
+ command << cwd << "/minifi.sh update";
+ update_command_ = command.str();
+ }
}
if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) {
char cwd[1024];
- getcwd(cwd, sizeof(cwd));
-
- std::stringstream copy_path;
- std::stringstream command;
-
- copy_path << cwd << "/minifi.update";
+ if (getcwd(cwd, sizeof(cwd)) == nullptr) {
+ logger_->log_error("Could not set copy path, reason %s", std::strerror(errno));
+ } else {
+ std::stringstream copy_path;
+ std::stringstream command;
+ copy_path << cwd << "/minifi.update";
+ }
}
// if not defined we won't beable to update
@@ -536,6 +540,31 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
enqueue_c2_response(std::move(response));
return;
+ } else if (resp.name == "jstack") {
+ if (update_sink_->isRunning()) {
+ const std::vector<BackTrace> traces = update_sink_->getTraces();
+ for (const auto &trace : traces) {
+ for (const auto & line : trace.getTraces()) {
+ logger_->log_trace("%s -- %s", trace.getName(), line);
+ }
+ }
+ auto keys = configuration_->getConfiguredKeys();
+ C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ response.setLabel("configuration_options");
+ for (const auto &trace : traces) {
+ C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ options.setLabel(trace.getName());
+ std::string value;
+ for (const auto &line : trace.getTraces()) {
+ C2ContentResponse option(Operation::ACKNOWLEDGE);
+ option.name = line;
+ option.operation_arguments[line] = line;
+ options.addContent(std::move(option));
+ }
+ response.addPayload(std::move(options));
+ }
+ enqueue_c2_response(std::move(response));
+ }
}
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
enqueue_c2_response(std::move(response));
@@ -720,14 +749,19 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
void C2Agent::restart_agent() {
char cwd[1024];
- getcwd(cwd, sizeof(cwd));
+ if (getcwd(cwd, sizeof(cwd)) == nullptr) {
+ logger_->log_error("Could not restart agent, reason %s", std::strerror(errno));
+ return;
+ }
std::stringstream command;
command << cwd << "/minifi.sh restart";
}
void C2Agent::update_agent() {
- system(update_command_.c_str());
+ if (!system(update_command_.c_str())) {
+ logger_->log_warn("May not have command processor");
+ }
}
int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/c2/ControllerSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 2bd6d4d..d4f3970 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -187,6 +187,22 @@ void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller
resp.writeUTF(component->isRunning() ? "true" : "false");
}
stream->writeData(const_cast<uint8_t*>(resp.getBuffer()), resp.getSize());
+ } else if (what == "jstack") {
+ io::BaseStream resp;
+ resp.writeData(&head, 1);
+ auto traces = update_sink_->getTraces();
+ uint64_t trace_size = traces.size();
+ resp.write(trace_size);
+ for (const auto &trace : traces) {
+ const auto &lines = trace.getTraces();
+ resp.writeUTF(trace.getName());
+ uint64_t lsize = lines.size();
+ resp.write(lsize);
+ for (const auto &line : lines) {
+ resp.writeUTF(line);
+ }
+ }
+ stream->writeData(const_cast<uint8_t*>(resp.getBuffer()), resp.getSize());
} else if (what == "connections") {
io::BaseStream resp;
resp.writeData(&head, 1);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/utils/BackTrace.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/BackTrace.cpp b/libminifi/src/utils/BackTrace.cpp
new file mode 100644
index 0000000..160a070
--- /dev/null
+++ b/libminifi/src/utils/BackTrace.cpp
@@ -0,0 +1,132 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/BackTrace.h"
+#ifdef HAS_EXECINFO
+#include <execinfo.h>
+#include <iostream>
+#include <cxxabi.h>
+#endif
+#define NAME_SIZE 256
+
+void pull_trace(const uint8_t frames_to_skip) {
+#ifdef HAS_EXECINFO
+ void *stackBuffer[TRACE_BUFFER_SIZE + 1];
+
+ // retrieve current stack addresses
+ int trace_size = backtrace(stackBuffer, TRACE_BUFFER_SIZE);
+
+ char **symboltable = backtrace_symbols(stackBuffer, trace_size);
+ /**
+ * we can skip the signal handler, call to pull_trace, and the first entry for backtrace_symbols
+ */
+ for (int i = frames_to_skip; i < trace_size; i++) {
+ char *start_parenthetical = 0;
+ char *functor = 0;
+ char *stop_parenthetical = 0;
+
+ for (char *p = symboltable[i]; *p; ++p) {
+ if (*p == '(') {
+ start_parenthetical = p;
+ } else if (*p == '+') {
+ functor = p;
+ } else if (*p == ')' && functor) {
+ stop_parenthetical = p;
+ break;
+ }
+ }
+ bool hasFunc = start_parenthetical && functor && stop_parenthetical;
+ if (hasFunc && start_parenthetical < functor) {
+ *start_parenthetical++ = '\0';
+ *functor++ = '\0';
+ *stop_parenthetical = '\0';
+
+ /**
+ * Demangle the names -- this requires calling cxx api to demangle the function name.
+ * not sending an allocated buffer, so we'll deallocate if status is zero.
+ */
+
+ int status;
+
+ auto demangled = abi::__cxa_demangle(start_parenthetical, nullptr, nullptr, &status);
+ if (status == 0) {
+ TraceResolver::getResolver().addTraceLine(symboltable[i], demangled);
+ free(demangled);
+ } else {
+ TraceResolver::getResolver().addTraceLine(symboltable[i], start_parenthetical);
+ }
+ } else {
+ TraceResolver::getResolver().addTraceLine(symboltable[i], "");
+ }
+ }
+
+ free(symboltable);
+#endif
+}
+
+BackTrace &&TraceResolver::getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread_handle) {
+ // lock so that we only perform one backtrace at a time.
+#ifdef HAS_EXECINFO
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ caller_handle_ = pthread_self();
+ thread_handle_ = thread_handle;
+ trace_ = BackTrace(thread_name);
+
+ if (0 == thread_handle_ || pthread_equal(caller_handle_, thread_handle)) {
+ pull_trace();
+ } else {
+ if (thread_handle_ == 0) {
+ return std::move(trace_);
+ }
+ emplace_handler();
+ if (pthread_kill(thread_handle_, SIGUSR2) != 0) {
+ return std::move(trace_);
+ }
+ sigset_t mask;
+ sigfillset(&mask);
+ sigdelset(&mask, SIGUSR2);
+ sigsuspend(&mask);
+ }
+#else
+ // even if tracing is disabled, include thread name into the trace object
+ trace_ = BackTrace(thread_name);
+#endif
+ return std::move(trace_);
+}
+#ifdef HAS_EXECINFO
+void handler(int signr, siginfo_t *info, void *secret) {
+ auto curThread = pthread_self();
+
+ // not the intended thread
+ if (!pthread_equal(curThread, TraceResolver::getResolver().getThreadHandle())) {
+ return;
+ }
+
+ pull_trace();
+
+ pthread_kill(TraceResolver::getResolver().getCallerHandle(), SIGUSR2);
+}
+#endif
+
+void emplace_handler() {
+#ifdef HAS_EXECINFO
+ struct sigaction sa;
+ sigfillset(&sa.sa_mask);
+ sa.sa_flags = SA_SIGINFO;
+ sa.sa_sigaction = handler;
+ sigaction(SIGUSR2, &sa, NULL);
+#endif
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/test/unit/BackTraceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
new file mode 100644
index 0000000..816ff63
--- /dev/null
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <utility>
+#include <future>
+#include <memory>
+#include "../TestBase.h"
+#include "utils/BackTrace.h"
+
+bool function() {
+ return true;
+}
+
+class WorkerNumberExecutions : public utils::AfterExecute<int> {
+ public:
+ explicit WorkerNumberExecutions(int tasks)
+ : runs(0),
+ tasks(tasks) {
+ }
+
+ explicit WorkerNumberExecutions(WorkerNumberExecutions && other)
+ : runs(std::move(other.runs)),
+ tasks(std::move(other.tasks)) {
+ }
+
+ ~WorkerNumberExecutions() {
+ }
+
+ virtual bool isFinished(const int &result) {
+ if (result > 0 && ++runs < tasks) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ virtual bool isCancelled(const int &result) {
+ return false;
+ }
+
+ int getRuns() {
+ return runs;
+ }
+
+ virtual int64_t wait_time() {
+ // wait 50ms
+ return 50;
+ }
+
+ protected:
+ int runs;
+ int tasks;
+};
+
+TEST_CASE("BT1", "[TPT1]") {
+ const BackTrace trace = TraceResolver::getResolver().getBackTrace("BT1");
+#ifdef HAS_EXECINFO
+ REQUIRE(!trace.getTraces().empty());
+#endif
+}
+
+std::atomic<int> counter;
+
+int counterFunction() {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ return ++counter;
+}
+
+TEST_CASE("BT2", "[TPT2]") {
+ counter = 0;
+ utils::ThreadPool<int> pool(4);
+ pool.start();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ for (int i = 0; i < 3; i++) {
+ std::function<int()> f_ex = counterFunction;
+ std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
+ utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
+
+ std::future<int> fut;
+ REQUIRE(true == pool.execute(std::move(functor), fut));
+ }
+
+ std::function<int()> f_ex = counterFunction;
+ std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
+ utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
+
+ std::future<int> fut;
+ REQUIRE(true == pool.execute(std::move(functor), fut));
+
+ std::vector<BackTrace> traces = pool.getTraces();
+ for (const auto &trace : traces) {
+ const auto &trace_strings = trace.getTraces();
+#ifdef HAS_EXECINFO
+ REQUIRE(trace_strings.size() > 2);
+ if (trace_strings.at(0).find("sleep_for") != std::string::npos) {
+ REQUIRE(trace_strings.at(1).find("counterFunction") != std::string::npos);
+ }
+#endif
+ }
+ fut.wait();
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index c5268ab..0e75566 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -110,6 +110,10 @@ class TestUpdateSink : public minifi::state::StateMonitor {
virtual int16_t pause() {
return 0;
}
+ virtual std::vector<BackTrace> getTraces() {
+ std::vector<BackTrace> traces;
+ return traces;
+ }
/**
* Operational controllers