You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/10/01 10:47:07 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1332 Prevent
erroneous behavior by stopping FlowController in low disk space conditions
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b0be47f MINIFICPP-1332 Prevent erroneous behavior by stopping FlowController in low disk space conditions
b0be47f is described below
commit b0be47f354ab092f368cfce4ec4723fba3bcf4ea
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Aug 19 13:00:46 2020 +0200
MINIFICPP-1332 Prevent erroneous behavior by stopping FlowController in low disk space conditions
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #875
---
conf/minifi.properties | 9 +
.../standard-processors/processors/TailFile.cpp | 10 +-
.../tests/unit/RetryFlowFileTests.cpp | 2 +-
.../tests/unit/TailFileTests.cpp | 8 +-
.../{utils/CallBackTimer.h => DiskSpaceWatchdog.h} | 57 +-
libminifi/include/core/Repository.h | 4 +
libminifi/include/properties/Configure.h | 98 +--
libminifi/include/properties/Properties.h | 12 +
libminifi/include/utils/CallBackTimer.h | 1 -
libminifi/include/utils/GeneralUtils.h | 69 ++
.../utils/{CallBackTimer.h => IntervalSwitch.h} | 61 +-
libminifi/include/utils/OptionalUtils.h | 50 ++
libminifi/include/utils/file/FileUtils.h | 896 ++++++++++-----------
libminifi/include/utils/file/PathUtils.h | 58 +-
libminifi/src/Configure.cpp | 94 +--
libminifi/src/DiskSpaceWatchdog.cpp | 87 ++
libminifi/src/FlowController.cpp | 36 +-
libminifi/src/Properties.cpp | 2 +-
libminifi/src/utils/file/FileUtils.cpp | 2 +-
libminifi/src/utils/file/PathUtils.cpp | 53 +-
libminifi/test/unit/DiskSpaceWatchdogTests.cpp | 61 ++
libminifi/test/unit/EnvironmentUtilsTests.cpp | 2 +-
libminifi/test/unit/FileUtilsTests.cpp | 28 +-
libminifi/test/unit/GeneralUtilsTest.cpp | 117 +++
libminifi/test/unit/IntervalSwitchTest.cpp | 66 ++
libminifi/test/unit/OptionalTest.cpp | 47 ++
libminifi/test/unit/PathUtilsTests.cpp | 36 +-
main/MiNiFiMain.cpp | 60 +-
28 files changed, 1365 insertions(+), 661 deletions(-)
diff --git a/conf/minifi.properties b/conf/minifi.properties
index dfa980f..291d1b2 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -49,6 +49,15 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
## To change the frequency at which the default state storage is persisted, modify the following
#nifi.state.management.provider.local.auto.persistence.interval=1 min
+# Disk space watchdog #
+## Stops MiNiFi FlowController activity (excluding C2) when the available disk space on any of the repository
+## volumes goes below stop.threshold (checked every interval), then restarts it once the available space on all
+## repository volumes reaches at least restart.threshold.
+#minifi.disk.space.watchdog.enable=true
+#minifi.disk.space.watchdog.interval=15 sec
+#minifi.disk.space.watchdog.stop.threshold=100 MB
+#minifi.disk.space.watchdog.restart.threshold=150 MB
+
## Enabling C2 Uncomment each of the following options
## define those with missing options
#nifi.c2.enable=true
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 35a2ce9..46bd952 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -381,7 +381,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
tail_mode_ = Mode::SINGLE;
std::string path, file_name;
- if (utils::file::PathUtils::getFileNameAndPath(file_to_tail_, path, file_name)) {
+ if (utils::file::getFileNameAndPath(file_to_tail_, path, file_name)) {
// NOTE: position and checksum will be updated in recoverState() if there is a persisted state for this file
tail_states_.emplace(file_to_tail_, TailState{path, file_name});
} else {
@@ -393,7 +393,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
std::string rolling_filename_pattern_glob;
context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
- rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
+ rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
}
void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
@@ -432,7 +432,7 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &s
if (key == "FILENAME") {
std::string fileLocation, fileName;
- if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+ if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
state.emplace(fileName, TailState{fileLocation, fileName});
} else {
@@ -451,7 +451,7 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &s
if (key.find(CURRENT_STR) == 0) {
const auto file = key.substr(strlen(CURRENT_STR));
std::string fileLocation, fileName;
- if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+ if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
state[file].path_ = fileLocation;
state[file].file_name_ = fileName;
} else {
@@ -513,7 +513,7 @@ bool TailFile::getStateFromStateManager(std::map<std::string, TailState> &new_ta
}};
std::string fileLocation, fileName;
- if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
+ if (utils::file::getFileNameAndPath(current, fileLocation, fileName)) {
logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
new_tail_states.emplace(current, TailState{fileLocation, fileName, position, last_read_time, checksum});
} else {
diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
index 133618f..0e6913a 100644
--- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
@@ -37,7 +37,7 @@
namespace {
using org::apache::nifi::minifi::utils::createTempDir;
using org::apache::nifi::minifi::utils::optional;
-using org::apache::nifi::minifi::utils::file::FileUtils;
+namespace FileUtils = org::apache::nifi::minifi::utils::file;
class RetryFlowFileTest {
public:
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 5706fbc..2c57c7d 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -239,7 +239,7 @@ TEST_CASE("TailFile picks up the state correctly if it is rewritten between runs
REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
std::string filePath, fileName;
- REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file.str(), filePath, fileName));
+ REQUIRE(utils::file::getFileNameAndPath(temp_file.str(), filePath, fileName));
// should stay the same
for (int i = 0; i < 5; i++) {
@@ -321,7 +321,7 @@ TEST_CASE("TailFile converts the old-style state file to the new-style state", "
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
std::string filePath, fileName;
- REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file, filePath, fileName));
+ REQUIRE(utils::file::getFileNameAndPath(temp_file, filePath, fileName));
std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName},
{"file.0.position", "35"},
{"file.0.current", temp_file},
@@ -365,8 +365,8 @@ TEST_CASE("TailFile converts the old-style state file to the new-style state", "
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
std::string filePath1, filePath2, fileName1, fileName2;
- REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_1, filePath1, fileName1));
- REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_2, filePath2, fileName2));
+ REQUIRE(utils::file::getFileNameAndPath(temp_file_1, filePath1, fileName1));
+ REQUIRE(utils::file::getFileNameAndPath(temp_file_2, filePath2, fileName2));
std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName1},
{"file.0.position", "35"},
{"file.0.current", temp_file_1},
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/DiskSpaceWatchdog.h
similarity index 52%
copy from libminifi/include/utils/CallBackTimer.h
copy to libminifi/include/DiskSpaceWatchdog.h
index faf25b3..384a9fe 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/DiskSpaceWatchdog.h
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,49 +16,45 @@
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-#define LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
+#pragma once
-#include <mutex>
-#include <condition_variable>
-#include <thread>
#include <chrono>
-#include <functional>
+#include <cinttypes>
+#include <string>
+#include <vector>
+
+#include "utils/IntervalSwitch.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-namespace utils {
-
-class CallBackTimer {
- public:
- CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
- ~CallBackTimer();
-
- void stop();
- void start();
+class Configure;
+namespace core {
+namespace logging {
+class Logger;
+} // namespace logging
+} // namespace core
+
+namespace disk_space_watchdog {
+// Settings of the disk space watchdog.
+struct Config {
+ // How often the available disk space is checked.
+ std::chrono::milliseconds interval;
+ // Available space (in bytes) below which activity is stopped.
+ std::uintmax_t stop_threshold_bytes;
+ // Available space (in bytes) that must be reached again before activity is restarted.
+ std::uintmax_t restart_threshold_bytes;
+};
- bool is_running() const;
+// Reads the watchdog settings from the agent configuration.
+// NOTE(review): presumably from the minifi.disk.space.watchdog.* properties, with defaults for unset values;
+// confirm against DiskSpaceWatchdog.cpp.
+Config read_config(const Configure&);
- private:
- bool execute_;
- std::function<void(void)> func_;
- std::thread thd_;
- mutable std::mutex mtx_;
- mutable std::mutex cv_mtx_;
- std::condition_variable cv_;
+// Wraps the watchdog thresholds in a hysteresis switch. stop_threshold_bytes is the lower threshold,
+// restart_threshold_bytes is the upper threshold, and the switch starts in the UPPER state.
+// IntervalSwitch's constructor requires restart_threshold_bytes >= stop_threshold_bytes (gsl_Expects).
+inline utils::IntervalSwitch<std::uintmax_t> disk_space_interval_switch(Config config) {
+ return {config.stop_threshold_bytes, config.restart_threshold_bytes, utils::IntervalSwitchState::UPPER};
+}
- const std::chrono::milliseconds interval_;
-};
+// Essentially `paths | transform(utils::file::space) | transform(&utils::file::space_info::available)` with error logging
+// to `logger`, which may be left as nullptr.
+// NOTE(review): presumably returns one value per path, in bytes; confirm in DiskSpaceWatchdog.cpp how failures are represented.
+std::vector<std::uintmax_t> check_available_space(const std::vector<std::string>& paths, core::logging::Logger* logger = nullptr);
-} // namespace utils
+} // namespace disk_space_watchdog
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 2b7d7aa..194937c 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -225,6 +225,10 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
virtual uint64_t getRepoSize();
+ // Returns a copy of this repository's directory_ path.
+ std::string getDirectory() const {
+ return directory_;
+ }
+
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
Repository(const Repository &parent) = delete;
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 7a298f2..e1ad1f5 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -53,59 +53,63 @@ class Configure : public Properties {
}
// nifi.flow.configuration.file
- static const char *nifi_default_directory;
- static const char *nifi_flow_configuration_file;
- static const char *nifi_flow_configuration_file_exit_failure;
- static const char *nifi_flow_configuration_file_backup_update;
- static const char *nifi_flow_engine_threads;
- static const char *nifi_flow_engine_alert_period;
- static const char *nifi_flow_engine_event_driven_time_slice;
- static const char *nifi_administrative_yield_duration;
- static const char *nifi_bored_yield_duration;
- static const char *nifi_graceful_shutdown_seconds;
- static const char *nifi_flowcontroller_drain_timeout;
- static const char *nifi_log_level;
- static const char *nifi_server_name;
- static const char *nifi_configuration_class_name;
- static const char *nifi_flow_repository_class_name;
- static const char *nifi_content_repository_class_name;
- static const char *nifi_volatile_repository_options;
- static const char *nifi_provenance_repository_class_name;
- static const char *nifi_server_port;
- static const char *nifi_server_report_interval;
- static const char *nifi_provenance_repository_max_storage_time;
- static const char *nifi_provenance_repository_max_storage_size;
- static const char *nifi_provenance_repository_directory_default;
- static const char *nifi_provenance_repository_enable;
- static const char *nifi_flowfile_repository_max_storage_time;
- static const char *nifi_dbcontent_repository_directory_default;
- static const char *nifi_flowfile_repository_max_storage_size;
- static const char *nifi_flowfile_repository_directory_default;
- static const char *nifi_flowfile_repository_enable;
- static const char *nifi_remote_input_secure;
- static const char *nifi_remote_input_http;
- static const char *nifi_security_need_ClientAuth;
+ static constexpr const char *nifi_default_directory = "nifi.default.directory";
+ static constexpr const char *nifi_flow_configuration_file = "nifi.flow.configuration.file";
+ static constexpr const char *nifi_flow_configuration_file_exit_failure = "nifi.flow.configuration.file.exit.onfailure";
+ static constexpr const char *nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
+ static constexpr const char *nifi_flow_engine_threads = "nifi.flow.engine.threads";
+ static constexpr const char *nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
+ static constexpr const char *nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
+ static constexpr const char *nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
+ static constexpr const char *nifi_bored_yield_duration = "nifi.bored.yield.duration";
+ static constexpr const char *nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
+ static constexpr const char *nifi_flowcontroller_drain_timeout = "nifi.flowcontroller.drain.timeout";
+ static constexpr const char *nifi_log_level = "nifi.log.level";
+ static constexpr const char *nifi_server_name = "nifi.server.name";
+ static constexpr const char *nifi_configuration_class_name = "nifi.flow.configuration.class.name";
+ static constexpr const char *nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name";
+ static constexpr const char *nifi_content_repository_class_name = "nifi.content.repository.class.name";
+ static constexpr const char *nifi_volatile_repository_options = "nifi.volatile.repository.options.";
+ static constexpr const char *nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
+ static constexpr const char *nifi_server_port = "nifi.server.port";
+ static constexpr const char *nifi_server_report_interval = "nifi.server.report.interval";
+ static constexpr const char *nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
+ static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
+ static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
+ static constexpr const char *nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
+ static constexpr const char *nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
+ static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+ static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
+ static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
+ static constexpr const char *nifi_remote_input_http = "nifi.remote.input.http.enabled";
+ static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
// site2site security config
- static const char *nifi_security_client_certificate;
- static const char *nifi_security_client_private_key;
- static const char *nifi_security_client_pass_phrase;
- static const char *nifi_security_client_ca_certificate;
+ static constexpr const char *nifi_security_client_certificate = "nifi.security.client.certificate";
+ static constexpr const char *nifi_security_client_private_key = "nifi.security.client.private.key";
+ static constexpr const char *nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
+ static constexpr const char *nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
// nifi rest api user name and password
- static const char *nifi_rest_api_user_name;
- static const char *nifi_rest_api_password;
+ static constexpr const char *nifi_rest_api_user_name = "nifi.rest.api.user.name";
+ static constexpr const char *nifi_rest_api_password = "nifi.rest.api.password";
// c2 options
- static const char *nifi_c2_enable;
- static const char *nifi_c2_file_watch;
- static const char *nifi_c2_flow_id;
- static const char *nifi_c2_flow_url;
- static const char *nifi_c2_flow_base_url;
- static const char *nifi_c2_full_heartbeat;
+ static constexpr const char *nifi_c2_enable = "nifi.c2.enable";
+ static constexpr const char *nifi_c2_file_watch = "nifi.c2.file.watch";
+ static constexpr const char *nifi_c2_flow_id = "nifi.c2.flow.id";
+ static constexpr const char *nifi_c2_flow_url = "nifi.c2.flow.url";
+ static constexpr const char *nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
+ static constexpr const char *nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
// state management options
- static const char *nifi_state_management_provider_local;
- static const char *nifi_state_management_provider_local_always_persist;
- static const char *nifi_state_management_provider_local_auto_persistence_interval;
+ static constexpr const char *nifi_state_management_provider_local = "nifi.state.management.provider.local";
+ static constexpr const char *nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
+ static constexpr const char *nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
+
+ // disk space watchdog options (property names; see the "Disk space watchdog" section of conf/minifi.properties)
+ static constexpr const char *minifi_disk_space_watchdog_enable = "minifi.disk.space.watchdog.enable";
+ static constexpr const char *minifi_disk_space_watchdog_interval = "minifi.disk.space.watchdog.interval";
+ static constexpr const char *minifi_disk_space_watchdog_stop_threshold = "minifi.disk.space.watchdog.stop.threshold";
+ static constexpr const char *minifi_disk_space_watchdog_restart_threshold = "minifi.disk.space.watchdog.restart.threshold";
private:
std::string agent_identifier_;
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index e814be8..94d2bca 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -24,7 +24,9 @@
#include <vector>
#include <string>
#include <map>
+
#include "core/logging/Logger.h"
+#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
@@ -82,6 +84,16 @@ class Properties {
*/
int getInt(const std::string &key, int default_value) const;
+ /**
+ * Looks up a property value.
+ * @param key the property name
+ * @return the value when get(key, value) reports the key as found, otherwise nullopt
+ */
+ utils::optional<std::string> get(const std::string& key) const {
+ std::string result;
+ const bool found = get(key, result);
+ if (found) {
+ return result;
+ } else {
+ return utils::nullopt;
+ }
+ }
+
// Parse one line in configure file like key=value
bool parseConfigureFileLine(char *buf, std::string &prop_key, std::string &prop_value);
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/utils/CallBackTimer.h
index faf25b3..c5fe96b 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/utils/CallBackTimer.h
@@ -33,7 +33,6 @@ namespace utils {
class CallBackTimer {
public:
CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
~CallBackTimer();
void stop();
diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index ee91627..224f455 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -43,6 +43,7 @@ using std::make_unique;
+// Integer division rounded up, e.g. intdiv_ceil(7, 2) == 4. Precondition: denominator != 0.
+// NOTE(review): the `remainder > 0` correction is only right for a positive denominator. With a negative
+// denominator it fails, e.g. intdiv_ceil(3, -2) yields 0 (not -1) and intdiv_ceil(-3, -2) yields 1 (not 2).
template<typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
T intdiv_ceil(T numerator, T denominator) {
+ gsl_Expects(denominator != 0);
// note: division and remainder is 1 instruction on x86
return numerator / denominator + (numerator % denominator > 0);
}
@@ -92,6 +93,74 @@ struct EnableSharedFromThis : virtual internal::EnableSharedFromThisBase {
}
};
+// Utilities to define single-expression functions with a proper noexcept specification and a decltype-ed return
+// type (similar to decltype(auto), which needs C++14):
+// - MINIFICPP_UTIL_DEDUCED(expr) expands to `noexcept(noexcept(expr)) -> decltype(expr) { return expr; }`
+// - MINIFICPP_UTIL_DEDUCED_CONDITIONAL(cond, expr) additionally wraps the return type in std::enable_if<cond>,
+//   so the overload is removed from overload resolution (SFINAE) when `cond` is false.
+#define MINIFICPP_UTIL_DEDUCED(...) noexcept(noexcept(__VA_ARGS__)) -> decltype(__VA_ARGS__) { return __VA_ARGS__; }
+#define MINIFICPP_UTIL_DEDUCED_CONDITIONAL(condition, ...) noexcept(noexcept(__VA_ARGS__)) -> typename std::enable_if<(condition), decltype(__VA_ARGS__)>::type { return __VA_ARGS__; }
+
+// utils::invoke: calls `f` with `args` following the INVOKE rules of std::invoke. It supports plain callables,
+// pointers to member functions and pointers to data members. The object can be given directly, via
+// std::reference_wrapper, or via anything dereferenceable such as a raw or smart pointer.
+// Before C++17 this is a local backport; from C++17 on it is std::invoke.
+// NOTE: MSVC reports __cplusplus as 199711L unless /Zc:__cplusplus is used, so the backport is selected there.
+#if __cplusplus < 201703L
+namespace detail {
+template<typename>
+struct is_reference_wrapper : std::false_type {};
+
+template<typename T>
+struct is_reference_wrapper<std::reference_wrapper<T>> : std::true_type {};
+
+// invoke on pointer to member function; the three overloads below are mutually exclusive:
+// the object is (derived from) Clazz, is a std::reference_wrapper, or is dereferenced with operator*
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ (std::forward<Obj>(obj).*f)(std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ (std::forward<Obj>(obj).get().*f)(std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && !is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ ((*std::forward<Obj>(obj)).*f)(std::forward<Args>(args)...))
+
+// invoke on pointer to data member; same three-way dispatch on the object as above
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ std::forward<Obj>(obj).*f)
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ std::forward<Obj>(obj).get().*f)
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && !is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+ /* expr: */ (*std::forward<Obj>(obj)).*f)
+
+// invoke_impl: selects member function pointer, data member pointer or ordinary callable
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ std::is_member_function_pointer<decltype(f)>::value,
+ /* expr: */ invoke_member_function_impl(f, std::forward<Obj>(obj), std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ std::is_member_object_pointer<decltype(f)>::value,
+ /* expr: */ invoke_member_object_impl(f, std::forward<Obj>(obj)))
+
+template<typename F, typename... Args>
+auto invoke_impl(F&& f, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+ /* cond: */ !std::is_member_function_pointer<F>::value && !std::is_member_object_pointer<F>::value,
+ /* expr: */ std::forward<F>(f)(std::forward<Args>(args)...))
+
+} // namespace detail
+
+template<typename F, typename... Args>
+auto invoke(F&& f, Args&&... args) MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(std::forward<F>(f), std::forward<Args>(args)...))
+#else
+using std::invoke;
+#endif /* < C++17 */
+
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/utils/IntervalSwitch.h
similarity index 50%
copy from libminifi/include/utils/CallBackTimer.h
copy to libminifi/include/utils/IntervalSwitch.h
index faf25b3..0052b4e 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/utils/IntervalSwitch.h
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +16,12 @@
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-#define LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
+#pragma once
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <chrono>
#include <functional>
+#include <utility>
+
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -30,27 +29,42 @@ namespace nifi {
namespace minifi {
namespace utils {
-class CallBackTimer {
- public:
- CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
- ~CallBackTimer();
+// State reported by IntervalSwitch.
+enum class IntervalSwitchState {
+ LOWER,
+ UPPER,
+};
- void stop();
+namespace detail {
+// Result of one IntervalSwitch update: the state after the update, and whether it differs from the state before it.
+struct SwitchReturn {
+ IntervalSwitchState state;
+ bool switched;
+};
+} // namespace detail
- void start();
+/**
+ * Two-threshold switch with hysteresis. Values are compared using Comp (std::less<T> by default):
+ * - a value below lower_threshold moves the switch to LOWER,
+ * - a value not below upper_threshold moves it to UPPER,
+ * - a value in [lower_threshold, upper_threshold) leaves the state unchanged.
+ * Precondition (gsl_Expects): upper_threshold is not less than lower_threshold.
+ * operator() updates state_ without any synchronization.
+ */
+template<typename T, typename Comp = std::less<T>>
+class IntervalSwitch {
+ public:
+ IntervalSwitch(T lower_threshold, T upper_threshold, const IntervalSwitchState initial_state = IntervalSwitchState::UPPER)
+ :lower_threshold_{std::move(lower_threshold)}, upper_threshold_{std::move(upper_threshold)}, state_{initial_state} {
+ gsl_Expects(!less_(upper_threshold_, lower_threshold_));
+ }
- bool is_running() const;
+ // Feeds a new value to the switch and returns the resulting state and whether it changed.
+ detail::SwitchReturn operator()(const T& value) {
+ const auto old_state = state_;
+ if (less_(value, lower_threshold_)) {
+ state_ = IntervalSwitchState::LOWER;
+ } else if (!less_(value, upper_threshold_)) {
+ state_ = IntervalSwitchState::UPPER;
+ }
+ return {state_, state_ != old_state};
+ }
private:
- bool execute_;
- std::function<void(void)> func_;
- std::thread thd_;
- mutable std::mutex mtx_;
- mutable std::mutex cv_mtx_;
- std::condition_variable cv_;
-
- const std::chrono::milliseconds interval_;
+ T lower_threshold_;
+ T upper_threshold_;
+ Comp less_;
+
+ IntervalSwitchState state_;
};
} // namespace utils
@@ -58,6 +72,3 @@ class CallBackTimer {
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-
diff --git a/libminifi/include/utils/OptionalUtils.h b/libminifi/include/utils/OptionalUtils.h
index 3810238..905128c 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -18,9 +18,11 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
#define LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
+#include <type_traits>
#include <utility>
#include <nonstd/optional.hpp>
+#include "utils/GeneralUtils.h"
#include "utils/gsl.h"
namespace org {
@@ -37,6 +39,54 @@ optional<typename gsl_lite::remove_cvref<T>::type> optional_from_ptr(T&& obj) {
return obj == nullptr ? nullopt : optional<typename gsl_lite::remove_cvref<T>::type>{ std::forward<T>(obj) };
}
+// Type trait: true for utils::optional<T>, false for everything else (including references to optionals).
+template<typename>
+struct is_optional : std::false_type {};
+
+template<typename T>
+struct is_optional<optional<T>> : std::true_type {};
+
+namespace detail {
+// Holds the callable passed to utils::map(); consumed by `optional | map_wrapper`.
+template<typename T>
+struct map_wrapper {
+ T function;
+};
+
+// map implementation: `o | utils::map(f)` yields optional of the decayed invoke(f, *o) when o has a value,
+// nullopt otherwise.
+// NOTE(review): the noexcept specification covers only the invocation, not constructing the result optional.
+template<typename SourceType, typename F>
+auto operator|(const optional<SourceType>& o, map_wrapper<F> f) noexcept(noexcept(utils::invoke(std::forward<F>(f.function), *o)))
+ -> optional<typename std::decay<decltype(utils::invoke(std::forward<F>(f.function), *o))>::type> {
+ if (o.has_value()) {
+ return make_optional(utils::invoke(std::forward<F>(f.function), *o));
+ } else {
+ return nullopt;
+ }
+}
+
+// Holds the callable passed to utils::flatMap(); consumed by `optional | flat_map_wrapper`.
+template<typename T>
+struct flat_map_wrapper {
+ T function;
+};
+
+// flatMap implementation: `o | utils::flatMap(f)` yields invoke(f, *o) (which must itself be an optional) when o has
+// a value, nullopt otherwise.
+// NOTE(review): is_optional is applied to the undecayed return type, so a function returning a reference to an
+// optional (e.g. `const optional<U>&`) trips the static_assert.
+template<typename SourceType, typename F>
+auto operator|(const optional<SourceType>& o, flat_map_wrapper<F> f) noexcept(noexcept(utils::invoke(std::forward<F>(f.function), *o)))
+ -> optional<typename std::decay<decltype(*utils::invoke(std::forward<F>(f.function), *o))>::type> {
+ static_assert(is_optional<decltype(utils::invoke(std::forward<F>(f.function), *o))>::value, "flatMap expects a function returning optional");
+ if (o.has_value()) {
+ return utils::invoke(std::forward<F>(f.function), *o);
+ } else {
+ return nullopt;
+ }
+}
+
+} // namespace detail
+
+// The wrappers returned below store a reference (T&&) to `func`. Use them directly in the
+// `optional | map(...)` / `optional | flatMap(...)` expression and do not store them, because a temporary
+// callable is destroyed at the end of the full-expression.
+template<typename T>
+detail::map_wrapper<T&&> map(T&& func) noexcept { return {std::forward<T>(func)}; }
+
+template<typename T>
+detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return {std::forward<T>(func)}; }
+
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 7c90d43..be022eb 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -93,478 +93,479 @@ namespace minifi {
namespace utils {
namespace file {
-/**
- * Simple implementation of some file system utilities.
- *
- */
-class FileUtils {
- private:
- static inline int platform_create_dir(const std::string &path) {
-#ifdef WIN32
- return _mkdir(path.c_str());
-#else
- return mkdir(path.c_str(), 0700);
-#endif
- }
+// Namespace alias named like the former FileUtils class, so existing `FileUtils::function(...)` calls keep
+// compiling now that the utilities are free functions in utils::file.
+namespace FileUtils = ::org::apache::nifi::minifi::utils::file;
- public:
- FileUtils() = delete;
-
- /*
- * Get the platform-specific path separator.
- * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
- * @return the path separator character
- */
- static char get_separator(bool force_posix = false) {
+namespace detail {
+// Creates a single directory at `path` (parent directories are not created) and returns the platform call's
+// result: _mkdir on Windows, mkdir with mode 0700 (subject to the umask) elsewhere. Both return 0 on success
+// and -1 on failure.
+static inline int platform_create_dir(const std::string& path) {
#ifdef WIN32
- if (force_posix) {
- return '/';
- } else {
- return '\\';
- }
+ return _mkdir(path.c_str());
#else
- return '/';
+ return mkdir(path.c_str(), 0700);
#endif
- }
+}
+} // namespace detail
- static std::string create_temp_directory(char* format) {
+/*
+ * Get the platform-specific path separator: a backslash on Windows (WIN32) unless force_posix is set, '/' otherwise.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
+ */
+inline char get_separator(bool force_posix = false) {
#ifdef WIN32
- const std::string tempDirectory = concat_path(get_temp_directory(),
- minifi::utils::IdGenerator::getIdGenerator()->generate().to_string());
- create_dir(tempDirectory);
- return tempDirectory;
-#else
- if (mkdtemp(format) == nullptr) { return ""; }
- return format;
-#endif
+ if (!force_posix) {
+ return '\\';
}
-
- static std::string get_temp_directory() {
+#endif
+ return '/';
+}
+
+inline std::string normalize_path_separators(std::string path, bool force_posix = false) {
+ const auto normalize_separators = [force_posix](const char c) {
+ if (c == '\\' || c == '/') { return get_separator(force_posix); }
+ return c;
+ };
+ std::transform(std::begin(path), std::end(path), std::begin(path), normalize_separators);
+ return path;
+}
+
+inline std::string get_temp_directory() {
#ifdef WIN32
- char tempBuffer[MAX_PATH];
- const auto ret = GetTempPath(MAX_PATH, tempBuffer);
- if (ret > MAX_PATH || ret == 0)
- throw std::runtime_error("Couldn't locate temporary directory path");
- return tempBuffer;
+ char tempBuffer[MAX_PATH];
+ const auto ret = GetTempPath(MAX_PATH, tempBuffer);
+ if (ret > MAX_PATH || ret == 0)
+ throw std::runtime_error("Couldn't locate temporary directory path");
+ return tempBuffer;
#else
- return "/tmp";
+ return "/tmp";
#endif
- }
+}
- static int64_t delete_dir(const std::string &path, bool delete_files_recursively = true) {
+inline int64_t delete_dir(const std::string &path, bool delete_files_recursively = true) {
#ifdef BOOST_VERSION
- try {
- if (boost::filesystem::exists(path)) {
- if (delete_files_recursively) {
- boost::filesystem::remove_all(path);
- } else {
- boost::filesystem::remove(path);
- }
+ try {
+ if (boost::filesystem::exists(path)) {
+ if (delete_files_recursively) {
+ boost::filesystem::remove_all(path);
+ } else {
+ boost::filesystem::remove(path);
}
- } catch(boost::filesystem::filesystem_error const & e) {
- return -1;
- // display error message
}
- return 0;
+ } catch(boost::filesystem::filesystem_error const & e) {
+ return -1;
+ // display error message
+ }
+ return 0;
#elif defined(WIN32)
- WIN32_FIND_DATA FindFileData;
- HANDLE hFind;
- DWORD Attributes;
- std::string str;
-
-
- std::stringstream pathstr;
- pathstr << path << "\\*";
- str = pathstr.str();
- // List files
- hFind = FindFirstFile(str.c_str(), &FindFileData);
- if (hFind != INVALID_HANDLE_VALUE) {
- do {
- if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
- std::stringstream strs;
- strs << path << "\\" << FindFileData.cFileName;
- str = strs.str();
-
- Attributes = GetFileAttributes(str.c_str());
- if (Attributes & FILE_ATTRIBUTE_DIRECTORY) {
- // is directory
- delete_dir(str, delete_files_recursively);
- } else {
- remove(str.c_str());
- // not directory
- }
+ WIN32_FIND_DATA FindFileData;
+ HANDLE hFind;
+ DWORD Attributes;
+ std::string str;
+
+
+ std::stringstream pathstr;
+ pathstr << path << "\\*";
+ str = pathstr.str();
+ // List files
+ hFind = FindFirstFile(str.c_str(), &FindFileData);
+ if (hFind != INVALID_HANDLE_VALUE) {
+ do {
+ if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
+ std::stringstream strs;
+ strs << path << "\\" << FindFileData.cFileName;
+ str = strs.str();
+
+ Attributes = GetFileAttributes(str.c_str());
+ if (Attributes & FILE_ATTRIBUTE_DIRECTORY) {
+ // is directory
+ delete_dir(str, delete_files_recursively);
+ } else {
+ remove(str.c_str());
+ // not directory
}
- }while (FindNextFile(hFind, &FindFileData));
- FindClose(hFind);
+ }
+ }while (FindNextFile(hFind, &FindFileData));
+ FindClose(hFind);
- RemoveDirectory(path.c_str());
- }
- return 0;
+ RemoveDirectory(path.c_str());
+ }
+ return 0;
#else
- DIR *current_directory = opendir(path.c_str());
- int r = -1;
- if (current_directory) {
- struct dirent *p;
- r = 0;
- while (!r && (p = readdir(current_directory))) {
- int r2 = -1;
- std::stringstream newpath;
- if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) {
- continue;
- }
- struct stat statbuf;
+ DIR *current_directory = opendir(path.c_str());
+ int r = -1;
+ if (current_directory) {
+ struct dirent *p;
+ r = 0;
+ while (!r && (p = readdir(current_directory))) {
+ int r2 = -1;
+ std::stringstream newpath;
+ if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) {
+ continue;
+ }
+ struct stat statbuf;
- newpath << path << "/" << p->d_name;
+ newpath << path << "/" << p->d_name;
- if (!stat(newpath.str().c_str(), &statbuf)) {
- if (S_ISDIR(statbuf.st_mode)) {
- if (delete_files_recursively) {
- r2 = delete_dir(newpath.str(), delete_files_recursively);
- }
- } else {
- r2 = unlink(newpath.str().c_str());
+ if (!stat(newpath.str().c_str(), &statbuf)) {
+ if (S_ISDIR(statbuf.st_mode)) {
+ if (delete_files_recursively) {
+ r2 = delete_dir(newpath.str(), delete_files_recursively);
}
+ } else {
+ r2 = unlink(newpath.str().c_str());
}
- r = r2;
}
- closedir(current_directory);
+ r = r2;
}
+ closedir(current_directory);
+ }
- if (!r) {
- return rmdir(path.c_str());
- }
+ if (!r) {
+ return rmdir(path.c_str());
+ }
- return 0;
+ return 0;
#endif
- }
+}
- static uint64_t last_write_time(const std::string &path) {
+inline uint64_t last_write_time(const std::string &path) {  // st_mtime of path; 0 if stat fails (non-Boost builds)
#ifdef BOOST_VERSION
- return boost::filesystem::last_write_time(movedFile.str());
+ return boost::filesystem::last_write_time(path);
#else
#ifdef WIN32
- struct _stat result;
- if (_stat(path.c_str(), &result) == 0) {
- return result.st_mtime;
- }
+ struct _stat result;
+ if (_stat(path.c_str(), &result) == 0) {
+ return result.st_mtime;
+ }
#else
- struct stat result;
- if (stat(path.c_str(), &result) == 0) {
- return result.st_mtime;
- }
+ struct stat result;
+ if (stat(path.c_str(), &result) == 0) {
+ return result.st_mtime;
+ }
#endif
#endif
- return 0;
- }
+ return 0;
+}
- static std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> last_write_time_point(const std::string &path) {
- return std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>{std::chrono::seconds{last_write_time(path)}};
- }
+inline std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> last_write_time_point(const std::string &path) {
+ return std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>{std::chrono::seconds{last_write_time(path)}};
+}
- static uint64_t file_size(const std::string &path) {
+inline uint64_t file_size(const std::string &path) {
#ifdef WIN32
- struct _stat result;
- if (_stat(path.c_str(), &result) == 0) {
- return result.st_size;
- }
+ struct _stat result;
+ if (_stat(path.c_str(), &result) == 0) {
+ return result.st_size;
+ }
#else
- struct stat result;
- if (stat(path.c_str(), &result) == 0) {
- return result.st_size;
- }
-#endif
- return 0;
+ struct stat result;
+ if (stat(path.c_str(), &result) == 0) {
+ return result.st_size;
}
+#endif
+ return 0;
+}
- static bool set_last_write_time(const std::string &path, uint64_t write_time) {
+inline bool set_last_write_time(const std::string &path, uint64_t write_time) {
#ifdef WIN32
- struct __utimbuf64 utim;
- utim.actime = write_time;
- utim.modtime = write_time;
- return _utime64(path.c_str(), &utim) == 0U;
+ struct __utimbuf64 utim;
+ utim.actime = write_time;
+ utim.modtime = write_time;
+ return _utime64(path.c_str(), &utim) == 0U;
#else
- struct utimbuf utim;
- utim.actime = write_time;
- utim.modtime = write_time;
- return utime(path.c_str(), &utim) == 0U;
+ struct utimbuf utim;
+ utim.actime = write_time;
+ utim.modtime = write_time;
+ return utime(path.c_str(), &utim) == 0U;
#endif
- }
+}
#ifndef WIN32
- static bool get_permissions(const std::string &path, uint32_t &permissions) {
- struct stat result;
- if (stat(path.c_str(), &result) == 0) {
- permissions = result.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
- return true;
- }
- return false;
+inline bool get_permissions(const std::string &path, uint32_t &permissions) {
+ struct stat result;
+ if (stat(path.c_str(), &result) == 0) {
+ permissions = result.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
+ return true;
}
+ return false;
+}
#endif
#ifndef WIN32
- static bool get_uid_gid(const std::string &path, uint64_t &uid, uint64_t &gid) {
- struct stat result;
- if (stat(path.c_str(), &result) == 0) {
- uid = result.st_uid;
- gid = result.st_gid;
- return true;
- }
- return false;
+inline bool get_uid_gid(const std::string &path, uint64_t &uid, uint64_t &gid) {
+ struct stat result;
+ if (stat(path.c_str(), &result) == 0) {
+ uid = result.st_uid;
+ gid = result.st_gid;
+ return true;
}
+ return false;
+}
#endif
- static int is_directory(const char * path) {
- struct stat dir_stat;
- if (stat(path, &dir_stat) < 0) {
- return 0;
- }
- return S_ISDIR(dir_stat.st_mode);
- }
+inline int is_directory(const char * path) {
+ struct stat dir_stat;
+ if (stat(path, &dir_stat) < 0) {
+ return 0;
+ }
+ return S_ISDIR(dir_stat.st_mode);
+}
- static int create_dir(const std::string& path, bool recursive = true) {
+inline int create_dir(const std::string& path, bool recursive = true) {
#ifdef BOOST_VERSION
- boost::filesystem::path dir(path);
- if (boost::filesystem::create_directory(dir)) {
- return 0;
- } else {
- return -1;
- }
+ boost::filesystem::path dir(path);
+ if (boost::filesystem::create_directory(dir)) {
+ return 0;
+ } else {
+ return -1;
+ }
#else
- if (!recursive) {
- if (platform_create_dir(path) != 0 && errno != EEXIST) {
- return -1;
- }
- return 0;
- }
- if (platform_create_dir(path) == 0) {
- return 0;
- }
+ if (!recursive) {
+ if (detail::platform_create_dir(path) != 0 && errno != EEXIST) {
+ return -1;
+ }
+ return 0;
+ }
+ if (detail::platform_create_dir(path) == 0) {
+ return 0;
+ }
- switch (errno) {
- case ENOENT: {
- size_t found = path.find_last_of(get_separator());
+ switch (errno) {
+ case ENOENT: {
+ size_t found = path.find_last_of(get_separator());
- if (found == std::string::npos) {
- return -1;
- }
+ if (found == std::string::npos) {
+ return -1;
+ }
- const std::string dir = path.substr(0, found);
- int res = create_dir(dir);
- if (res < 0) {
- return -1;
- }
- return platform_create_dir(path);
- }
- case EEXIST: {
- if (is_directory(path.c_str())) {
- return 0;
- }
- return -1;
- }
- default:
- return -1;
- }
- return -1;
-#endif
+ const std::string dir = path.substr(0, found);
+ int res = create_dir(dir);
+ if (res < 0) {
+ return -1;
+ }
+ return detail::platform_create_dir(path);
}
-
- static int copy_file(const std::string &path_from, const std::string dest_path) {
- std::ifstream src(path_from, std::ios::binary);
- if (!src.is_open())
+ case EEXIST: {
+ if (is_directory(path.c_str())) {
+ return 0;
+ }
return -1;
- std::ofstream dest(dest_path, std::ios::binary);
- dest << src.rdbuf();
- return 0;
}
+ default:
+ return -1;
+ }
+ return -1;
+#endif
+}
- static void addFilesMatchingExtension(const std::shared_ptr<logging::Logger> &logger, const std::string &originalPath, const std::string &extension, std::vector<std::string> &accruedFiles) {
+inline int copy_file(const std::string &path_from, const std::string dest_path) {
+ std::ifstream src(path_from, std::ios::binary);
+ if (!src.is_open())
+ return -1;
+ std::ofstream dest(dest_path, std::ios::binary);
+ dest << src.rdbuf();
+ return 0;
+}
+
+inline void addFilesMatchingExtension(const std::shared_ptr<logging::Logger> &logger, const std::string &originalPath, const std::string &extension, std::vector<std::string> &accruedFiles) {
#ifndef WIN32
- struct stat s;
- if (stat(originalPath.c_str(), &s) == 0) {
- if (s.st_mode & S_IFDIR) {
- DIR *d;
- d = opendir(originalPath.c_str());
- if (!d) {
+ struct stat s;
+ if (stat(originalPath.c_str(), &s) == 0) {
+ if (S_ISDIR(s.st_mode)) {  // file-type test; a bitwise & S_IFDIR also matches types sharing that bit (e.g. S_IFBLK on Linux)
+ DIR *d;
+ d = opendir(originalPath.c_str());
+ if (!d) {
+ return;
+ }
+ // only perform a listing while we are not empty
+ logger->log_debug("Performing file listing on %s", originalPath);
+
+ struct dirent *entry;
+ entry = readdir(d);
+ while (entry != nullptr) {
+ std::string d_name = entry->d_name;
+ std::string path = originalPath + "/" + d_name;
+ struct stat statbuf { };
+ if (stat(path.c_str(), &statbuf) != 0) {
+ logger->log_warn("Failed to stat %s", path);
+ break;  // leave the loop so closedir(d) below still runs
}
- // only perform a listing while we are not empty
- logger->log_debug("Performing file listing on %s", originalPath);
-
- struct dirent *entry;
- entry = readdir(d);
- while (entry != nullptr) {
- std::string d_name = entry->d_name;
- std::string path = originalPath + "/" + d_name;
- struct stat statbuf { };
- if (stat(path.c_str(), &statbuf) != 0) {
- logger->log_warn("Failed to stat %s", path);
- return;
+ if (S_ISDIR(statbuf.st_mode)) {
+ // if this is a directory
+ if (d_name != ".." && d_name != ".") {
+ addFilesMatchingExtension(logger, path, extension, accruedFiles);
}
- if (S_ISDIR(statbuf.st_mode)) {
- // if this is a directory
- if (d_name != ".." && d_name != ".") {
- addFilesMatchingExtension(logger, path, extension, accruedFiles);
- }
- } else {
- if (utils::StringUtils::endsWith(path, extension)) {
- logger->log_info("Adding %s to paths", path);
- accruedFiles.push_back(path);
- }
+ } else {
+ if (utils::StringUtils::endsWith(path, extension)) {
+ logger->log_info("Adding %s to paths", path);
+ accruedFiles.push_back(path);
}
- entry = readdir(d);
- }
- closedir(d);
- } else if (s.st_mode & S_IFREG) {
- if (utils::StringUtils::endsWith(originalPath, extension)) {
- logger->log_info("Adding %s to paths", originalPath);
- accruedFiles.push_back(originalPath);
}
- } else {
- logger->log_error("Could not stat", originalPath);
+ entry = readdir(d);
+ }
+ closedir(d);
+ } else if (S_ISREG(s.st_mode)) {
+ if (utils::StringUtils::endsWith(originalPath, extension)) {
+ logger->log_info("Adding %s to paths", originalPath);
+ accruedFiles.push_back(originalPath);
}
-
} else {
- logger->log_error("Could not access %s", originalPath);
+ logger->log_error("Could not stat %s", originalPath);
}
+
+ } else {
+ logger->log_error("Could not access %s", originalPath);
+ }
#else
- HANDLE hFind;
- WIN32_FIND_DATA FindFileData;
+ HANDLE hFind;
+ WIN32_FIND_DATA FindFileData;
- std::string pathToSearch = originalPath + "\\*" + extension;
- if ((hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
- do {
- struct _stat statbuf {};
+ std::string pathToSearch = originalPath + "\\*" + extension;
+ if ((hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
+ do {
+ struct _stat statbuf {};
- std::string path = originalPath + "\\" + FindFileData.cFileName;
- logger->log_info("Adding %s to paths", path);
- if (_stat(path.c_str(), &statbuf) != 0) {
- logger->log_warn("Failed to stat %s", path);
- break;
- }
- logger->log_info("Adding %s to paths", path);
- if (S_ISDIR(statbuf.st_mode)) {
- addFilesMatchingExtension(logger, path, extension, accruedFiles);
- } else {
- if (utils::StringUtils::endsWith(path, extension)) {
- logger->log_info("Adding %s to paths", path);
- accruedFiles.push_back(path);
- }
+ std::string path = originalPath + "\\" + FindFileData.cFileName;
+ logger->log_info("Adding %s to paths", path);
+ if (_stat(path.c_str(), &statbuf) != 0) {
+ logger->log_warn("Failed to stat %s", path);
+ break;
+ }
+ logger->log_info("Adding %s to paths", path);
+ if (S_ISDIR(statbuf.st_mode)) {
+ addFilesMatchingExtension(logger, path, extension, accruedFiles);
+ } else {
+ if (utils::StringUtils::endsWith(path, extension)) {
+ logger->log_info("Adding %s to paths", path);
+ accruedFiles.push_back(path);
}
- }while (FindNextFileA(hFind, &FindFileData));
- FindClose(hFind);
- }
+ }
+ }while (FindNextFileA(hFind, &FindFileData));
+ FindClose(hFind);
+ }
#endif
+}
+
+/*
+ * Provides a platform-independent function to list a directory
+ * Callback is called for every file found: first argument is the path of the directory, second is the filename
+ * Return value of the callback is used to continue (true) or stop (false) listing
+ */
+inline void list_dir(const std::string& dir, std::function<bool(const std::string&, const std::string&)> callback,
+ const std::shared_ptr<logging::Logger> &logger, bool recursive = true) {
+ logger->log_debug("Performing file listing against %s", dir);
+#ifndef WIN32
+ DIR *d = opendir(dir.c_str());
+ if (!d) {
+ logger->log_warn("Failed to open directory: %s", dir.c_str());
+ return;
}
- static std::vector<std::pair<std::string, std::string>> list_dir_all(const std::string& dir, const std::shared_ptr<logging::Logger> &logger,
- bool recursive = true) {
- std::vector<std::pair<std::string, std::string>> fileList;
- auto lambda = [&fileList] (const std::string &path, const std::string &filename) {
- fileList.push_back(make_pair(path, filename));
- return true;
- };
+ struct dirent *entry;
+ while ((entry = readdir(d)) != NULL) {
+ std::string d_name = entry->d_name;
+ std::string path = dir + get_separator() + d_name;
- list_dir(dir, lambda, logger, recursive);
+ struct stat statbuf;
+ if (stat(path.c_str(), &statbuf) != 0) {
+ logger->log_warn("Failed to stat %s", path);
+ continue;
+ }
- return fileList;
+ if (S_ISDIR(statbuf.st_mode)) {
+ // if this is a directory
+ if (recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
+ list_dir(path, callback, logger, recursive);
+ }
+ } else {
+ if (!callback(dir, d_name)) {
+ break;
+ }
+ }
}
+ closedir(d);
+#else
+ HANDLE hFind;
+ WIN32_FIND_DATA FindFileData;
- /*
- * Provides a platform-independent function to list a directory
- * Callback is called for every file found: first argument is the path of the directory, second is the filename
- * Return value of the callback is used to continue (true) or stop (false) listing
- */
- static void list_dir(const std::string& dir, std::function<bool(const std::string&, const std::string&)> callback,
- const std::shared_ptr<logging::Logger> &logger, bool recursive = true) {
- logger->log_debug("Performing file listing against %s", dir);
-#ifndef WIN32
- DIR *d = opendir(dir.c_str());
- if (!d) {
- logger->log_warn("Failed to open directory: %s", dir.c_str());
- return;
- }
+ std::string pathToSearch = dir + "\\*.*";
+ hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData);
- struct dirent *entry;
- while ((entry = readdir(d)) != NULL) {
- std::string d_name = entry->d_name;
- std::string path = dir + get_separator() + d_name;
+ if (hFind == INVALID_HANDLE_VALUE) {
+ logger->log_warn("Failed to open directory: %s", dir.c_str());
+ return;
+ }
- struct stat statbuf;
- if (stat(path.c_str(), &statbuf) != 0) {
+ do {
+ struct _stat statbuf {};
+ if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
+ std::string path = dir + get_separator() + FindFileData.cFileName;
+ if (_stat(path.c_str(), &statbuf) != 0) {
logger->log_warn("Failed to stat %s", path);
continue;
}
-
if (S_ISDIR(statbuf.st_mode)) {
- // if this is a directory
- if (recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
+ if (recursive) {
list_dir(path, callback, logger, recursive);
}
} else {
- if (!callback(dir, d_name)) {
+ if (!callback(dir, FindFileData.cFileName)) {
break;
}
}
}
- closedir(d);
-#else
- HANDLE hFind;
- WIN32_FIND_DATA FindFileData;
+ } while (FindNextFileA(hFind, &FindFileData));
+ FindClose(hFind);
+#endif
+}
- std::string pathToSearch = dir + "\\*.*";
- hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData);
+inline std::vector<std::pair<std::string, std::string>> list_dir_all(const std::string& dir, const std::shared_ptr<logging::Logger> &logger,
+ bool recursive = true) {
+ std::vector<std::pair<std::string, std::string>> fileList;
+ auto lambda = [&fileList] (const std::string &path, const std::string &filename) {
+ fileList.push_back(make_pair(path, filename));
+ return true;
+ };
- if (hFind == INVALID_HANDLE_VALUE) {
- logger->log_warn("Failed to open directory: %s", dir.c_str());
- return;
- }
+ list_dir(dir, lambda, logger, recursive);
- do {
- struct _stat statbuf {};
- if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
- std::string path = dir + get_separator() + FindFileData.cFileName;
- if (_stat(path.c_str(), &statbuf) != 0) {
- logger->log_warn("Failed to stat %s", path);
- continue;
- }
- if (S_ISDIR(statbuf.st_mode)) {
- if (recursive) {
- list_dir(path, callback, logger, recursive);
- }
- } else {
- if (!callback(dir, FindFileData.cFileName)) {
- break;
- }
- }
- }
- } while (FindNextFileA(hFind, &FindFileData));
- FindClose(hFind);
-#endif
- }
+ return fileList;
+}
- static std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
- if (root.empty()) {
- return child;
- }
- std::stringstream new_path;
- if (root.back() == get_separator(force_posix)) {
- new_path << root << child;
- } else {
- new_path << root << get_separator(force_posix) << child;
- }
- return new_path.str();
+inline std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
+ if (root.empty()) {
+ return child;
+ }
+ std::stringstream new_path;
+ if (root.back() == get_separator(force_posix)) {
+ new_path << root << child;
+ } else {
+ new_path << root << get_separator(force_posix) << child;
}
+ return new_path.str();
+}
- static std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
- if (path.empty()) {
- /* Empty path has no parent and no child*/
- return std::make_tuple("", "");
- }
- bool absolute = false;
- size_t root_pos = 0U;
+inline std::string create_temp_directory(char* format) {
#ifdef WIN32
- if (!force_posix) {
+ const std::string tempDirectory = concat_path(get_temp_directory(),
+ minifi::utils::IdGenerator::getIdGenerator()->generate().to_string());
+ create_dir(tempDirectory);
+ return tempDirectory;
+#else
+ if (mkdtemp(format) == nullptr) { return ""; }
+ return format;
+#endif
+}
+
+inline std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
+ if (path.empty()) {
+ /* Empty path has no parent and no child*/
+ return std::make_tuple("", "");
+ }
+ bool absolute = false;
+ size_t root_pos = 0U;
+#ifdef WIN32
+ if (!force_posix) {
if (path[0] == '\\') {
absolute = true;
if (path.size() < 2U) {
@@ -595,78 +596,78 @@ class FileUtils {
}
} else {
#else
- if (true) {
+ if (true) {
#endif
- if (path[0] == '/') {
- absolute = true;
- root_pos = 0U;
- }
- }
- /* Maybe we are just a single relative child */
- if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
- return std::make_tuple("", path);
- }
- /* Ignore trailing separators */
- size_t last_pos = path.size() - 1;
- while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
- last_pos--;
+ if (path[0] == '/') {
+ absolute = true;
+ root_pos = 0U;
}
- if (absolute && last_pos == root_pos) {
- /* This means we are only a root */
- return std::make_tuple("", "");
- }
- /* Find parent-child separator */
- size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
- if (last_separator == std::string::npos || last_separator < root_pos) {
- return std::make_tuple("", "");
- }
- std::string parent = path.substr(0, last_separator + 1);
- std::string child = path.substr(last_separator + 1);
-
- return std::make_tuple(std::move(parent), std::move(child));
}
-
- static std::string get_parent_path(const std::string& path, bool force_posix = false) {
- std::string parent_path;
- std::tie(parent_path, std::ignore) = split_path(path, force_posix);
- return parent_path;
+ /* Maybe we are just a single relative child */
+ if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
+ return std::make_tuple("", path);
}
-
- static std::string get_child_path(const std::string& path, bool force_posix = false) {
- std::string child_path;
- std::tie(std::ignore, child_path) = split_path(path, force_posix);
- return child_path;
+ /* Ignore trailing separators */
+ size_t last_pos = path.size() - 1;
+ while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
+ last_pos--;
+ }
+ if (absolute && last_pos == root_pos) {
+ /* This means we are only a root */
+ return std::make_tuple("", "");
+ }
+ /* Find parent-child separator */
+ size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
+ if (last_separator == std::string::npos || last_separator < root_pos) {
+ return std::make_tuple("", "");
}
+ std::string parent = path.substr(0, last_separator + 1);
+ std::string child = path.substr(last_separator + 1);
+
+ return std::make_tuple(std::move(parent), std::move(child));
+}
+
+inline std::string get_parent_path(const std::string& path, bool force_posix = false) {
+ std::string parent_path;
+ std::tie(parent_path, std::ignore) = split_path(path, force_posix);
+ return parent_path;
+}
+
+inline std::string get_child_path(const std::string& path, bool force_posix = false) {
+ std::string child_path;
+ std::tie(std::ignore, child_path) = split_path(path, force_posix);
+ return child_path;
+}
- static bool is_hidden(const std::string& path) {
+inline bool is_hidden(const std::string& path) {
#ifdef WIN32
- DWORD attributes = GetFileAttributesA(path.c_str());
+ DWORD attributes = GetFileAttributesA(path.c_str());
return ((attributes != INVALID_FILE_ATTRIBUTES) && ((attributes & FILE_ATTRIBUTE_HIDDEN) != 0));
#else
- return std::get<1>(split_path(path)).rfind(".", 0) == 0;
+ return std::get<1>(split_path(path)).rfind(".", 0) == 0;
#endif
- }
+}
- /*
- * Returns the absolute path of the current executable
- */
- static std::string get_executable_path() {
+/*
+ * Returns the absolute path of the current executable
+ */
+inline std::string get_executable_path() {
#if defined(__linux__)
- std::vector<char> buf(1024U);
- while (true) {
- ssize_t ret = readlink("/proc/self/exe", buf.data(), buf.size());
- if (ret < 0) {
- return "";
- }
- if (static_cast<size_t>(ret) == buf.size()) {
- /* It may have been truncated */
- buf.resize(buf.size() * 2);
- continue;
- }
- return std::string(buf.data(), ret);
+ std::vector<char> buf(1024U);
+ while (true) {
+ ssize_t ret = readlink("/proc/self/exe", buf.data(), buf.size());
+ if (ret < 0) {
+ return "";
}
+ if (static_cast<size_t>(ret) == buf.size()) {
+ /* It may have been truncated */
+ buf.resize(buf.size() * 2);
+ continue;
+ }
+ return std::string(buf.data(), ret);
+ }
#elif defined(__APPLE__)
- std::vector<char> buf(PATH_MAX);
+ std::vector<char> buf(PATH_MAX);
uint32_t buf_size = buf.size();
while (_NSGetExecutablePath(buf.data(), &buf_size) != 0) {
buf.resize(buf_size);
@@ -697,37 +698,37 @@ class FileUtils {
#else
return "";
#endif
- }
+}
- /*
- * Returns the absolute path to the directory containing the current executable
- */
- static std::string get_executable_dir() {
- auto executable_path = get_executable_path();
- if (executable_path.empty()) {
- return "";
- }
- return get_parent_path(executable_path);
+/*
+ * Returns the absolute path to the directory containing the current executable
+ */
+inline std::string get_executable_dir() {
+ auto executable_path = get_executable_path();
+ if (executable_path.empty()) {
+ return "";
}
+ return get_parent_path(executable_path);
+}
- static int close(int file_descriptor) {
+inline int close(int file_descriptor) {
#ifdef WIN32
- return _close(file_descriptor);
+ return _close(file_descriptor);
#else
- return ::close(file_descriptor);
+ return ::close(file_descriptor);
#endif
- }
+}
- static int access(const char *path_name, int mode) {
+inline int access(const char *path_name, int mode) {
#ifdef WIN32
- return _access(path_name, mode);
+ return _access(path_name, mode);
#else
- return ::access(path_name, mode);
+ return ::access(path_name, mode);
#endif
- }
+}
#ifdef WIN32
- static std::error_code hide_file(const char* const file_name) {
+inline std::error_code hide_file(const char* const file_name) {
const bool success = SetFileAttributesA(file_name, FILE_ATTRIBUTE_HIDDEN);
if (!success) {
// note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
@@ -738,8 +739,7 @@ class FileUtils {
}
#endif /* WIN32 */
- static uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
-}; // NOLINT
+uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
} // namespace file
} // namespace utils
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h
index 22fafd6..00db039 100644
--- a/libminifi/include/utils/file/PathUtils.h
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -17,7 +17,12 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
#define LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
+#include <cctype>
+#include <cinttypes>
+#include <memory>
#include <string>
+#include <system_error>
+#include <utility>
namespace org {
namespace apache {
@@ -25,7 +30,9 @@ namespace nifi {
namespace minifi {
namespace utils {
namespace file {
-namespace PathUtils {
+
+namespace PathUtils = ::org::apache::nifi::minifi::utils::file;
+using path = const char*;
/**
* Extracts the filename and path performing some validation of the path and output to ensure
@@ -47,7 +54,54 @@ std::string getFullPath(const std::string& path);
std::string globToRegex(std::string glob);
-} // namespace PathUtils
+inline bool isAbsolutePath(const char* const path) noexcept {  // false for nullptr; on Windows requires a drive letter prefix
+#ifdef _WIN32
+ return path && std::isalpha(static_cast<unsigned char>(path[0])) && path[1] == ':' && (path[2] == '\\' || path[2] == '/');
+#else
+ return path && path[0] == '/';
+#endif
+}
+
+
+/**
+ * Represents filesystem space information in bytes
+ */
+struct space_info {
+ std::uintmax_t capacity;
+ std::uintmax_t free;
+ std::uintmax_t available;
+
+ friend bool operator==(const space_info& a, const space_info& b) noexcept {
+ return a.capacity == b.capacity && a.free == b.free && a.available == b.available;
+ }
+};
+
+class filesystem_error : public std::system_error {
+ public:
+ filesystem_error(const std::string& what_arg, const std::error_code ec)
+ :std::system_error{ec, what_arg}
+ {}
+ filesystem_error(const std::string& what_arg, const path path1, const path path2, const std::error_code ec)
+ :std::system_error{ec, what_arg}, paths_involved_{std::make_shared<const std::pair<std::string, std::string>>(path1 ? path1 : "", path2 ? path2 : "")}
+ {}
+ // paths are optional: path1()/path2() return "" when none were supplied (null inputs are stored as "")
+ // copy should be noexcept as soon as libstdc++ fixes std::system_error copy
+ filesystem_error(const filesystem_error& o) = default;
+ filesystem_error& operator=(const filesystem_error&) = default;
+
+ path path1() const noexcept { return paths_involved_ ? paths_involved_->first.c_str() : ""; }
+ path path2() const noexcept { return paths_involved_ ? paths_involved_->second.c_str() : ""; }
+
+ private:
+ std::shared_ptr<const std::pair<std::string, std::string>> paths_involved_;
+};
+
+/**
+ * Provides filesystem space information for the specified directory
+ */
+space_info space(path);
+space_info space(path, std::error_code&) noexcept;
+
} // namespace file
} // namespace utils
} // namespace minifi
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 467524b..956ca4a 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -22,51 +22,55 @@ namespace apache {
namespace nifi {
namespace minifi {
-const char *Configure::nifi_default_directory = "nifi.default.directory";
-const char *Configure::nifi_c2_enable = "nifi.c2.enable";
-const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file";
-const char *Configure::nifi_flow_configuration_file_exit_failure = "nifi.flow.configuration.file.exit.onfailure";
-const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
-const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
-const char *Configure::nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
-const char *Configure::nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
-const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
-const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
-const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
-const char *Configure::nifi_flowcontroller_drain_timeout = "nifi.flowcontroller.drain.timeout";
-const char *Configure::nifi_log_level = "nifi.log.level";
-const char *Configure::nifi_server_name = "nifi.server.name";
-const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name";
-const char *Configure::nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name";
-const char *Configure::nifi_content_repository_class_name = "nifi.content.repository.class.name";
-const char *Configure::nifi_volatile_repository_options = "nifi.volatile.repository.options.";
-const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
-const char *Configure::nifi_server_port = "nifi.server.port";
-const char *Configure::nifi_server_report_interval = "nifi.server.report.interval";
-const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
-const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
-const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
-const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
-const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
-const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
-const char *Configure::nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
-const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_remote_input_http = "nifi.remote.input.http.enabled";
-const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
-const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
-const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
-const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";
-const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
-const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
-const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
-const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
-const char *Configure::nifi_state_management_provider_local = "nifi.state.management.provider.local";
-const char *Configure::nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
-const char *Configure::nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
+// Out-of-line definitions of the constexpr static data members declared (with their values) in
+// Configure.h. Before C++17, constexpr static data members are not implicitly inline, so a
+// namespace-scope definition is needed whenever one of them is odr-used (e.g. bound to a reference).
+constexpr const char *Configure::nifi_default_directory;
+constexpr const char *Configure::nifi_c2_enable;
+constexpr const char *Configure::nifi_flow_configuration_file;
+constexpr const char *Configure::nifi_flow_configuration_file_exit_failure;
+constexpr const char *Configure::nifi_flow_configuration_file_backup_update;
+constexpr const char *Configure::nifi_flow_engine_threads;
+constexpr const char *Configure::nifi_flow_engine_alert_period;
+constexpr const char *Configure::nifi_flow_engine_event_driven_time_slice;
+constexpr const char *Configure::nifi_administrative_yield_duration;
+constexpr const char *Configure::nifi_bored_yield_duration;
+constexpr const char *Configure::nifi_graceful_shutdown_seconds;
+constexpr const char *Configure::nifi_flowcontroller_drain_timeout;
+constexpr const char *Configure::nifi_log_level;
+constexpr const char *Configure::nifi_server_name;
+constexpr const char *Configure::nifi_configuration_class_name;
+constexpr const char *Configure::nifi_flow_repository_class_name;
+constexpr const char *Configure::nifi_content_repository_class_name;
+constexpr const char *Configure::nifi_volatile_repository_options;
+constexpr const char *Configure::nifi_provenance_repository_class_name;
+constexpr const char *Configure::nifi_server_port;
+constexpr const char *Configure::nifi_server_report_interval;
+constexpr const char *Configure::nifi_provenance_repository_max_storage_size;
+constexpr const char *Configure::nifi_provenance_repository_max_storage_time;
+constexpr const char *Configure::nifi_provenance_repository_directory_default;
+constexpr const char *Configure::nifi_flowfile_repository_max_storage_size;
+constexpr const char *Configure::nifi_flowfile_repository_max_storage_time;
+constexpr const char *Configure::nifi_flowfile_repository_directory_default;
+constexpr const char *Configure::nifi_dbcontent_repository_directory_default;
+constexpr const char *Configure::nifi_remote_input_secure;
+constexpr const char *Configure::nifi_remote_input_http;
+constexpr const char *Configure::nifi_security_need_ClientAuth;
+constexpr const char *Configure::nifi_security_client_certificate;
+constexpr const char *Configure::nifi_security_client_private_key;
+constexpr const char *Configure::nifi_security_client_pass_phrase;
+constexpr const char *Configure::nifi_security_client_ca_certificate;
+constexpr const char *Configure::nifi_rest_api_user_name;
+constexpr const char *Configure::nifi_rest_api_password;
+constexpr const char *Configure::nifi_c2_file_watch;
+constexpr const char *Configure::nifi_c2_flow_id;
+constexpr const char *Configure::nifi_c2_flow_url;
+constexpr const char *Configure::nifi_c2_flow_base_url;
+constexpr const char *Configure::nifi_c2_full_heartbeat;
+constexpr const char *Configure::nifi_state_management_provider_local;
+constexpr const char *Configure::nifi_state_management_provider_local_always_persist;
+constexpr const char *Configure::nifi_state_management_provider_local_auto_persistence_interval;
+// disk space watchdog settings
+constexpr const char *Configure::minifi_disk_space_watchdog_enable;
+constexpr const char *Configure::minifi_disk_space_watchdog_interval;
+constexpr const char *Configure::minifi_disk_space_watchdog_stop_threshold;
+constexpr const char *Configure::minifi_disk_space_watchdog_restart_threshold;
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/src/DiskSpaceWatchdog.cpp b/libminifi/src/DiskSpaceWatchdog.cpp
new file mode 100644
index 0000000..9a7f26c
--- /dev/null
+++ b/libminifi/src/DiskSpaceWatchdog.cpp
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DiskSpaceWatchdog.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "properties/Configure.h"
+#include "utils/file/PathUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+namespace {
+namespace chr = std::chrono;
+
+// Parses a time period such as "2000 millis" or "7 sec" into milliseconds; nullopt when unparsable.
+utils::optional<chr::milliseconds> time_string_to_milliseconds(const std::string& str) {
+  uint64_t parsed_ms{};
+  if (core::Property::getTimeMSFromString(str, parsed_ms)) {
+    return utils::make_optional(chr::milliseconds{parsed_ms});
+  }
+  return utils::nullopt;
+}
+
+// Parses an integer that may carry a data size unit (e.g. "100 MB"); nullopt when unparsable.
+template<typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
+utils::optional<T> data_size_string_to_int(const std::string& str) {
+  T parsed{};
+  if (!core::Property::StringToInt(str, parsed)) {
+    return utils::nullopt;
+  }
+  return utils::make_optional(parsed);
+}
+
+}  // namespace
+
+namespace disk_space_watchdog {
+/**
+ * Reads the disk space watchdog settings. Missing or unparsable values fall back to the defaults:
+ * check interval 15 s, stop threshold 100 MiB, restart threshold 150 MiB.
+ * @throws std::runtime_error if the effective stop threshold exceeds the effective restart threshold
+ */
+Config read_config(const Configure& conf) {
+  const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::flatMap(time_string_to_milliseconds);
+  const auto stop_bytes = conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
+  const auto restart_bytes = conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
+  constexpr std::uintmax_t mebibytes = 1024 * 1024;
+  const std::uintmax_t stop_threshold = stop_bytes.value_or(100 * mebibytes);
+  const std::uintmax_t restart_threshold = restart_bytes.value_or(150 * mebibytes);
+  // Validate the effective values: comparing the optionals directly orders an unset value before
+  // any set one, wrongly rejecting e.g. a lone 10 MiB stop threshold and accepting a lone 50 MiB restart threshold.
+  if (restart_threshold < stop_threshold) { throw std::runtime_error{"disk space watchdog stop threshold must be <= restart threshold"}; }
+  return {
+    interval_ms.value_or(chr::seconds{15}),
+    stop_threshold,
+    restart_threshold
+  };
+}
+
+
+/**
+ * Queries the space available to this process on the filesystem of each path.
+ * @param paths directories to check
+ * @param logger optional (may be nullptr) logger for diagnostics
+ * @return available bytes per path, in the order of `paths`. A path whose check failed is logged
+ *   and ignored: its entry is whatever utils::file::space reported (the all-ones error sentinel).
+ */
+std::vector<std::uintmax_t> check_available_space(const std::vector<std::string>& paths, core::logging::Logger* logger) {
+  std::vector<std::uintmax_t> available_bytes;
+  available_bytes.reserve(paths.size());
+  for (const auto& path : paths) {
+    std::error_code ec;
+    const auto info = utils::file::space(path.c_str(), ec);
+    if (logger) {
+      if (ec) {
+        logger->log_info("Couldn't check disk space at %s: %s (ignoring)", path, ec.message());
+      } else {
+        // print the full 64-bit value: narrowing to size_t would truncate sizes above 4 GiB on 32-bit targets
+        logger->log_trace("%s available space: %llu bytes", path, static_cast<unsigned long long>(info.available));
+      }
+    }
+    available_bytes.push_back(info.available);
+  }
+  return available_bytes;
+}
+
+} // namespace disk_space_watchdog
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 962b397..b3d8dc2 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -52,6 +52,8 @@
#include "core/controller/ControllerServiceProvider.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Connectable.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
#include "utils/HTTPClient.h"
#include "utils/GeneralUtils.h"
#include "io/NetworkPrioritizer.h"
@@ -83,7 +85,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
flow_configuration_(std::move(flow_configuration)),
- configuration_(configure),
+ configuration_(std::move(configure)),
content_repo_(content_repo),
logger_(logging::LoggerFactory<FlowController>::getLogger()) {
if (provenance_repo == nullptr)
@@ -103,25 +105,23 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
c2_initialized_ = false;
root_ = nullptr;
- protocol_ = utils::make_unique<FlowControlProtocol>(this, configure);
+ protocol_ = utils::make_unique<FlowControlProtocol>(this, configuration_);
if (!headless_mode) {
std::string rawConfigFileString;
- configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+ configuration_->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
if (!rawConfigFileString.empty()) {
configuration_filename_ = rawConfigFileString;
}
- std::string adjustedFilename;
- if (!configuration_filename_.empty()) {
- // perform a naive determination if this is a relative path
- if (configuration_filename_.c_str()[0] != '/') {
- adjustedFilename = adjustedFilename + configure->getHome() + "/" + configuration_filename_;
- } else {
- adjustedFilename = configuration_filename_;
+ const auto adjustedFilename = [&]() -> std::string {
+ if (configuration_filename_.empty()) { return {}; }
+ if (utils::file::isAbsolutePath(configuration_filename_.c_str())) {
+ return configuration_filename_;
}
- }
+ return utils::file::FileUtils::concat_path(configuration_->getHome(), configuration_filename_);
+ }();
initializeExternalComponents();
initializePaths(adjustedFilename);
}
@@ -142,25 +142,20 @@ void FlowController::initializeExternalComponents() {
}
void FlowController::initializePaths(const std::string &adjustedFilename) {
- char *path = NULL;
+ const char *path = nullptr;
#ifndef WIN32
char full_path[PATH_MAX];
path = realpath(adjustedFilename.c_str(), full_path);
#else
- path = const_cast<char*>(adjustedFilename.c_str());
+ path = adjustedFilename.c_str();
#endif
- if (path == NULL) {
+ if (path == nullptr) {
throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
}
std::string pathString(path);
configuration_filename_ = pathString;
logger_->log_info("FlowController NiFi Configuration file %s", pathString);
-
- if (!path) {
- logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", path);
- exit(1);
- }
}
utils::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() {
@@ -200,6 +195,9 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
if (newRoot == nullptr)
return false;
+ if (!isRunning())
+ return false;
+
logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion());
updating_ = true;
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index a0f3893..88f19d5 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -117,7 +117,7 @@ void Properties::loadConfigureFile(const char *fileName) {
return;
}
- properties_file_ = utils::file::PathUtils::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName));
+ properties_file_ = utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName));
logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)", getName().c_str(), fileName, properties_file_);
diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp
index b02c8e3..792cc79 100644
--- a/libminifi/src/utils/file/FileUtils.cpp
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -29,7 +29,7 @@ namespace minifi {
namespace utils {
namespace file {
-uint64_t FileUtils::computeChecksum(const std::string &file_name, uint64_t up_to_position) {
+uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position) {
constexpr uint64_t BUFFER_SIZE = 4096u;
std::array<char, std::size_t{BUFFER_SIZE}> buffer;
diff --git a/libminifi/src/utils/file/PathUtils.cpp b/libminifi/src/utils/file/PathUtils.cpp
index fd1df13..394c2e2 100644
--- a/libminifi/src/utils/file/PathUtils.cpp
+++ b/libminifi/src/utils/file/PathUtils.cpp
@@ -23,10 +23,12 @@
#else
#include <limits.h>
#include <stdlib.h>
+#include <sys/statvfs.h>
#endif
#include <iostream>
#include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -35,7 +37,7 @@ namespace minifi {
namespace utils {
namespace file {
-bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
+bool getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
const std::size_t found = path.find_last_of(FileUtils::get_separator());
/**
* Don't make an assumption about about the path, return false for this case.
@@ -57,7 +59,7 @@ bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePat
return true;
}
-std::string PathUtils::getFullPath(const std::string& path) {
+std::string getFullPath(const std::string& path) {
#ifdef WIN32
std::vector<char> buffer(MAX_PATH);
uint32_t len = 0U;
@@ -84,13 +86,58 @@ std::string PathUtils::getFullPath(const std::string& path) {
#endif
}
-std::string PathUtils::globToRegex(std::string glob) {
+std::string globToRegex(std::string glob) {
utils::StringUtils::replaceAll(glob, ".", "\\.");
utils::StringUtils::replaceAll(glob, "*", ".*");
utils::StringUtils::replaceAll(glob, "?", ".");
return glob;
}
+/**
+ * Non-throwing query of the space of the filesystem containing `path`.
+ * On failure `ec` is set and every field of the result is static_cast<std::uintmax_t>(-1);
+ * on success `ec` is cleared, matching std::filesystem::space(path, ec).
+ */
+space_info space(const path path, std::error_code& ec) noexcept {
+  constexpr auto kErrVal = gsl::narrow_cast<std::uintmax_t>(-1);
+#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__))
+  struct statvfs svfs{};
+  if (statvfs(path, &svfs) == -1) {
+    ec = std::error_code{errno, std::generic_category()};
+    return space_info{kErrVal, kErrVal, kErrVal};
+  }
+  ec.clear();
+  // block counts are in units of f_frsize bytes; widen before multiplying
+  return space_info{
+    std::uintmax_t{svfs.f_blocks} * svfs.f_frsize,
+    std::uintmax_t{svfs.f_bfree} * svfs.f_frsize,
+    std::uintmax_t{svfs.f_bavail} * svfs.f_frsize
+  };
+#elif defined(_WIN32)
+  ULARGE_INTEGER free_bytes_available_to_caller;
+  ULARGE_INTEGER total_number_of_bytes;
+  ULARGE_INTEGER total_number_of_free_bytes;
+  const bool get_disk_free_space_ex_success = GetDiskFreeSpaceEx(path, &free_bytes_available_to_caller, &total_number_of_bytes,
+      &total_number_of_free_bytes);
+  if (!get_disk_free_space_ex_success) {
+    // GetLastError() returns a DWORD; gsl::narrow would throw for values above INT_MAX, which inside
+    // this noexcept function means std::terminate, so convert unchecked instead
+    ec = std::error_code{gsl::narrow_cast<int>(GetLastError()), std::system_category()};
+    return space_info{kErrVal, kErrVal, kErrVal};
+  }
+  ec.clear();
+  return space_info{total_number_of_bytes.QuadPart, total_number_of_free_bytes.QuadPart, free_bytes_available_to_caller.QuadPart};
+#else
+  // no implementation for this platform: report it instead of silently returning the sentinel values
+  (void)path;
+  ec = std::make_error_code(std::errc::function_not_supported);
+  return space_info{kErrVal, kErrVal, kErrVal};
+#endif /* unix || apple */
+}
+
+/**
+ * Throwing counterpart of space(path, ec).
+ * @throws filesystem_error carrying the error code and the queried path when the query fails
+ */
+space_info space(const path path) {
+  std::error_code error;
+  const space_info info = space(path, error);
+  if (!error) {
+    return info;
+  }
+  throw filesystem_error{error.message(), path, "", error};
+}
+
} // namespace file
} // namespace utils
} // namespace minifi
diff --git a/libminifi/test/unit/DiskSpaceWatchdogTests.cpp b/libminifi/test/unit/DiskSpaceWatchdogTests.cpp
new file mode 100644
index 0000000..604b3d6
--- /dev/null
+++ b/libminifi/test/unit/DiskSpaceWatchdogTests.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+
+#include "../TestBase.h"
+
+#include "properties/Configure.h"
+#include "DiskSpaceWatchdog.h"
+
+namespace minifi = org::apache::nifi::minifi;
+namespace dsg = minifi::disk_space_watchdog;
+namespace chr = std::chrono;
+
+TEST_CASE("disk_space_watchdog::read_config", "[dsg::read_config]") {
+  constexpr auto mebibytes = 1024 * 1024;
+
+  // Stores the three watchdog settings in a fresh Configure and reads the watchdog config back.
+  const auto read_with = [](const std::string& stop_threshold, const std::string& restart_threshold, const std::string& interval) {
+    minifi::Configure configure;
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_stop_threshold, stop_threshold);
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_restart_threshold, restart_threshold);
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_interval, interval);
+    return dsg::read_config(configure);
+  };
+
+  SECTION("defaults are present") {
+    const minifi::Configure empty_configure;
+    const auto defaults = dsg::read_config(empty_configure);
+    REQUIRE(defaults.interval >= chr::nanoseconds{0});
+    REQUIRE(defaults.restart_threshold_bytes > defaults.stop_threshold_bytes);
+  }
+
+  SECTION("basic") {
+    const auto conf = read_with(std::to_string(10 * mebibytes), std::to_string(25 * mebibytes), "2000 millis");
+    REQUIRE(conf.stop_threshold_bytes == 10 * mebibytes);
+    REQUIRE(conf.restart_threshold_bytes == 25 * mebibytes);
+    REQUIRE(conf.interval == chr::seconds{2});
+  }
+
+  SECTION("units") {
+    const auto conf = read_with("100 MB", "250 MB", "7 sec");
+    REQUIRE(conf.stop_threshold_bytes == 100 * mebibytes);
+    REQUIRE(conf.restart_threshold_bytes == 250 * mebibytes);
+    REQUIRE(conf.interval == chr::seconds{7});
+  }
+}
diff --git a/libminifi/test/unit/EnvironmentUtilsTests.cpp b/libminifi/test/unit/EnvironmentUtilsTests.cpp
index 5700a8e..78606d1 100644
--- a/libminifi/test/unit/EnvironmentUtilsTests.cpp
+++ b/libminifi/test/unit/EnvironmentUtilsTests.cpp
@@ -146,7 +146,7 @@ TEST_CASE("setcwd", "[setcwd]") {
TestController testController;
const std::string cwd = utils::Environment::getCurrentWorkingDirectory();
char format[] = "/tmp/envtest.XXXXXX";
- const std::string tempDir = utils::file::PathUtils::getFullPath(testController.createTempDirectory(format));
+ const std::string tempDir = utils::file::getFullPath(testController.createTempDirectory(format));
REQUIRE(true == utils::Environment::setCurrentWorkingDirectory(tempDir.c_str()));
REQUIRE(tempDir == utils::Environment::getCurrentWorkingDirectory());
REQUIRE(true == utils::Environment::setCurrentWorkingDirectory(cwd.c_str()));
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 6ead1a8..3060299 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -28,7 +28,7 @@
#include "utils/Environment.h"
#include "utils/TimeUtil.h"
-using org::apache::nifi::minifi::utils::file::FileUtils;
+namespace FileUtils = org::apache::nifi::minifi::utils::file;
TEST_CASE("TestFileUtils::concat_path", "[TestConcatPath]") {
std::string child = "baz";
@@ -92,7 +92,7 @@ TEST_CASE("TestFilePath", "[TestGetFileNameAndPath]") {
std::stringstream file;
file << path.str() << FileUtils::get_separator() << "file";
std::string filename, filepath;
- REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(file.str(), filepath, filename) );
+ REQUIRE(true == utils::file::getFileNameAndPath(file.str(), filepath, filename) );
REQUIRE(path.str() == filepath);
REQUIRE("file" == filename);
}
@@ -100,7 +100,7 @@ SECTION("NO FILE VALID PATH") {
std::stringstream path;
path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c" << FileUtils::get_separator();
std::string filename, filepath;
- REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+ REQUIRE(false == utils::file::getFileNameAndPath(path.str(), filepath, filename) );
REQUIRE(filepath.empty());
REQUIRE(filename.empty());
}
@@ -110,14 +110,14 @@ SECTION("FILE NO PATH") {
std::string filename, filepath;
std::string expectedPath;
expectedPath += FileUtils::get_separator();
- REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+ REQUIRE(true == utils::file::getFileNameAndPath(path.str(), filepath, filename) );
REQUIRE(expectedPath == filepath);
REQUIRE("file" == filename);
}
SECTION("NO FILE NO PATH") {
std::string path = "file";
std::string filename, filepath;
- REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+ REQUIRE(false == utils::file::getFileNameAndPath(path, filepath, filename) );
REQUIRE(filepath.empty());
REQUIRE(filename.empty());
}
@@ -154,7 +154,7 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
TestController testController;
char format[] = "/tmp/gt.XXXXXX";
- const std::string tempDir = utils::file::PathUtils::getFullPath(testController.createTempDirectory(format));
+ const std::string tempDir = utils::file::getFullPath(testController.createTempDirectory(format));
const std::string cwd = utils::Environment::getCurrentWorkingDirectory();
@@ -168,15 +168,15 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
REQUIRE(0 == utils::file::FileUtils::create_dir(tempDir1));
REQUIRE(0 == utils::file::FileUtils::create_dir(tempDir2));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(tempDir1));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("test1"));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("./test1"));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("././test1"));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("./test2/../test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath(tempDir1));
+ REQUIRE(tempDir1 == utils::file::getFullPath("test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath("./test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath("././test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath("./test2/../test1"));
#ifdef WIN32
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\test1"));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\.\\test1"));
- REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\test2\\..\\test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath(".\\test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath(".\\.\\test1"));
+ REQUIRE(tempDir1 == utils::file::getFullPath(".\\test2\\..\\test1"));
#endif
}
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp b/libminifi/test/unit/GeneralUtilsTest.cpp
new file mode 100644
index 0000000..f7d12b7
--- /dev/null
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <functional>
+#include <string>
+#include <type_traits>
+
+#include "../TestBase.h"
+#include "utils/GeneralUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+// utils::make_unique<T>() must yield exactly std::unique_ptr<T>
+static_assert(std::is_same<std::unique_ptr<char16_t>, decltype(utils::make_unique<char16_t>())>::value, "utils::make_unique type must be correct");
+
+TEST_CASE("GeneralUtils::make_unique", "[make_unique]") {
+  const std::unique_ptr<std::string> owned = utils::make_unique<std::string>("test string");
+  REQUIRE(*owned == "test string");
+}
+
+// intdiv_ceil(a, b) is integer division rounding toward positive infinity
+TEST_CASE("GeneralUtils::intdiv_ceil", "[intdiv_ceil]") {
+  REQUIRE(0 == utils::intdiv_ceil(0, 1));
+  REQUIRE(0 == utils::intdiv_ceil(0, 2));
+  REQUIRE(1 == utils::intdiv_ceil(1, 2));
+  REQUIRE(1 == utils::intdiv_ceil(1, 3));
+  REQUIRE(1 == utils::intdiv_ceil(3, 3));
+  REQUIRE(2 == utils::intdiv_ceil(4, 3));
+  REQUIRE(2 == utils::intdiv_ceil(5, 3));
+  REQUIRE(2 == utils::intdiv_ceil(6, 3));
+  REQUIRE(3 == utils::intdiv_ceil(7, 3));
+  REQUIRE(0 == utils::intdiv_ceil(-1, 3));
+  REQUIRE(-1 == utils::intdiv_ceil(-3, 3));
+  REQUIRE(-1 == utils::intdiv_ceil(-4, 3));
+}
+
+TEST_CASE("GeneralUtils::exchange", "[exchange]") {
+  int target = 2;
+  const int old_value = utils::exchange(target, 0);
+  REQUIRE(old_value == 2);
+  REQUIRE(target == 0);
+}
+
+// void_t must collapse any number of type arguments (including void) to void
+static_assert(std::is_same<void, decltype(utils::void_t<char16_t>())>::value, "utils::void_t single arg must work");
+static_assert(std::is_same<void, decltype(utils::void_t<int, double, bool, void, char16_t>())>::value, "utils::void_t multi arg must work");
+
+// utils::invoke with a pointer to member function, called on a temporary, a std::reference_wrapper and a raw pointer
+TEST_CASE("GeneralUtils::invoke pointer to member function", "[invoke memfnptr]") {
+ // The local class below may read this constant without capturing it: reading a const int initialized by a constant expression is not an odr-use
+ const int result{0xc1ca};
+ struct Tester {
+ bool called{};
+ int memfn(const int arg) {
+ REQUIRE(42 == arg);
+ called = true;
+ return result;
+ }
+ };
+
+ // normal: member function invoked on a temporary object
+ REQUIRE(result == utils::invoke(&Tester::memfn, Tester{}, 42));
+
+ // reference_wrapper: the wrapped object itself must be modified
+ Tester t2;
+ const auto ref_wrapper = std::ref(t2);
+ REQUIRE(result == utils::invoke(&Tester::memfn, ref_wrapper, 42));
+ REQUIRE(t2.called);
+
+ // pointer: the pointed-to object must be modified
+ Tester t3;
+ REQUIRE(result == utils::invoke(&Tester::memfn, &t3, 42));
+ REQUIRE(t3.called);
+}
+
+// utils::invoke with a pointer to data member yields the member's value, for each kind of object argument
+TEST_CASE("GeneralUtils::invoke pointer to data member", "[invoke data member]") {
+ struct Times2 {
+ int value;
+ explicit Times2(const int i) :value{i * 2} {}
+ };
+
+ // normal
+ REQUIRE(24 == utils::invoke(&Times2::value, Times2{12}));
+
+ // reference_wrapper
+ Times2 t2{42};
+ const auto ref_wrapper = std::ref(t2);
+ REQUIRE(84 == utils::invoke(&Times2::value, ref_wrapper));
+
+ // pointer
+ Times2 t3{0};
+ REQUIRE(0 == utils::invoke(&Times2::value, &t3));
+}
+
+namespace {
+// plain free function used to check that utils::invoke accepts function pointers
+bool free_function(const bool b) { return b; }
+}  // namespace
+
+TEST_CASE("GeneralUtils::invoke FunctionObject", "[invoke function object]") {
+  for (const bool value : {true, false}) {
+    REQUIRE(value == utils::invoke(&free_function, value));
+  }
+
+  // lambda with capture
+  const auto factor = 3;
+  const auto multiply_by_factor = [factor](const int i) { return factor * i; };
+  REQUIRE(utils::invoke(multiply_by_factor, 20) == 60);
+}
diff --git a/libminifi/test/unit/IntervalSwitchTest.cpp b/libminifi/test/unit/IntervalSwitchTest.cpp
new file mode 100644
index 0000000..5bd0e62
--- /dev/null
+++ b/libminifi/test/unit/IntervalSwitchTest.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/IntervalSwitch.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+namespace {
+using State = utils::IntervalSwitchState;
+
+// Expected outcome of a single IntervalSwitch call: the resulting state and whether that call changed it.
+struct expected {
+  expected(const State expected_state, const bool expected_switched) : state{expected_state}, switched{expected_switched} {}
+  State state;
+  bool switched;
+};
+// Lets REQUIRE compare the value returned by an IntervalSwitch call (reading its .state and .switched) with an expectation.
+template<typename IntervalSwitchReturn>
+bool operator==(const IntervalSwitchReturn& lhs, const expected& rhs) {
+  return lhs.state == rhs.state && lhs.switched == rhs.switched;
+}
+}  // namespace
+
+TEST_CASE("basic IntervalSwitch", "[intervalswitch.basic]") {  // all steps share one switch, so each expectation depends on the state left by the previous call
+ utils::IntervalSwitch<int> interval_switch{100, 150};  // per the asserts below: goes LOWER when a value drops under 100, back to UPPER once a value reaches 150
+ REQUIRE(interval_switch(120) == expected(State::UPPER, false));  // starts in UPPER; a value between the thresholds does not switch
+ REQUIRE(interval_switch(100) == expected(State::UPPER, false));  // exactly the lower threshold: still UPPER
+ REQUIRE(interval_switch(99) == expected(State::LOWER, true));  // below the lower threshold: switches to LOWER
+ REQUIRE(interval_switch(-20022202) == expected(State::LOWER, false));  // already LOWER: no switch reported
+ REQUIRE(interval_switch(120) == expected(State::LOWER, false));  // between the thresholds the previous state is kept
+ REQUIRE(interval_switch(149) == expected(State::LOWER, false));  // just under the upper threshold: still LOWER
+ REQUIRE(interval_switch(150) == expected(State::UPPER, true));  // reaching the upper threshold switches back to UPPER
+ REQUIRE(interval_switch(150) == expected(State::UPPER, false));  // same value again: no new switch
+ REQUIRE(interval_switch(2000) == expected(State::UPPER, false));  // far above: stays UPPER
+ REQUIRE(interval_switch(120) == expected(State::UPPER, false));  // between the thresholds, coming from UPPER: stays UPPER
+ REQUIRE(interval_switch(100) == expected(State::UPPER, false));  // the lower threshold itself does not trigger LOWER
+}
+
+TEST_CASE("IntervalSwitch comparator", "[intervalswitch.comp]") {  // same stateful walk, but thresholds are ordered by std::greater instead of the default comparator
+ utils::IntervalSwitch<uint32_t, std::greater<uint32_t>> interval_switch{250, 100};  // reversed ordering: per the asserts, values above 250 switch to LOWER and values down at 100 switch back to UPPER
+ REQUIRE(interval_switch(99) == expected(State::UPPER, false));  // starts in UPPER
+ REQUIRE(interval_switch(120) == expected(State::UPPER, false));  // between the thresholds: no switch
+ REQUIRE(interval_switch(250) == expected(State::UPPER, false));  // the lower threshold itself does not switch
+ REQUIRE(interval_switch(251) == expected(State::LOWER, true));  // "below" 250 under std::greater: switches to LOWER
+ REQUIRE(interval_switch(150) == expected(State::LOWER, false));  // between the thresholds: previous state kept
+ REQUIRE(interval_switch(101) == expected(State::LOWER, false));  // just short of the upper threshold: still LOWER
+ REQUIRE(interval_switch(100) == expected(State::UPPER, true));  // reaching the upper threshold (100) switches back to UPPER
+ REQUIRE(interval_switch(100) == expected(State::UPPER, false));  // same value again: no new switch
+ REQUIRE(interval_switch(250) == expected(State::UPPER, false));  // back at the lower threshold: stays UPPER
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp b/libminifi/test/unit/OptionalTest.cpp
new file mode 100644
index 0000000..311afec
--- /dev/null
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/OptionalUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+TEST_CASE("optional map", "[optional map]") {
+  // engaged optional: the mapping is applied to the contained value
+  const auto doubled = utils::make_optional(6) | utils::map([](const int i) { return i * 2; });
+  REQUIRE(12 == doubled.value());
+  const auto doubled_nothing = utils::optional<int>{} | utils::map([](const int i) { return i * 2; });  // empty optional: stays empty
+  REQUIRE(!doubled_nothing.has_value());
+}
+
+TEST_CASE("optional flatMap", "[optional flat map]") {
+  const auto make_intdiv_noremainder = [](const int divisor) {  // yields x / divisor, or an empty optional when the division is not exact
+    return [divisor](const int dividend) -> utils::optional<int> {
+      if (dividend % divisor != 0) { return utils::optional<int>{}; }
+      return utils::make_optional(dividend / divisor);
+    };
+  };
+  const auto test1 = utils::make_optional(6) | utils::flatMap(make_intdiv_noremainder(3));  // rvalue function, exact division
+  REQUIRE(2 == test1.value());
+  const auto const_lval_func = make_intdiv_noremainder(4);
+  const auto test2 = utils::optional<int>{} | utils::flatMap(const_lval_func);  // const lvalue function, empty input
+  REQUIRE(!test2.has_value());
+  auto mutable_lval_func = make_intdiv_noremainder(3);
+  const auto test3 = utils::make_optional(7) | utils::flatMap(mutable_lval_func);  // non-const lvalue function, inexact division
+  REQUIRE(!test3.has_value());
+}
diff --git a/libminifi/test/unit/PathUtilsTests.cpp b/libminifi/test/unit/PathUtilsTests.cpp
index 30cbb78..b907e1f 100644
--- a/libminifi/test/unit/PathUtilsTests.cpp
+++ b/libminifi/test/unit/PathUtilsTests.cpp
@@ -19,14 +19,32 @@
#include <catch.hpp>
#include "utils/file/PathUtils.h"
-using namespace org::apache::nifi::minifi::utils::file;
+namespace fs = org::apache::nifi::minifi::utils::file;
-TEST_CASE("PathUtils::globToRegex works", "[globToRegex]") {
- REQUIRE(PathUtils::globToRegex("") == "");
- REQUIRE(PathUtils::globToRegex("NoSpecialChars") == "NoSpecialChars");
- REQUIRE(PathUtils::globToRegex("ReplaceDot.txt") == "ReplaceDot\\.txt");
- REQUIRE(PathUtils::globToRegex("Replace.Multiple.Dots...txt") == "Replace\\.Multiple\\.Dots\\.\\.\\.txt");
- REQUIRE(PathUtils::globToRegex("ReplaceAsterisk.*") == "ReplaceAsterisk\\..*");
- REQUIRE(PathUtils::globToRegex("Replace*Multiple*Asterisks") == "Replace.*Multiple.*Asterisks");
- REQUIRE(PathUtils::globToRegex("ReplaceQuestionMark?.txt") == "ReplaceQuestionMark.\\.txt");
+TEST_CASE("file::globToRegex works", "[globToRegex]") {
+  REQUIRE(fs::globToRegex("").empty());
+  // {glob, expected regex}: '.' is escaped, '*' becomes ".*" and '?' becomes "."
+  const char* const cases[][2] = {
+      {"NoSpecialChars", "NoSpecialChars"}, {"ReplaceDot.txt", "ReplaceDot\\.txt"},
+      {"Replace.Multiple.Dots...txt", "Replace\\.Multiple\\.Dots\\.\\.\\.txt"}, {"ReplaceAsterisk.*", "ReplaceAsterisk\\..*"},
+      {"Replace*Multiple*Asterisks", "Replace.*Multiple.*Asterisks"}, {"ReplaceQuestionMark?.txt", "ReplaceQuestionMark.\\.txt"}};
+  for (const auto& glob_and_regex : cases) { REQUIRE(fs::globToRegex(glob_and_regex[0]) == glob_and_regex[1]); }
+}
+
+TEST_CASE("path::isAbsolutePath", "[path::isAbsolutePath]") {
+  // Paths that are absolute in the native format of the build platform.
+#ifdef WIN32
+  const char* const absolute_paths[] = {
+      "C:\\", "C:\\Program Files", "C:\\Program Files\\ApacheNiFiMiNiFi\\nifi-minifi-cpp\\conf\\minifi.properties",
+      "C:/", "C:/Program Files", "C:/Program Files/ApacheNiFiMiNiFi/nifi-minifi-cpp/conf/minifi.properties"};
+  REQUIRE(!fs::isAbsolutePath("/"));  // a root with no drive letter is not absolute on Windows
+#else
+  const char* const absolute_paths[] = {"/", "/etc", "/opt/minifi/conf/minifi.properties"};
+#endif /* WIN32 */
+  for (const char* const path : absolute_paths) {
+    CAPTURE(path);
+    REQUIRE(fs::isAbsolutePath(path));
+  }
+  // a bare relative name is not absolute on any platform
+  REQUIRE(!fs::isAbsolutePath("hello"));
}
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 3dba436..d90a37f 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -53,6 +53,7 @@
#include "core/FlowConfiguration.h"
#include "core/ConfigurationFactory.h"
#include "core/RepositoryFactory.h"
+#include "DiskSpaceWatchdog.h"
#include "utils/file/PathUtils.h"
#include "utils/file/FileUtils.h"
#include "utils/Environment.h"
@@ -142,7 +143,7 @@ int main(int argc, char **argv) {
// initialize static functions that were defined apriori
core::FlowConfiguration::initialize_static_functions();
- std::string graceful_shutdown_seconds = "";
+ std::string graceful_shutdown_seconds;
std::string prov_repo_class = "provenancerepository";
std::string flow_repo_class = "flowfilerepository";
std::string nifi_configuration_class_name = "yamlconfiguration";
@@ -204,7 +205,7 @@ int main(int argc, char **argv) {
// Make a record of minifi home in the configured log file.
logger->log_info("MINIFI_HOME=%s", minifiHome);
- std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
+ const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
configure->setHome(minifiHome);
configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
@@ -286,8 +287,53 @@ int main(int argc, char **argv) {
std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name);
- std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>(
- new minifi::FlowController(prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo));
+ const auto controller = std::make_shared<minifi::FlowController>(prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo);
+
+ const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v){ return v == "true"; })).value_or(true);
+ std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
+ if (disk_space_watchdog_enable) {
+ try {
+ const auto repo_paths = [&] {
+ std::vector<std::string> repo_paths;
+ repo_paths.reserve(3);
+ // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
+ const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; };
+ auto prov_repo_path = prov_repo->getDirectory();
+ auto flow_repo_path = flow_repo->getDirectory();
+ auto content_repo_storage_path = content_repo->getStoragePath();
+ if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); }
+ if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); }
+ if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); }
+ return repo_paths;
+ }();
+ const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
+ const auto config = minifi::disk_space_watchdog::read_config(*configure);
+ const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
+ const auto it = std::min_element(std::begin(spaces), std::end(spaces));
+ return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)();
+ };
+ if (min_space(available_spaces) <= config.stop_threshold_bytes) {
+ logger->log_error("Cannot start MiNiFi due to insufficient available disk space");
+ return -1;
+ }
+ auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config);
+ disk_space_watchdog = utils::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable {
+ const auto stop = [&]{ controller->stop(); controller->unload(); };
+ const auto restart = [&]{ controller->load(); controller->start(); };
+ const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get())));
+ if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) {
+ logger->log_warn("Stopping flow controller due to insufficient disk space");
+ stop();
+ } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) {
+ logger->log_info("Restarting flow controller");
+ restart();
+ }
+ });
+ } catch (const std::runtime_error& error) {
+ logger->log_error(error.what());
+ return -1;
+ }
+ }
logger->log_info("Loading FlowController");
@@ -305,8 +351,10 @@ int main(int argc, char **argv) {
}
// Start Processing the flow
-
controller->start();
+
+ if (disk_space_watchdog) { disk_space_watchdog->start(); }
+
logger->log_info("MiNiFi started");
/**
@@ -324,6 +372,8 @@ int main(int argc, char **argv) {
while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR);
if(ret_val == -1) perror("sem_unlink");
+ disk_space_watchdog = nullptr;
+
/**
* Trigger unload -- wait stop_wait_time
*/