You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/10/01 10:47:07 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1332 Prevent erroneous behavior by stopping FlowController in low disk space conditions

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b0be47f  MINIFICPP-1332 Prevent erroneous behavior by stopping FlowController in low disk space conditions
b0be47f is described below

commit b0be47f354ab092f368cfce4ec4723fba3bcf4ea
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Aug 19 13:00:46 2020 +0200

    MINIFICPP-1332 Prevent erroneous behavior by stopping FlowController in low disk space conditions
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #875
---
 conf/minifi.properties                             |   9 +
 .../standard-processors/processors/TailFile.cpp    |  10 +-
 .../tests/unit/RetryFlowFileTests.cpp              |   2 +-
 .../tests/unit/TailFileTests.cpp                   |   8 +-
 .../{utils/CallBackTimer.h => DiskSpaceWatchdog.h} |  57 +-
 libminifi/include/core/Repository.h                |   4 +
 libminifi/include/properties/Configure.h           |  98 +--
 libminifi/include/properties/Properties.h          |  12 +
 libminifi/include/utils/CallBackTimer.h            |   1 -
 libminifi/include/utils/GeneralUtils.h             |  69 ++
 .../utils/{CallBackTimer.h => IntervalSwitch.h}    |  61 +-
 libminifi/include/utils/OptionalUtils.h            |  50 ++
 libminifi/include/utils/file/FileUtils.h           | 896 ++++++++++-----------
 libminifi/include/utils/file/PathUtils.h           |  58 +-
 libminifi/src/Configure.cpp                        |  94 +--
 libminifi/src/DiskSpaceWatchdog.cpp                |  87 ++
 libminifi/src/FlowController.cpp                   |  36 +-
 libminifi/src/Properties.cpp                       |   2 +-
 libminifi/src/utils/file/FileUtils.cpp             |   2 +-
 libminifi/src/utils/file/PathUtils.cpp             |  53 +-
 libminifi/test/unit/DiskSpaceWatchdogTests.cpp     |  61 ++
 libminifi/test/unit/EnvironmentUtilsTests.cpp      |   2 +-
 libminifi/test/unit/FileUtilsTests.cpp             |  28 +-
 libminifi/test/unit/GeneralUtilsTest.cpp           | 117 +++
 libminifi/test/unit/IntervalSwitchTest.cpp         |  66 ++
 libminifi/test/unit/OptionalTest.cpp               |  47 ++
 libminifi/test/unit/PathUtilsTests.cpp             |  36 +-
 main/MiNiFiMain.cpp                                |  60 +-
 28 files changed, 1365 insertions(+), 661 deletions(-)

diff --git a/conf/minifi.properties b/conf/minifi.properties
index dfa980f..291d1b2 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -49,6 +49,15 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
 ## To change the frequency at which the default state storage is persisted, modify the following
 #nifi.state.management.provider.local.auto.persistence.interval=1 min
 
+# Disk space watchdog #
+## Stops MiNiFi FlowController activity (excluding C2) when the available disk space on any of the repository
+## volumes goes below stop.threshold (checked every interval), then restarts it when the available space on all
+## repository volumes reaches at least restart.threshold.
+#minifi.disk.space.watchdog.enable=true
+#minifi.disk.space.watchdog.interval=15 sec
+#minifi.disk.space.watchdog.stop.threshold=100 MB
+#minifi.disk.space.watchdog.restart.threshold=150 MB
+
 ## Enabling C2 Uncomment each of the following options
 ## define those with missing options
 #nifi.c2.enable=true
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 35a2ce9..46bd952 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -381,7 +381,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
     tail_mode_ = Mode::SINGLE;
 
     std::string path, file_name;
-    if (utils::file::PathUtils::getFileNameAndPath(file_to_tail_, path, file_name)) {
+    if (utils::file::getFileNameAndPath(file_to_tail_, path, file_name)) {
       // NOTE: position and checksum will be updated in recoverState() if there is a persisted state for this file
       tail_states_.emplace(file_to_tail_, TailState{path, file_name});
     } else {
@@ -393,7 +393,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
 
   std::string rolling_filename_pattern_glob;
   context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
-  rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
+  rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
 }
 
 void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
@@ -432,7 +432,7 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &s
 
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
-    if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+    if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
       logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
       state.emplace(fileName, TailState{fileLocation, fileName});
     } else {
@@ -451,7 +451,7 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &s
   if (key.find(CURRENT_STR) == 0) {
     const auto file = key.substr(strlen(CURRENT_STR));
     std::string fileLocation, fileName;
-    if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+    if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
       state[file].path_ = fileLocation;
       state[file].file_name_ = fileName;
     } else {
@@ -513,7 +513,7 @@ bool TailFile::getStateFromStateManager(std::map<std::string, TailState> &new_ta
         }};
 
         std::string fileLocation, fileName;
-        if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
+        if (utils::file::getFileNameAndPath(current, fileLocation, fileName)) {
           logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
           new_tail_states.emplace(current, TailState{fileLocation, fileName, position, last_read_time, checksum});
         } else {
diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
index 133618f..0e6913a 100644
--- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
@@ -37,7 +37,7 @@
 namespace {
 using org::apache::nifi::minifi::utils::createTempDir;
 using org::apache::nifi::minifi::utils::optional;
-using org::apache::nifi::minifi::utils::file::FileUtils;
+namespace FileUtils = org::apache::nifi::minifi::utils::file;
 
 class RetryFlowFileTest {
  public:
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 5706fbc..2c57c7d 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -239,7 +239,7 @@ TEST_CASE("TailFile picks up the state correctly if it is rewritten between runs
   REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
 
   std::string filePath, fileName;
-  REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file.str(), filePath, fileName));
+  REQUIRE(utils::file::getFileNameAndPath(temp_file.str(), filePath, fileName));
 
   // should stay the same
   for (int i = 0; i < 5; i++) {
@@ -321,7 +321,7 @@ TEST_CASE("TailFile converts the old-style state file to the new-style state", "
     REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
 
     std::string filePath, fileName;
-    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file, filePath, fileName));
+    REQUIRE(utils::file::getFileNameAndPath(temp_file, filePath, fileName));
     std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName},
                                                                 {"file.0.position", "35"},
                                                                 {"file.0.current", temp_file},
@@ -365,8 +365,8 @@ TEST_CASE("TailFile converts the old-style state file to the new-style state", "
     REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
 
     std::string filePath1, filePath2, fileName1, fileName2;
-    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_1, filePath1, fileName1));
-    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_2, filePath2, fileName2));
+    REQUIRE(utils::file::getFileNameAndPath(temp_file_1, filePath1, fileName1));
+    REQUIRE(utils::file::getFileNameAndPath(temp_file_2, filePath2, fileName2));
     std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName1},
                                                                 {"file.0.position", "35"},
                                                                 {"file.0.current", temp_file_1},
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/DiskSpaceWatchdog.h
similarity index 52%
copy from libminifi/include/utils/CallBackTimer.h
copy to libminifi/include/DiskSpaceWatchdog.h
index faf25b3..384a9fe 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/DiskSpaceWatchdog.h
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,49 +16,45 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-#define LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
+#pragma once
 
-#include <mutex>
-#include <condition_variable>
-#include <thread>
 #include <chrono>
-#include <functional>
+#include <cinttypes>
+#include <string>
+#include <vector>
+
+#include "utils/IntervalSwitch.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace utils {
-
-class CallBackTimer {
- public:
-  CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
-  ~CallBackTimer();
-
-  void stop();
 
-  void start();
+class Configure;
+namespace core {
+namespace logging {
+class Logger;
+}  // namespace logging
+}  // namespace core
+
+namespace disk_space_watchdog {
+struct Config {
+  std::chrono::milliseconds interval;
+  std::uintmax_t stop_threshold_bytes;
+  std::uintmax_t restart_threshold_bytes;
+};
 
-  bool is_running() const;
+Config read_config(const Configure&);
 
- private:
-  bool execute_;
-  std::function<void(void)> func_;
-  std::thread thd_;
-  mutable std::mutex mtx_;
-  mutable std::mutex cv_mtx_;
-  std::condition_variable cv_;
+inline utils::IntervalSwitch<std::uintmax_t> disk_space_interval_switch(Config config) {
+  return {config.stop_threshold_bytes, config.restart_threshold_bytes, utils::IntervalSwitchState::UPPER};
+}
 
-  const std::chrono::milliseconds interval_;
-};
+// Essentially `paths | transform(utils::file::space) | transform(&utils::file::space_info::available)` with error logging
+std::vector<std::uintmax_t> check_available_space(const std::vector<std::string>& paths, core::logging::Logger* logger = nullptr);
 
-}  // namespace utils
+}  // namespace disk_space_watchdog
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 2b7d7aa..194937c 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -225,6 +225,10 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
 
   virtual uint64_t getRepoSize();
 
+  std::string getDirectory() const {
+    return directory_;
+  }
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Repository(const Repository &parent) = delete;
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 7a298f2..e1ad1f5 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -53,59 +53,63 @@ class Configure : public Properties {
   }
 
   // nifi.flow.configuration.file
-  static const char *nifi_default_directory;
-  static const char *nifi_flow_configuration_file;
-  static const char *nifi_flow_configuration_file_exit_failure;
-  static const char *nifi_flow_configuration_file_backup_update;
-  static const char *nifi_flow_engine_threads;
-  static const char *nifi_flow_engine_alert_period;
-  static const char *nifi_flow_engine_event_driven_time_slice;
-  static const char *nifi_administrative_yield_duration;
-  static const char *nifi_bored_yield_duration;
-  static const char *nifi_graceful_shutdown_seconds;
-  static const char *nifi_flowcontroller_drain_timeout;
-  static const char *nifi_log_level;
-  static const char *nifi_server_name;
-  static const char *nifi_configuration_class_name;
-  static const char *nifi_flow_repository_class_name;
-  static const char *nifi_content_repository_class_name;
-  static const char *nifi_volatile_repository_options;
-  static const char *nifi_provenance_repository_class_name;
-  static const char *nifi_server_port;
-  static const char *nifi_server_report_interval;
-  static const char *nifi_provenance_repository_max_storage_time;
-  static const char *nifi_provenance_repository_max_storage_size;
-  static const char *nifi_provenance_repository_directory_default;
-  static const char *nifi_provenance_repository_enable;
-  static const char *nifi_flowfile_repository_max_storage_time;
-  static const char *nifi_dbcontent_repository_directory_default;
-  static const char *nifi_flowfile_repository_max_storage_size;
-  static const char *nifi_flowfile_repository_directory_default;
-  static const char *nifi_flowfile_repository_enable;
-  static const char *nifi_remote_input_secure;
-  static const char *nifi_remote_input_http;
-  static const char *nifi_security_need_ClientAuth;
+  static constexpr const char *nifi_default_directory = "nifi.default.directory";
+  static constexpr const char *nifi_flow_configuration_file = "nifi.flow.configuration.file";
+  static constexpr const char *nifi_flow_configuration_file_exit_failure = "nifi.flow.configuration.file.exit.onfailure";
+  static constexpr const char *nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
+  static constexpr const char *nifi_flow_engine_threads = "nifi.flow.engine.threads";
+  static constexpr const char *nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
+  static constexpr const char *nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
+  static constexpr const char *nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
+  static constexpr const char *nifi_bored_yield_duration = "nifi.bored.yield.duration";
+  static constexpr const char *nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
+  static constexpr const char *nifi_flowcontroller_drain_timeout = "nifi.flowcontroller.drain.timeout";
+  static constexpr const char *nifi_log_level = "nifi.log.level";
+  static constexpr const char *nifi_server_name = "nifi.server.name";
+  static constexpr const char *nifi_configuration_class_name = "nifi.flow.configuration.class.name";
+  static constexpr const char *nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name";
+  static constexpr const char *nifi_content_repository_class_name = "nifi.content.repository.class.name";
+  static constexpr const char *nifi_volatile_repository_options = "nifi.volatile.repository.options.";
+  static constexpr const char *nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
+  static constexpr const char *nifi_server_port = "nifi.server.port";
+  static constexpr const char *nifi_server_report_interval = "nifi.server.report.interval";
+  static constexpr const char *nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
+  static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
+  static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
+  static constexpr const char *nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
+  static constexpr const char *nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
+  static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
+  static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
+  static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
+  static constexpr const char *nifi_remote_input_http = "nifi.remote.input.http.enabled";
+  static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
   // site2site security config
-  static const char *nifi_security_client_certificate;
-  static const char *nifi_security_client_private_key;
-  static const char *nifi_security_client_pass_phrase;
-  static const char *nifi_security_client_ca_certificate;
+  static constexpr const char *nifi_security_client_certificate = "nifi.security.client.certificate";
+  static constexpr const char *nifi_security_client_private_key = "nifi.security.client.private.key";
+  static constexpr const char *nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
+  static constexpr const char *nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
 
   // nifi rest api user name and password
-  static const char *nifi_rest_api_user_name;
-  static const char *nifi_rest_api_password;
+  static constexpr const char *nifi_rest_api_user_name = "nifi.rest.api.user.name";
+  static constexpr const char *nifi_rest_api_password = "nifi.rest.api.password";
   // c2 options
-  static const char *nifi_c2_enable;
-  static const char *nifi_c2_file_watch;
-  static const char *nifi_c2_flow_id;
-  static const char *nifi_c2_flow_url;
-  static const char *nifi_c2_flow_base_url;
-  static const char *nifi_c2_full_heartbeat;
+  static constexpr const char *nifi_c2_enable = "nifi.c2.enable";
+  static constexpr const char *nifi_c2_file_watch = "nifi.c2.file.watch";
+  static constexpr const char *nifi_c2_flow_id = "nifi.c2.flow.id";
+  static constexpr const char *nifi_c2_flow_url = "nifi.c2.flow.url";
+  static constexpr const char *nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
+  static constexpr const char *nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
 
   // state management options
-  static const char *nifi_state_management_provider_local;
-  static const char *nifi_state_management_provider_local_always_persist;
-  static const char *nifi_state_management_provider_local_auto_persistence_interval;
+  static constexpr const char *nifi_state_management_provider_local = "nifi.state.management.provider.local";
+  static constexpr const char *nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
+  static constexpr const char *nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
+
+  // disk space watchdog options
+  static constexpr const char *minifi_disk_space_watchdog_enable = "minifi.disk.space.watchdog.enable";
+  static constexpr const char *minifi_disk_space_watchdog_interval = "minifi.disk.space.watchdog.interval";
+  static constexpr const char *minifi_disk_space_watchdog_stop_threshold = "minifi.disk.space.watchdog.stop.threshold";
+  static constexpr const char *minifi_disk_space_watchdog_restart_threshold = "minifi.disk.space.watchdog.restart.threshold";
 
  private:
   std::string agent_identifier_;
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index e814be8..94d2bca 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -24,7 +24,9 @@
 #include <vector>
 #include <string>
 #include <map>
+
 #include "core/logging/Logger.h"
+#include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
@@ -82,6 +84,16 @@ class Properties {
    */
   int getInt(const std::string &key, int default_value) const;
 
+  utils::optional<std::string> get(const std::string& key) const {
+    std::string result;
+    const bool found = get(key, result);
+    if (found) {
+      return result;
+    } else {
+      return utils::nullopt;
+    }
+  }
+
   // Parse one line in configure file like key=value
   bool parseConfigureFileLine(char *buf, std::string &prop_key, std::string &prop_value);
 
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/utils/CallBackTimer.h
index faf25b3..c5fe96b 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/utils/CallBackTimer.h
@@ -33,7 +33,6 @@ namespace utils {
 class CallBackTimer {
  public:
   CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
   ~CallBackTimer();
 
   void stop();
diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index ee91627..224f455 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -43,6 +43,7 @@ using std::make_unique;
 
 template<typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
 T intdiv_ceil(T numerator, T denominator) {
+  gsl_Expects(denominator != 0);
   // note: division and remainder is 1 instruction on x86
   return numerator / denominator + (numerator % denominator > 0);
 }
@@ -92,6 +93,74 @@ struct EnableSharedFromThis : virtual internal::EnableSharedFromThisBase {
   }
 };
 
+// utilities to define single expression functions with proper noexcept and a decltype-ed return type (like decltype(auto) since C++14)
+#define MINIFICPP_UTIL_DEDUCED(...) noexcept(noexcept(__VA_ARGS__)) -> decltype(__VA_ARGS__) { return __VA_ARGS__; }
+#define MINIFICPP_UTIL_DEDUCED_CONDITIONAL(condition, ...) noexcept(noexcept(__VA_ARGS__)) -> typename std::enable_if<(condition), decltype(__VA_ARGS__)>::type { return __VA_ARGS__; }
+
+#if __cplusplus < 201703L
+namespace detail {
+template<typename>
+struct is_reference_wrapper : std::false_type {};
+
+template<typename T>
+struct is_reference_wrapper<std::reference_wrapper<T>> : std::true_type {};
+
+// invoke on pointer to member function
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ (std::forward<Obj>(obj).*f)(std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ (std::forward<Obj>(obj).get().*f)(std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_member_function_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && !is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ ((*std::forward<Obj>(obj)).*f)(std::forward<Args>(args)...))
+
+// invoke on pointer to data member
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ std::forward<Obj>(obj).*f)
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ std::forward<Obj>(obj).get().*f)
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_member_object_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ (!std::is_base_of<Clazz, typename std::decay<decltype(obj)>::type>::value && !is_reference_wrapper<typename std::decay<decltype(obj)>::type>::value),
+    /* expr: */ (*std::forward<Obj>(obj)).*f)
+
+// invoke_impl
+template<typename T, typename Clazz, typename Obj, typename... Args>
+auto invoke_impl(T Clazz::*f, Obj&& obj, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ std::is_member_function_pointer<decltype(f)>::value,
+    /* expr: */ invoke_member_function_impl(f, std::forward<Obj>(obj), std::forward<Args>(args)...))
+
+template<typename T, typename Clazz, typename Obj>
+auto invoke_impl(T Clazz::*f, Obj&& obj) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ std::is_member_object_pointer<decltype(f)>::value,
+    /* expr: */ invoke_member_object_impl(f, std::forward<Obj>(obj)))
+
+template<typename F, typename... Args>
+auto invoke_impl(F&& f, Args&&... args) MINIFICPP_UTIL_DEDUCED_CONDITIONAL(
+    /* cond: */ !std::is_member_function_pointer<F>::value && !std::is_member_object_pointer<F>::value,
+    /* expr: */ std::forward<F>(f)(std::forward<Args>(args)...))
+
+}  // namespace detail
+
+template<typename F, typename... Args>
+auto invoke(F&& f, Args&&... args) MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(std::forward<F>(f), std::forward<Args>(args)...))
+#else
+using std::invoke;  // C++17 and newer: reuse the standard library's invoke instead of the pre-C++17 fallback defined in the #if branch
+#endif /* < C++17 */
+
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/utils/IntervalSwitch.h
similarity index 50%
copy from libminifi/include/utils/CallBackTimer.h
copy to libminifi/include/utils/IntervalSwitch.h
index faf25b3..0052b4e 100644
--- a/libminifi/include/utils/CallBackTimer.h
+++ b/libminifi/include/utils/IntervalSwitch.h
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,14 +16,12 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-#define LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
+#pragma once
 
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <chrono>
 #include <functional>
+#include <utility>
+
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -30,27 +29,42 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-class CallBackTimer {
- public:
-  CallBackTimer(std::chrono::milliseconds interval, const std::function<void(void)>& func);
-
-  ~CallBackTimer();
+enum class IntervalSwitchState {
+  LOWER,
+  UPPER,
+};
 
-  void stop();
+namespace detail {
+struct SwitchReturn {
+  IntervalSwitchState state;
+  bool switched;
+};
+}  // namespace detail
 
-  void start();
+template<typename T, typename Comp = std::less<T>>
+class IntervalSwitch {
+ public:
+  IntervalSwitch(T lower_threshold, T upper_threshold, const IntervalSwitchState initial_state = IntervalSwitchState::UPPER)
+      :lower_threshold_{std::move(lower_threshold)}, upper_threshold_{std::move(upper_threshold)}, state_{initial_state} {
+    gsl_Expects(!less_(upper_threshold_, lower_threshold_));
+  }
 
-  bool is_running() const;
+  detail::SwitchReturn operator()(const T& value) {
+    const auto old_state = state_;
+    if (less_(value, lower_threshold_)) {
+      state_ = IntervalSwitchState::LOWER;
+    } else if (!less_(value, upper_threshold_)) {
+      state_ = IntervalSwitchState::UPPER;
+    }
+    return {state_, state_ != old_state};
+  }
 
  private:
-  bool execute_;
-  std::function<void(void)> func_;
-  std::thread thd_;
-  mutable std::mutex mtx_;
-  mutable std::mutex cv_mtx_;
-  std::condition_variable cv_;
-
-  const std::chrono::milliseconds interval_;
+  T lower_threshold_;
+  T upper_threshold_;
+  Comp less_;
+
+  IntervalSwitchState state_;
 };
 
 }  // namespace utils
@@ -58,6 +72,3 @@ class CallBackTimer {
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_
-
diff --git a/libminifi/include/utils/OptionalUtils.h b/libminifi/include/utils/OptionalUtils.h
index 3810238..905128c 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -18,9 +18,11 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
 #define LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
 
+#include <type_traits>
 #include <utility>
 
 #include <nonstd/optional.hpp>
+#include "utils/GeneralUtils.h"
 #include "utils/gsl.h"
 
 namespace org {
@@ -37,6 +39,54 @@ optional<typename gsl_lite::remove_cvref<T>::type> optional_from_ptr(T&& obj) {
   return obj == nullptr ? nullopt : optional<typename gsl_lite::remove_cvref<T>::type>{ std::forward<T>(obj) };
 }
 
+template<typename>
+struct is_optional : std::false_type {};
+
+template<typename T>
+struct is_optional<optional<T>> : std::true_type {};
+
+namespace detail {
+template<typename T>
+struct map_wrapper {
+  T function;
+};
+
+// map implementation
+template<typename SourceType, typename F>
+auto operator|(const optional<SourceType>& o, map_wrapper<F> f) noexcept(noexcept(utils::invoke(std::forward<F>(f.function), *o)))
+    -> optional<typename std::decay<decltype(utils::invoke(std::forward<F>(f.function), *o))>::type> {
+  if (o.has_value()) {
+    return make_optional(utils::invoke(std::forward<F>(f.function), *o));
+  } else {
+    return nullopt;
+  }
+}
+
+template<typename T>
+struct flat_map_wrapper {
+  T function;
+};
+
+// flatMap implementation
+template<typename SourceType, typename F>
+auto operator|(const optional<SourceType>& o, flat_map_wrapper<F> f) noexcept(noexcept(utils::invoke(std::forward<F>(f.function), *o)))
+    -> optional<typename std::decay<decltype(*utils::invoke(std::forward<F>(f.function), *o))>::type> {
+  static_assert(is_optional<decltype(utils::invoke(std::forward<F>(f.function), *o))>::value, "flatMap expects a function returning optional");
+  if (o.has_value()) {
+    return utils::invoke(std::forward<F>(f.function), *o);
+  } else {
+    return nullopt;
+  }
+}
+
+}  // namespace detail
+
+template<typename T>
+detail::map_wrapper<T&&> map(T&& func) noexcept { return {std::forward<T>(func)}; }
+
+template<typename T>
+detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return {std::forward<T>(func)}; }
+
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 7c90d43..be022eb 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -93,478 +93,479 @@ namespace minifi {
 namespace utils {
 namespace file {
 
-/**
- * Simple implementation of some file system utilities.
- *
- */
-class FileUtils {
- private:
-  static inline int platform_create_dir(const std::string &path) {
-#ifdef WIN32
-    return _mkdir(path.c_str());
-#else
-    return mkdir(path.c_str(), 0700);
-#endif
-  }
+namespace FileUtils = ::org::apache::nifi::minifi::utils::file;
 
- public:
-  FileUtils() = delete;
-
-  /*
-   * Get the platform-specific path separator.
-   * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
-   * @return the path separator character
-   */
-  static char get_separator(bool force_posix = false) {
+namespace detail {
+static inline int platform_create_dir(const std::string& path) {
 #ifdef WIN32
-    if (force_posix) {
-      return '/';
-    } else {
-      return '\\';
-    }
+  return _mkdir(path.c_str());
 #else
-    return '/';
+  return mkdir(path.c_str(), 0700);
 #endif
-  }
+}
+}  // namespace detail
 
-  static std::string create_temp_directory(char* format) {
+/*
+ * Get the platform-specific path separator.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
+ */
+inline char get_separator(bool force_posix = false) {
 #ifdef WIN32
-    const std::string tempDirectory = concat_path(get_temp_directory(),
-      minifi::utils::IdGenerator::getIdGenerator()->generate().to_string());
-    create_dir(tempDirectory);
-    return tempDirectory;
-#else
-    if (mkdtemp(format) == nullptr) { return ""; }
-    return format;
-#endif
+  if (!force_posix) {
+    return '\\';
   }
-
-  static std::string get_temp_directory() {
+#endif
+  return '/';
+}
+
+inline std::string normalize_path_separators(std::string path, bool force_posix = false) {
+  // Rewrite every '\\' and '/' of the by-value copy with the separator selected by get_separator(force_posix).
+  for (char& current : path) {
+    const bool is_separator = (current == '/' || current == '\\');
+    if (is_separator) { current = get_separator(force_posix); }
+  }
+  return path;
+}
+
+inline std::string get_temp_directory() {
 #ifdef WIN32
-    char tempBuffer[MAX_PATH];
-    const auto ret = GetTempPath(MAX_PATH, tempBuffer);
-    if (ret > MAX_PATH || ret == 0)
-      throw std::runtime_error("Couldn't locate temporary directory path");
-    return tempBuffer;
+  char tempBuffer[MAX_PATH];
+  const auto ret = GetTempPath(MAX_PATH, tempBuffer);
+  if (ret > MAX_PATH || ret == 0)
+    throw std::runtime_error("Couldn't locate temporary directory path");
+  return tempBuffer;
 #else
-    return "/tmp";
+  return "/tmp";
 #endif
-  }
+}
 
-  static int64_t delete_dir(const std::string &path, bool delete_files_recursively = true) {
+inline int64_t delete_dir(const std::string &path, bool delete_files_recursively = true) {
 #ifdef BOOST_VERSION
-    try {
-      if (boost::filesystem::exists(path)) {
-        if (delete_files_recursively) {
-          boost::filesystem::remove_all(path);
-        } else {
-          boost::filesystem::remove(path);
-        }
+  try {
+    if (boost::filesystem::exists(path)) {
+      if (delete_files_recursively) {
+        boost::filesystem::remove_all(path);
+      } else {
+        boost::filesystem::remove(path);
       }
-    } catch(boost::filesystem::filesystem_error const & e) {
-      return -1;
-      // display error message
     }
-    return 0;
+  } catch(boost::filesystem::filesystem_error const & e) {
+    return -1;
+    // display error message
+  }
+  return 0;
 #elif defined(WIN32)
-    WIN32_FIND_DATA FindFileData;
-    HANDLE hFind;
-    DWORD Attributes;
-    std::string str;
-
-
-    std::stringstream pathstr;
-    pathstr << path << "\\*";
-    str = pathstr.str();
-    // List files
-    hFind = FindFirstFile(str.c_str(), &FindFileData);
-    if (hFind != INVALID_HANDLE_VALUE) {
-      do {
-        if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
-          std::stringstream strs;
-          strs << path << "\\" << FindFileData.cFileName;
-          str = strs.str();
-
-          Attributes = GetFileAttributes(str.c_str());
-          if (Attributes & FILE_ATTRIBUTE_DIRECTORY) {
-            // is directory
-            delete_dir(str, delete_files_recursively);
-          } else {
-            remove(str.c_str());
-            // not directory
-          }
+  WIN32_FIND_DATA FindFileData;
+  HANDLE hFind;
+  DWORD Attributes;
+  std::string str;
+
+
+  std::stringstream pathstr;
+  pathstr << path << "\\*";
+  str = pathstr.str();
+  // List files
+  hFind = FindFirstFile(str.c_str(), &FindFileData);
+  if (hFind != INVALID_HANDLE_VALUE) {
+    do {
+      if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
+        std::stringstream strs;
+        strs << path << "\\" << FindFileData.cFileName;
+        str = strs.str();
+
+        Attributes = GetFileAttributes(str.c_str());
+        if (Attributes & FILE_ATTRIBUTE_DIRECTORY) {
+          // is directory
+          delete_dir(str, delete_files_recursively);
+        } else {
+          remove(str.c_str());
+          // not directory
         }
-      }while (FindNextFile(hFind, &FindFileData));
-      FindClose(hFind);
+      }
+    }while (FindNextFile(hFind, &FindFileData));
+    FindClose(hFind);
 
-      RemoveDirectory(path.c_str());
-    }
-    return 0;
+    RemoveDirectory(path.c_str());
+  }
+  return 0;
 #else
-    DIR *current_directory = opendir(path.c_str());
-    int r = -1;
-    if (current_directory) {
-      struct dirent *p;
-      r = 0;
-      while (!r && (p = readdir(current_directory))) {
-        int r2 = -1;
-        std::stringstream newpath;
-        if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) {
-          continue;
-        }
-        struct stat statbuf;
+  DIR *current_directory = opendir(path.c_str());
+  int r = -1;
+  if (current_directory) {
+    struct dirent *p;
+    r = 0;
+    while (!r && (p = readdir(current_directory))) {
+      int r2 = -1;
+      std::stringstream newpath;
+      if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) {
+        continue;
+      }
+      struct stat statbuf;
 
-        newpath << path << "/" << p->d_name;
+      newpath << path << "/" << p->d_name;
 
-        if (!stat(newpath.str().c_str(), &statbuf)) {
-          if (S_ISDIR(statbuf.st_mode)) {
-            if (delete_files_recursively) {
-              r2 = delete_dir(newpath.str(), delete_files_recursively);
-            }
-          } else {
-            r2 = unlink(newpath.str().c_str());
+      if (!stat(newpath.str().c_str(), &statbuf)) {
+        if (S_ISDIR(statbuf.st_mode)) {
+          if (delete_files_recursively) {
+            r2 = delete_dir(newpath.str(), delete_files_recursively);
           }
+        } else {
+          r2 = unlink(newpath.str().c_str());
         }
-        r = r2;
       }
-      closedir(current_directory);
+      r = r2;
     }
+    closedir(current_directory);
+  }
 
-    if (!r) {
-      return rmdir(path.c_str());
-    }
+  if (!r) {
+    return rmdir(path.c_str());
+  }
 
-    return 0;
+  return 0;
 #endif
-  }
+}
 
-  static uint64_t last_write_time(const std::string &path) {
+/*
+ * Returns the modification time of path: st_mtime from stat()/_stat(), or boost::filesystem::last_write_time in Boost builds.
+ * The stat-based branches return 0 when the path cannot be stat'ed.
+ */
+inline uint64_t last_write_time(const std::string &path) {
 #ifdef BOOST_VERSION
-    return boost::filesystem::last_write_time(movedFile.str());
+  return boost::filesystem::last_write_time(path);
 #else
 #ifdef WIN32
-    struct _stat result;
-    if (_stat(path.c_str(), &result) == 0) {
-      return result.st_mtime;
-    }
+  struct _stat result;
+  if (_stat(path.c_str(), &result) == 0) { return result.st_mtime; }
 #else
-    struct stat result;
-    if (stat(path.c_str(), &result) == 0) {
-      return result.st_mtime;
-    }
+  struct stat result;
+  if (stat(path.c_str(), &result) == 0) { return result.st_mtime; }
 #endif
 #endif
-    return 0;
-  }
+  return 0;
+}
 
-  static std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> last_write_time_point(const std::string &path) {
-    return std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>{std::chrono::seconds{last_write_time(path)}};
-  }
+inline std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> last_write_time_point(const std::string &path) {
+  return std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>{std::chrono::seconds{last_write_time(path)}};
+}
 
-  static uint64_t file_size(const std::string &path) {
+inline uint64_t file_size(const std::string &path) {
 #ifdef WIN32
-    struct _stat result;
-    if (_stat(path.c_str(), &result) == 0) {
-      return result.st_size;
-    }
+  struct _stat result;
+  if (_stat(path.c_str(), &result) == 0) {
+    return result.st_size;
+  }
 #else
-    struct stat result;
-    if (stat(path.c_str(), &result) == 0) {
-      return result.st_size;
-    }
-#endif
-    return 0;
+  struct stat result;
+  if (stat(path.c_str(), &result) == 0) {
+    return result.st_size;
   }
+#endif
+  return 0;
+}
 
-  static bool set_last_write_time(const std::string &path, uint64_t write_time) {
+inline bool set_last_write_time(const std::string &path, uint64_t write_time) {
 #ifdef WIN32
-    struct __utimbuf64 utim;
-    utim.actime = write_time;
-    utim.modtime = write_time;
-    return _utime64(path.c_str(), &utim) == 0U;
+  struct __utimbuf64 utim;
+  utim.actime = write_time;
+  utim.modtime = write_time;
+  return _utime64(path.c_str(), &utim) == 0U;
 #else
-    struct utimbuf utim;
-    utim.actime = write_time;
-    utim.modtime = write_time;
-    return utime(path.c_str(), &utim) == 0U;
+  struct utimbuf utim;
+  utim.actime = write_time;
+  utim.modtime = write_time;
+  return utime(path.c_str(), &utim) == 0U;
 #endif
-  }
+}
 
 #ifndef WIN32
-  static bool get_permissions(const std::string &path, uint32_t &permissions) {
-    struct stat result;
-    if (stat(path.c_str(), &result) == 0) {
-      permissions = result.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
-      return true;
-    }
-    return false;
+inline bool get_permissions(const std::string &path, uint32_t &permissions) {
+  struct stat result;
+  if (stat(path.c_str(), &result) == 0) {
+    permissions = result.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
+    return true;
   }
+  return false;
+}
 #endif
 
 #ifndef WIN32
-  static bool get_uid_gid(const std::string &path, uint64_t &uid, uint64_t &gid) {
-    struct stat result;
-    if (stat(path.c_str(), &result) == 0) {
-      uid = result.st_uid;
-      gid = result.st_gid;
-      return true;
-    }
-    return false;
+inline bool get_uid_gid(const std::string &path, uint64_t &uid, uint64_t &gid) {
+  struct stat result;
+  if (stat(path.c_str(), &result) == 0) {
+    uid = result.st_uid;
+    gid = result.st_gid;
+    return true;
   }
+  return false;
+}
 #endif
 
-  static int is_directory(const char * path) {
-      struct stat dir_stat;
-      if (stat(path, &dir_stat) < 0) {
-          return 0;
-      }
-      return S_ISDIR(dir_stat.st_mode);
-  }
+inline int is_directory(const char * path) {
+    struct stat dir_stat;
+    if (stat(path, &dir_stat) < 0) {
+        return 0;
+    }
+    return S_ISDIR(dir_stat.st_mode);
+}
 
-  static int create_dir(const std::string& path, bool recursive = true) {
+inline int create_dir(const std::string& path, bool recursive = true) {
 #ifdef BOOST_VERSION
-    boost::filesystem::path dir(path);
-    if (boost::filesystem::create_directory(dir)) {
-      return 0;
-    } else {
-      return -1;
-    }
+  boost::filesystem::path dir(path);
+  if (boost::filesystem::create_directory(dir)) {
+    return 0;
+  } else {
+    return -1;
+  }
 #else
-    if (!recursive) {
-        if (platform_create_dir(path) != 0 && errno != EEXIST) {
-            return -1;
-        }
-        return 0;
-    }
-    if (platform_create_dir(path) == 0) {
-        return 0;
-    }
+  if (!recursive) {
+      if (detail::platform_create_dir(path) != 0 && errno != EEXIST) {
+          return -1;
+      }
+      return 0;
+  }
+  if (detail::platform_create_dir(path) == 0) {
+      return 0;
+  }
 
-    switch (errno) {
-    case ENOENT: {
-        size_t found = path.find_last_of(get_separator());
+  switch (errno) {
+  case ENOENT: {
+      size_t found = path.find_last_of(get_separator());
 
-        if (found == std::string::npos) {
-            return -1;
-        }
+      if (found == std::string::npos) {
+          return -1;
+      }
 
-        const std::string dir = path.substr(0, found);
-        int res = create_dir(dir);
-        if (res < 0) {
-            return -1;
-        }
-        return platform_create_dir(path);
-    }
-    case EEXIST: {
-        if (is_directory(path.c_str())) {
-            return 0;
-        }
-        return -1;
-    }
-    default:
-        return -1;
-    }
-    return -1;
-#endif
+      const std::string dir = path.substr(0, found);
+      int res = create_dir(dir);
+      if (res < 0) {
+          return -1;
+      }
+      return detail::platform_create_dir(path);
   }
-
-  static int copy_file(const std::string &path_from, const std::string dest_path) {
-    std::ifstream src(path_from, std::ios::binary);
-    if (!src.is_open())
+  case EEXIST: {
+      if (is_directory(path.c_str())) {
+          return 0;
+      }
       return -1;
-    std::ofstream dest(dest_path, std::ios::binary);
-    dest << src.rdbuf();
-    return 0;
   }
+  default:
+      return -1;
+  }
+  return -1;
+#endif
+}
 
-  static void addFilesMatchingExtension(const std::shared_ptr<logging::Logger> &logger, const std::string &originalPath, const std::string &extension, std::vector<std::string> &accruedFiles) {
+inline int copy_file(const std::string &path_from, const std::string dest_path) {
+  std::ifstream src(path_from, std::ios::binary);
+  if (!src.is_open()) { return -1; }  // source missing or unreadable; the destination is left untouched
+  std::ofstream dest(dest_path, std::ios::binary);
+  if (!dest.is_open()) { return -1; }  // destination could not be opened: report it rather than claim success
+  dest << src.rdbuf();  // streams the whole source; write errors after this point are not reported
+  return 0;
+}
+
+// Appends to accruedFiles the paths under originalPath that end with extension; on POSIX originalPath may also be a regular file
+// and subdirectories are descended into. A failed stat on an entry is logged and ends the listing of that directory.
+inline void addFilesMatchingExtension(const std::shared_ptr<logging::Logger> &logger, const std::string &originalPath, const std::string &extension, std::vector<std::string> &accruedFiles) {
 #ifndef WIN32
 
-    struct stat s;
-    if (stat(originalPath.c_str(), &s) == 0) {
-      if (s.st_mode & S_IFDIR) {
-        DIR *d;
-        d = opendir(originalPath.c_str());
-        if (!d) {
+  struct stat s;
+  if (stat(originalPath.c_str(), &s) == 0) {
+    if (s.st_mode & S_IFDIR) {
+      DIR *d = opendir(originalPath.c_str());
+      if (!d) {
+        return;
+      }
+      // the directory could be opened: walk its entries
+      logger->log_debug("Performing file listing on %s", originalPath);
+
+      struct dirent *entry = readdir(d);
+      while (entry != nullptr) {
+        std::string d_name = entry->d_name;
+        std::string path = originalPath + "/" + d_name;
+        struct stat statbuf { };
+        if (stat(path.c_str(), &statbuf) != 0) {
+          logger->log_warn("Failed to stat %s", path);
-            return;
+          break;  // leave through closedir(d) below so the directory handle is not leaked
         }
-        // only perform a listing while we are not empty
-        logger->log_debug("Performing file listing on %s", originalPath);
-
-        struct dirent *entry;
-        entry = readdir(d);
-        while (entry != nullptr) {
-          std::string d_name = entry->d_name;
-          std::string path = originalPath + "/" + d_name;
-          struct stat statbuf { };
-          if (stat(path.c_str(), &statbuf) != 0) {
-            logger->log_warn("Failed to stat %s", path);
-            return;
+        if (S_ISDIR(statbuf.st_mode)) {
+          // recurse into real subdirectories only, never into "." or ".."
+          if (d_name != ".." && d_name != ".") {
+            addFilesMatchingExtension(logger, path, extension, accruedFiles);
           }
-          if (S_ISDIR(statbuf.st_mode)) {
-            // if this is a directory
-            if (d_name != ".." && d_name != ".") {
-              addFilesMatchingExtension(logger, path, extension, accruedFiles);
-            }
-          } else {
-            if (utils::StringUtils::endsWith(path, extension)) {
-              logger->log_info("Adding %s to paths", path);
-              accruedFiles.push_back(path);
-            }
+        } else {
+          if (utils::StringUtils::endsWith(path, extension)) {
+            logger->log_info("Adding %s to paths", path);
+            accruedFiles.push_back(path);
           }
-          entry = readdir(d);
-        }
-        closedir(d);
-      } else if (s.st_mode & S_IFREG) {
-        if (utils::StringUtils::endsWith(originalPath, extension)) {
-          logger->log_info("Adding %s to paths", originalPath);
-          accruedFiles.push_back(originalPath);
         }
-      } else {
-        logger->log_error("Could not stat", originalPath);
+        entry = readdir(d);
+      }
+      closedir(d);
+    } else if (s.st_mode & S_IFREG) {
+      if (utils::StringUtils::endsWith(originalPath, extension)) {
+        logger->log_info("Adding %s to paths", originalPath);
+        accruedFiles.push_back(originalPath);
       }
-
     } else {
-      logger->log_error("Could not access %s", originalPath);
+      logger->log_error("Could not stat %s", originalPath);
     }
+
+  } else {
+    logger->log_error("Could not access %s", originalPath);
+  }
 #else
-    HANDLE hFind;
-    WIN32_FIND_DATA FindFileData;
+  HANDLE hFind;
+  WIN32_FIND_DATA FindFileData;
 
-    std::string pathToSearch = originalPath + "\\*" + extension;
-    if ((hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
-      do {
-        struct _stat statbuf {};
+  std::string pathToSearch = originalPath + "\\*" + extension;
+  if ((hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
+    do {
+      struct _stat statbuf {};
 
-        std::string path = originalPath + "\\" + FindFileData.cFileName;
-        logger->log_info("Adding %s to paths", path);
-        if (_stat(path.c_str(), &statbuf) != 0) {
-          logger->log_warn("Failed to stat %s", path);
-          break;
-        }
-        logger->log_info("Adding %s to paths", path);
-        if (S_ISDIR(statbuf.st_mode)) {
-          addFilesMatchingExtension(logger, path, extension, accruedFiles);
-        } else {
-          if (utils::StringUtils::endsWith(path, extension)) {
-            logger->log_info("Adding %s to paths", path);
-            accruedFiles.push_back(path);
-          }
+      std::string path = originalPath + "\\" + FindFileData.cFileName;
+      logger->log_info("Adding %s to paths", path);
+      if (_stat(path.c_str(), &statbuf) != 0) {
+        logger->log_warn("Failed to stat %s", path);
+        break;
+      }
+      logger->log_info("Adding %s to paths", path);
+      if (S_ISDIR(statbuf.st_mode)) {
+        addFilesMatchingExtension(logger, path, extension, accruedFiles);
+      } else {
+        if (utils::StringUtils::endsWith(path, extension)) {
+          logger->log_info("Adding %s to paths", path);
+          accruedFiles.push_back(path);
         }
-      }while (FindNextFileA(hFind, &FindFileData));
-      FindClose(hFind);
-    }
+      }
+    }while (FindNextFileA(hFind, &FindFileData));
+    FindClose(hFind);
+  }
 #endif
+}
+
+/*
+ * Provides a platform-independent function to list a directory
+ * Callback is called for every file found: first argument is the path of the directory, second is the filename
+ * Return value of the callback is used to continue (true) or stop (false) listing
+ */
+inline void list_dir(const std::string& dir, std::function<bool(const std::string&, const std::string&)> callback,
+                     const std::shared_ptr<logging::Logger> &logger, bool recursive = true) {
+  logger->log_debug("Performing file listing against %s", dir);
+#ifndef WIN32
+  DIR *d = opendir(dir.c_str());
+  if (!d) {
+    logger->log_warn("Failed to open directory: %s", dir.c_str());
+    return;
   }
 
-  static std::vector<std::pair<std::string, std::string>> list_dir_all(const std::string& dir, const std::shared_ptr<logging::Logger> &logger,
-                                                                       bool recursive = true)  {
-    std::vector<std::pair<std::string, std::string>> fileList;
-    auto lambda = [&fileList] (const std::string &path, const std::string &filename) {
-      fileList.push_back(make_pair(path, filename));
-      return true;
-    };
+  struct dirent *entry;
+  while ((entry = readdir(d)) != NULL) {
+    std::string d_name = entry->d_name;
+    std::string path = dir + get_separator() + d_name;
 
-    list_dir(dir, lambda, logger, recursive);
+    struct stat statbuf;
+    if (stat(path.c_str(), &statbuf) != 0) {
+      logger->log_warn("Failed to stat %s", path);
+      continue;
+    }
 
-    return fileList;
+    if (S_ISDIR(statbuf.st_mode)) {
+      // if this is a directory
+      if (recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
+        list_dir(path, callback, logger, recursive);
+      }
+    } else {
+      if (!callback(dir, d_name)) {
+        break;
+      }
+    }
   }
+  closedir(d);
+#else
+  HANDLE hFind;
+  WIN32_FIND_DATA FindFileData;
 
-  /*
-   * Provides a platform-independent function to list a directory
-   * Callback is called for every file found: first argument is the path of the directory, second is the filename
-   * Return value of the callback is used to continue (true) or stop (false) listing
-   */
-  static void list_dir(const std::string& dir, std::function<bool(const std::string&, const std::string&)> callback,
-                       const std::shared_ptr<logging::Logger> &logger, bool recursive = true) {
-    logger->log_debug("Performing file listing against %s", dir);
-#ifndef WIN32
-    DIR *d = opendir(dir.c_str());
-    if (!d) {
-      logger->log_warn("Failed to open directory: %s", dir.c_str());
-      return;
-    }
+  std::string pathToSearch = dir + "\\*.*";
+  hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData);
 
-    struct dirent *entry;
-    while ((entry = readdir(d)) != NULL) {
-      std::string d_name = entry->d_name;
-      std::string path = dir + get_separator() + d_name;
+  if (hFind == INVALID_HANDLE_VALUE) {
+    logger->log_warn("Failed to open directory: %s", dir.c_str());
+    return;
+  }
 
-      struct stat statbuf;
-      if (stat(path.c_str(), &statbuf) != 0) {
+  do {
+    struct _stat statbuf {};
+    if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
+      std::string path = dir + get_separator() + FindFileData.cFileName;
+      if (_stat(path.c_str(), &statbuf) != 0) {
         logger->log_warn("Failed to stat %s", path);
         continue;
       }
-
       if (S_ISDIR(statbuf.st_mode)) {
-        // if this is a directory
-        if (recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
+        if (recursive) {
           list_dir(path, callback, logger, recursive);
         }
       } else {
-        if (!callback(dir, d_name)) {
+        if (!callback(dir, FindFileData.cFileName)) {
           break;
         }
       }
     }
-    closedir(d);
-#else
-    HANDLE hFind;
-    WIN32_FIND_DATA FindFileData;
+  } while (FindNextFileA(hFind, &FindFileData));
+  FindClose(hFind);
+#endif
+}
 
-    std::string pathToSearch = dir + "\\*.*";
-    hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData);
+inline std::vector<std::pair<std::string, std::string>> list_dir_all(const std::string& dir, const std::shared_ptr<logging::Logger> &logger,
+    bool recursive = true)  {
+  std::vector<std::pair<std::string, std::string>> fileList;
+  auto lambda = [&fileList] (const std::string &path, const std::string &filename) {
+    fileList.push_back(make_pair(path, filename));
+    return true;
+  };
 
-    if (hFind == INVALID_HANDLE_VALUE) {
-      logger->log_warn("Failed to open directory: %s", dir.c_str());
-      return;
-    }
+  list_dir(dir, lambda, logger, recursive);
 
-    do {
-      struct _stat statbuf {};
-      if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) {
-        std::string path = dir + get_separator() + FindFileData.cFileName;
-        if (_stat(path.c_str(), &statbuf) != 0) {
-          logger->log_warn("Failed to stat %s", path);
-          continue;
-        }
-        if (S_ISDIR(statbuf.st_mode)) {
-          if (recursive) {
-            list_dir(path, callback, logger, recursive);
-          }
-        } else {
-          if (!callback(dir, FindFileData.cFileName)) {
-            break;
-          }
-        }
-      }
-    } while (FindNextFileA(hFind, &FindFileData));
-    FindClose(hFind);
-#endif
-  }
+  return fileList;
+}
 
-  static std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
-    if (root.empty()) {
-      return child;
-    }
-    std::stringstream new_path;
-    if (root.back() == get_separator(force_posix)) {
-      new_path << root << child;
-    } else {
-      new_path << root << get_separator(force_posix) << child;
-    }
-    return new_path.str();
+inline std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
+  if (root.empty()) {
+    return child;
+  }
+  std::stringstream new_path;
+  if (root.back() == get_separator(force_posix)) {
+    new_path << root << child;
+  } else {
+    new_path << root << get_separator(force_posix) << child;
   }
+  return new_path.str();
+}
 
-  static std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
-    if (path.empty()) {
-      /* Empty path has no parent and no child*/
-      return std::make_tuple("", "");
-    }
-    bool absolute = false;
-    size_t root_pos = 0U;
+inline std::string create_temp_directory(char* format) {
 #ifdef WIN32
-    if (!force_posix) {
+  const std::string tempDirectory = concat_path(get_temp_directory(),
+      minifi::utils::IdGenerator::getIdGenerator()->generate().to_string());
+  create_dir(tempDirectory);
+  return tempDirectory;
+#else
+  if (mkdtemp(format) == nullptr) { return ""; }
+  return format;
+#endif
+}
+
+inline std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
+  if (path.empty()) {
+    /* Empty path has no parent and no child*/
+    return std::make_tuple("", "");
+  }
+  bool absolute = false;
+  size_t root_pos = 0U;
+#ifdef WIN32
+  if (!force_posix) {
       if (path[0] == '\\') {
         absolute = true;
         if (path.size() < 2U) {
@@ -595,78 +596,78 @@ class FileUtils {
       }
     } else {
 #else
-    if (true) {
+  if (true) {
 #endif
-      if (path[0] == '/') {
-        absolute = true;
-        root_pos = 0U;
-      }
-    }
-    /* Maybe we are just a single relative child */
-    if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
-      return std::make_tuple("", path);
-    }
-    /* Ignore trailing separators */
-    size_t last_pos = path.size() - 1;
-    while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
-      last_pos--;
+    if (path[0] == '/') {
+      absolute = true;
+      root_pos = 0U;
     }
-    if (absolute && last_pos == root_pos) {
-      /* This means we are only a root */
-      return std::make_tuple("", "");
-    }
-    /* Find parent-child separator */
-    size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
-    if (last_separator == std::string::npos || last_separator < root_pos) {
-      return std::make_tuple("", "");
-    }
-    std::string parent = path.substr(0, last_separator + 1);
-    std::string child = path.substr(last_separator + 1);
-
-    return std::make_tuple(std::move(parent), std::move(child));
   }
-
-  static std::string get_parent_path(const std::string& path, bool force_posix = false) {
-    std::string parent_path;
-    std::tie(parent_path, std::ignore) = split_path(path, force_posix);
-    return parent_path;
+  /* Maybe we are just a single relative child */
+  if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
+    return std::make_tuple("", path);
   }
-
-  static std::string get_child_path(const std::string& path, bool force_posix = false) {
-    std::string child_path;
-    std::tie(std::ignore, child_path) = split_path(path, force_posix);
-    return child_path;
+  /* Ignore trailing separators */
+  size_t last_pos = path.size() - 1;
+  while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
+    last_pos--;
+  }
+  if (absolute && last_pos == root_pos) {
+    /* This means we are only a root */
+    return std::make_tuple("", "");
+  }
+  /* Find parent-child separator */
+  size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
+  if (last_separator == std::string::npos || last_separator < root_pos) {
+    return std::make_tuple("", "");
   }
+  std::string parent = path.substr(0, last_separator + 1);
+  std::string child = path.substr(last_separator + 1);
+
+  return std::make_tuple(std::move(parent), std::move(child));
+}
+
+inline std::string get_parent_path(const std::string& path, bool force_posix = false) {
+  std::string parent_path;
+  std::tie(parent_path, std::ignore) = split_path(path, force_posix);
+  return parent_path;
+}
+
+inline std::string get_child_path(const std::string& path, bool force_posix = false) {
+  std::string child_path;
+  std::tie(std::ignore, child_path) = split_path(path, force_posix);
+  return child_path;
+}
 
-  static bool is_hidden(const std::string& path) {
+inline bool is_hidden(const std::string& path) {
 #ifdef WIN32
-    DWORD attributes = GetFileAttributesA(path.c_str());
+  DWORD attributes = GetFileAttributesA(path.c_str());
     return ((attributes != INVALID_FILE_ATTRIBUTES)  && ((attributes & FILE_ATTRIBUTE_HIDDEN) != 0));
 #else
-    return std::get<1>(split_path(path)).rfind(".", 0) == 0;
+  return std::get<1>(split_path(path)).rfind(".", 0) == 0;
 #endif
-  }
+}
 
-  /*
-   * Returns the absolute path of the current executable
-   */
-  static std::string get_executable_path() {
+/*
+ * Returns the absolute path of the current executable
+ */
+inline std::string get_executable_path() {
 #if defined(__linux__)
-    std::vector<char> buf(1024U);
-    while (true) {
-      ssize_t ret = readlink("/proc/self/exe", buf.data(), buf.size());
-      if (ret < 0) {
-        return "";
-      }
-      if (static_cast<size_t>(ret) == buf.size()) {
-        /* It may have been truncated */
-        buf.resize(buf.size() * 2);
-        continue;
-      }
-      return std::string(buf.data(), ret);
+  std::vector<char> buf(1024U);
+  while (true) {
+    ssize_t ret = readlink("/proc/self/exe", buf.data(), buf.size());
+    if (ret < 0) {
+      return "";
     }
+    if (static_cast<size_t>(ret) == buf.size()) {
+      /* It may have been truncated */
+      buf.resize(buf.size() * 2);
+      continue;
+    }
+    return std::string(buf.data(), ret);
+  }
 #elif defined(__APPLE__)
-    std::vector<char> buf(PATH_MAX);
+  std::vector<char> buf(PATH_MAX);
     uint32_t buf_size = buf.size();
     while (_NSGetExecutablePath(buf.data(), &buf_size) != 0) {
       buf.resize(buf_size);
@@ -697,37 +698,37 @@ class FileUtils {
 #else
     return "";
 #endif
-  }
+}
 
-  /*
-   * Returns the absolute path to the directory containing the current executable
-   */
-  static std::string get_executable_dir() {
-    auto executable_path = get_executable_path();
-    if (executable_path.empty()) {
-      return "";
-    }
-    return get_parent_path(executable_path);
+/*
+ * Returns the absolute path to the directory containing the current executable
+ */
+inline std::string get_executable_dir() {
+  auto executable_path = get_executable_path();
+  if (executable_path.empty()) {
+    return "";
   }
+  return get_parent_path(executable_path);
+}
 
-  static int close(int file_descriptor) {
+inline int close(int file_descriptor) {
 #ifdef WIN32
-    return _close(file_descriptor);
+  return _close(file_descriptor);
 #else
-    return ::close(file_descriptor);
+  return ::close(file_descriptor);
 #endif
-  }
+}
 
-  static int access(const char *path_name, int mode) {
+inline int access(const char *path_name, int mode) {
 #ifdef WIN32
-    return _access(path_name, mode);
+  return _access(path_name, mode);
 #else
-    return ::access(path_name, mode);
+  return ::access(path_name, mode);
 #endif
-  }
+}
 
 #ifdef WIN32
-  static std::error_code hide_file(const char* const file_name) {
+inline std::error_code hide_file(const char* const file_name) {
     const bool success = SetFileAttributesA(file_name, FILE_ATTRIBUTE_HIDDEN);
     if (!success) {
       // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing.
@@ -738,8 +739,7 @@ class FileUtils {
   }
 #endif /* WIN32 */
 
-  static uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
-}; // NOLINT
+uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
 
 }  // namespace file
 }  // namespace utils
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h
index 22fafd6..00db039 100644
--- a/libminifi/include/utils/file/PathUtils.h
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -17,7 +17,12 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
 #define LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
 
+#include <cctype>
+#include <cinttypes>
+#include <memory>
 #include <string>
+#include <system_error>
+#include <utility>
 
 namespace org {
 namespace apache {
@@ -25,7 +30,9 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 namespace file {
-namespace PathUtils {
+
+namespace PathUtils = ::org::apache::nifi::minifi::utils::file;
+using path = const char*;
 
 /**
  * Extracts the filename and path performing some validation of the path and output to ensure
@@ -47,7 +54,54 @@ std::string getFullPath(const std::string& path);
 
 std::string globToRegex(std::string glob);
 
-}  // namespace PathUtils
+// True for a non-null path that starts with "<letter>:\" or "<letter>:/" (when _WIN32 is defined) or with '/' (otherwise).
+inline bool isAbsolutePath(const char* const path) noexcept {
+#ifdef _WIN32  // std::isalpha is undefined for negative values other than EOF, hence the cast through unsigned char
+  return path && std::isalpha(static_cast<unsigned char>(path[0])) && path[1] == ':' && (path[2] == '\\' || path[2] == '/');
+#else
+  return path && path[0] == '/';
+#endif
+}
+
+/**
+ * Represents filesystem space information in bytes; space() stores uintmax_t(-1) in every field when the query fails
+ */
+struct space_info {
+  std::uintmax_t capacity;  // total size (statvfs f_blocks * f_frsize / total number of bytes on Windows)
+  std::uintmax_t free;  // total free bytes (statvfs f_bfree * f_frsize / total number of free bytes on Windows)
+  std::uintmax_t available;  // free bytes usable by the caller (statvfs f_bavail * f_frsize / free bytes available to caller)
+
+  friend bool operator==(const space_info& a, const space_info& b) noexcept {  // memberwise comparison
+    return a.capacity == b.capacity && a.free == b.free && a.available == b.available;
+  }
+};
+
+// Mirrors the interface of std::filesystem::filesystem_error (what/path1/path2); null path arguments are stored as empty strings.
+class filesystem_error : public std::system_error {
+ public:
+  filesystem_error(const std::string& what_arg, const std::error_code ec)
+      :std::system_error{ec, what_arg} {}
+  filesystem_error(const std::string& what_arg, const path path1, const path path2, const std::error_code ec)
+      :std::system_error{ec, what_arg}, paths_involved_{std::make_shared<const std::pair<std::string, std::string>>(path1 ? path1 : "", path2 ? path2 : "")} {}
+
+  // copy should be noexcept as soon as libstdc++ fixes std::system_error copy
+  filesystem_error(const filesystem_error& o) = default;
+  filesystem_error& operator=(const filesystem_error&) = default;
+
+  // paths_involved_ is null after the two-argument constructor: return an empty string instead of dereferencing it
+  path path1() const noexcept { return paths_involved_ ? paths_involved_->first.c_str() : ""; }
+  path path2() const noexcept { return paths_involved_ ? paths_involved_->second.c_str() : ""; }
+
+ private:
+  std::shared_ptr<const std::pair<std::string, std::string>> paths_involved_;
+};
+
+/**
+ * Provides filesystem space information for the specified directory
+ */
+space_info space(path);
+space_info space(path, std::error_code&) noexcept;
+
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 467524b..956ca4a 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -22,51 +22,55 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-const char *Configure::nifi_default_directory = "nifi.default.directory";
-const char *Configure::nifi_c2_enable = "nifi.c2.enable";
-const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file";
-const char *Configure::nifi_flow_configuration_file_exit_failure = "nifi.flow.configuration.file.exit.onfailure";
-const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
-const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
-const char *Configure::nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
-const char *Configure::nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
-const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
-const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
-const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
-const char *Configure::nifi_flowcontroller_drain_timeout = "nifi.flowcontroller.drain.timeout";
-const char *Configure::nifi_log_level = "nifi.log.level";
-const char *Configure::nifi_server_name = "nifi.server.name";
-const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name";
-const char *Configure::nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name";
-const char *Configure::nifi_content_repository_class_name = "nifi.content.repository.class.name";
-const char *Configure::nifi_volatile_repository_options = "nifi.volatile.repository.options.";
-const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
-const char *Configure::nifi_server_port = "nifi.server.port";
-const char *Configure::nifi_server_report_interval = "nifi.server.report.interval";
-const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size";
-const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
-const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
-const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size";
-const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time";
-const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
-const char *Configure::nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
-const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_remote_input_http = "nifi.remote.input.http.enabled";
-const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
-const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
-const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
-const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";
-const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
-const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
-const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
-const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
-const char *Configure::nifi_state_management_provider_local = "nifi.state.management.provider.local";
-const char *Configure::nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
-const char *Configure::nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
+constexpr const char *Configure::nifi_default_directory;  // this list: initializer-less definitions of constexpr static members (values come from the class declaration)
+constexpr const char *Configure::nifi_c2_enable;
+constexpr const char *Configure::nifi_flow_configuration_file;
+constexpr const char *Configure::nifi_flow_configuration_file_exit_failure;
+constexpr const char *Configure::nifi_flow_configuration_file_backup_update;
+constexpr const char *Configure::nifi_flow_engine_threads;
+constexpr const char *Configure::nifi_flow_engine_alert_period;
+constexpr const char *Configure::nifi_flow_engine_event_driven_time_slice;
+constexpr const char *Configure::nifi_administrative_yield_duration;
+constexpr const char *Configure::nifi_bored_yield_duration;
+constexpr const char *Configure::nifi_graceful_shutdown_seconds;
+constexpr const char *Configure::nifi_flowcontroller_drain_timeout;
+constexpr const char *Configure::nifi_log_level;
+constexpr const char *Configure::nifi_server_name;
+constexpr const char *Configure::nifi_configuration_class_name;
+constexpr const char *Configure::nifi_flow_repository_class_name;
+constexpr const char *Configure::nifi_content_repository_class_name;
+constexpr const char *Configure::nifi_volatile_repository_options;
+constexpr const char *Configure::nifi_provenance_repository_class_name;
+constexpr const char *Configure::nifi_server_port;
+constexpr const char *Configure::nifi_server_report_interval;
+constexpr const char *Configure::nifi_provenance_repository_max_storage_size;
+constexpr const char *Configure::nifi_provenance_repository_max_storage_time;
+constexpr const char *Configure::nifi_provenance_repository_directory_default;
+constexpr const char *Configure::nifi_flowfile_repository_max_storage_size;
+constexpr const char *Configure::nifi_flowfile_repository_max_storage_time;
+constexpr const char *Configure::nifi_flowfile_repository_directory_default;
+constexpr const char *Configure::nifi_dbcontent_repository_directory_default;
+constexpr const char *Configure::nifi_remote_input_secure;
+constexpr const char *Configure::nifi_remote_input_http;
+constexpr const char *Configure::nifi_security_need_ClientAuth;
+constexpr const char *Configure::nifi_security_client_certificate;
+constexpr const char *Configure::nifi_security_client_private_key;
+constexpr const char *Configure::nifi_security_client_pass_phrase;
+constexpr const char *Configure::nifi_security_client_ca_certificate;
+constexpr const char *Configure::nifi_rest_api_user_name;
+constexpr const char *Configure::nifi_rest_api_password;
+constexpr const char *Configure::nifi_c2_file_watch;
+constexpr const char *Configure::nifi_c2_flow_id;
+constexpr const char *Configure::nifi_c2_flow_url;
+constexpr const char *Configure::nifi_c2_flow_base_url;
+constexpr const char *Configure::nifi_c2_full_heartbeat;
+constexpr const char *Configure::nifi_state_management_provider_local;
+constexpr const char *Configure::nifi_state_management_provider_local_always_persist;
+constexpr const char *Configure::nifi_state_management_provider_local_auto_persistence_interval;
+constexpr const char *Configure::minifi_disk_space_watchdog_enable;
+constexpr const char *Configure::minifi_disk_space_watchdog_interval;
+constexpr const char *Configure::minifi_disk_space_watchdog_stop_threshold;
+constexpr const char *Configure::minifi_disk_space_watchdog_restart_threshold;
 
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/DiskSpaceWatchdog.cpp b/libminifi/src/DiskSpaceWatchdog.cpp
new file mode 100644
index 0000000..9a7f26c
--- /dev/null
+++ b/libminifi/src/DiskSpaceWatchdog.cpp
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DiskSpaceWatchdog.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "properties/Configure.h"
+#include "utils/file/PathUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+namespace {
+namespace chr = std::chrono;
+
+// Parses a duration string with core::Property::getTimeMSFromString; yields nullopt when that parser reports failure.
+utils::optional<chr::milliseconds> time_string_to_milliseconds(const std::string& str) {
+  uint64_t parsed_ms{};
+  if (core::Property::getTimeMSFromString(str, parsed_ms)) { return utils::make_optional(chr::milliseconds{parsed_ms}); }
+  return utils::nullopt;
+}
+
+// Parses a data size into an integral T; yields nullopt when core::Property::StringToInt reports failure.
+template<typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
+utils::optional<T> data_size_string_to_int(const std::string& str) {
+  T parsed_size{};
+  // despite its name, StringToInt is aware of data units like B, kB, MB, etc.
+  if (!core::Property::StringToInt(str, parsed_size)) { return utils::nullopt; }
+  return utils::make_optional(parsed_size);
+}
+
+
+}  // namespace
+
+namespace disk_space_watchdog {
+// Reads the watchdog settings from conf; missing or unparsable values fall back to 15 s / 100 MiB (stop) / 150 MiB (restart).
+Config read_config(const Configure& conf) {
+  constexpr std::uintmax_t mebibyte = 1024 * 1024;
+  const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::flatMap(time_string_to_milliseconds);
+  const auto stop_bytes = conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
+  const auto restart_bytes = conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
+  // Validate the effective values: an empty optional orders below any value, so comparing the raw optionals mishandles a lone threshold.
+  const std::uintmax_t stop_threshold = stop_bytes.value_or(100 * mebibyte);
+  const std::uintmax_t restart_threshold = restart_bytes.value_or(150 * mebibyte);
+  if (restart_threshold < stop_threshold) { throw std::runtime_error{"disk space watchdog stop threshold must be <= restart threshold"}; }
+  return {interval_ms.value_or(chr::seconds{15}), stop_threshold, restart_threshold};
+}
+
+
+// Queries the available space of each path, in order. logger may be null (nothing is logged then). A failed
+// query is not fatal: the value space() returned for it is stored like any other.
+std::vector<std::uintmax_t> check_available_space(const std::vector<std::string>& paths, core::logging::Logger* logger) {
+  std::vector<std::uintmax_t> available_bytes;
+  available_bytes.reserve(paths.size());
+  for (const std::string& path : paths) {
+    std::error_code ec;
+    const auto info = utils::file::space(path.c_str(), ec);
+    // exactly one of the two messages is emitted per path when a logger is present
+    if (logger && ec) logger->log_info("Couldn't check disk space at %s: %s (ignoring)", path, ec.message());
+    if (logger && !ec) logger->log_trace("%s available space: %zu bytes", path, gsl::narrow_cast<size_t>(info.available));
+    available_bytes.push_back(info.available);
+  }
+  return available_bytes;
+}
+
+}  // namespace disk_space_watchdog
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 962b397..b3d8dc2 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -52,6 +52,8 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Connectable.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
 #include "utils/HTTPClient.h"
 #include "utils/GeneralUtils.h"
 #include "io/NetworkPrioritizer.h"
@@ -83,7 +85,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
       thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
       flow_configuration_(std::move(flow_configuration)),
-      configuration_(configure),
+      configuration_(std::move(configure)),
       content_repo_(content_repo),
       logger_(logging::LoggerFactory<FlowController>::getLogger()) {
   if (provenance_repo == nullptr)
@@ -103,25 +105,23 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
   c2_initialized_ = false;
   root_ = nullptr;
 
-  protocol_ = utils::make_unique<FlowControlProtocol>(this, configure);
+  protocol_ = utils::make_unique<FlowControlProtocol>(this, configuration_);
 
   if (!headless_mode) {
     std::string rawConfigFileString;
-    configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+    configuration_->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
 
     if (!rawConfigFileString.empty()) {
       configuration_filename_ = rawConfigFileString;
     }
 
-    std::string adjustedFilename;
-    if (!configuration_filename_.empty()) {
-      // perform a naive determination if this is a relative path
-      if (configuration_filename_.c_str()[0] != '/') {
-        adjustedFilename = adjustedFilename + configure->getHome() + "/" + configuration_filename_;
-      } else {
-        adjustedFilename = configuration_filename_;
+    const auto adjustedFilename = [&]() -> std::string {
+      if (configuration_filename_.empty()) { return {}; }
+      if (utils::file::isAbsolutePath(configuration_filename_.c_str())) {
+        return configuration_filename_;
       }
-    }
+      return utils::file::FileUtils::concat_path(configuration_->getHome(), configuration_filename_);
+    }();
     initializeExternalComponents();
     initializePaths(adjustedFilename);
   }
@@ -142,25 +142,20 @@ void FlowController::initializeExternalComponents() {
 }
 
 void FlowController::initializePaths(const std::string &adjustedFilename) {
-  char *path = NULL;
+  const char *path = nullptr;
 #ifndef WIN32
   char full_path[PATH_MAX];
   path = realpath(adjustedFilename.c_str(), full_path);
 #else
-  path = const_cast<char*>(adjustedFilename.c_str());
+  path = adjustedFilename.c_str();
 #endif
 
-  if (path == NULL) {
+  if (path == nullptr) {
     throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
   }
   std::string pathString(path);
   configuration_filename_ = pathString;
   logger_->log_info("FlowController NiFi Configuration file %s", pathString);
-
-  if (!path) {
-    logger_->log_error("Could not locate path from provided configuration file name (%s).  Exiting.", path);
-    exit(1);
-  }
 }
 
 utils::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() {
@@ -200,6 +195,9 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
   if (newRoot == nullptr)
     return false;
 
+  if (!isRunning())
+    return false;
+
   logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion());
 
   updating_ = true;
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index a0f3893..88f19d5 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -117,7 +117,7 @@ void Properties::loadConfigureFile(const char *fileName) {
     return;
   }
 
-  properties_file_ = utils::file::PathUtils::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName));
+  properties_file_ = utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName));
 
   logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)", getName().c_str(), fileName, properties_file_);
 
diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp
index b02c8e3..792cc79 100644
--- a/libminifi/src/utils/file/FileUtils.cpp
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -29,7 +29,7 @@ namespace minifi {
 namespace utils {
 namespace file {
 
-uint64_t FileUtils::computeChecksum(const std::string &file_name, uint64_t up_to_position) {
+uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position) {
   constexpr uint64_t BUFFER_SIZE = 4096u;
   std::array<char, std::size_t{BUFFER_SIZE}> buffer;
 
diff --git a/libminifi/src/utils/file/PathUtils.cpp b/libminifi/src/utils/file/PathUtils.cpp
index fd1df13..394c2e2 100644
--- a/libminifi/src/utils/file/PathUtils.cpp
+++ b/libminifi/src/utils/file/PathUtils.cpp
@@ -23,10 +23,12 @@
 #else
 #include <limits.h>
 #include <stdlib.h>
+#include <sys/statvfs.h>
 #endif
 
 #include <iostream>
 #include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -35,7 +37,7 @@ namespace minifi {
 namespace utils {
 namespace file {
 
-bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
+bool getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
   const std::size_t found = path.find_last_of(FileUtils::get_separator());
   /**
    * Don't make an assumption about about the path, return false for this case.
@@ -57,7 +59,7 @@ bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePat
   return true;
 }
 
-std::string PathUtils::getFullPath(const std::string& path) {
+std::string getFullPath(const std::string& path) {
 #ifdef WIN32
   std::vector<char> buffer(MAX_PATH);
   uint32_t len = 0U;
@@ -84,13 +86,58 @@ std::string PathUtils::getFullPath(const std::string& path) {
 #endif
 }
 
-std::string PathUtils::globToRegex(std::string glob) {
+std::string globToRegex(std::string glob) {
   utils::StringUtils::replaceAll(glob, ".", "\\.");
   utils::StringUtils::replaceAll(glob, "*", ".*");
   utils::StringUtils::replaceAll(glob, "?", ".");
   return glob;
 }
 
+// Non-throwing overload in the style of std::filesystem::space: on failure sets ec and returns uintmax_t(-1) in every field.
+space_info space(const path path, std::error_code& ec) noexcept {
+  constexpr auto kErrVal = gsl::narrow_cast<std::uintmax_t>(-1);
+  ec.clear();  // an error left in ec by an earlier call must not be mistaken for a failure of this one
+#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__))
+  struct statvfs svfs{};
+  const int statvfs_retval = statvfs(path, &svfs);
+  if (statvfs_retval == -1) {
+    ec = std::error_code{errno, std::generic_category()};
+    return space_info{kErrVal, kErrVal, kErrVal};
+  }
+  const auto capacity = std::uintmax_t{svfs.f_blocks} * svfs.f_frsize;
+  const auto free = std::uintmax_t{svfs.f_bfree} * svfs.f_frsize;
+  const auto available = std::uintmax_t{svfs.f_bavail} * svfs.f_frsize;
+#elif defined(_WIN32)
+  ULARGE_INTEGER free_bytes_available_to_caller;
+  ULARGE_INTEGER total_number_of_bytes;
+  ULARGE_INTEGER total_number_of_free_bytes;
+  // path is a narrow string: call the ANSI variant explicitly, the generic macro resolves to the wide one when UNICODE is defined
+  const bool get_disk_free_space_ex_success = GetDiskFreeSpaceExA(path, &free_bytes_available_to_caller, &total_number_of_bytes, &total_number_of_free_bytes);
+  if (!get_disk_free_space_ex_success) {
+    ec = std::error_code{gsl::narrow<int>(GetLastError()), std::system_category()};
+    return space_info{kErrVal, kErrVal, kErrVal};
+  }
+  const auto capacity = total_number_of_bytes.QuadPart;
+  const auto free = total_number_of_free_bytes.QuadPart;
+  const auto available = free_bytes_available_to_caller.QuadPart;
+#else
+  const auto capacity = kErrVal;  // neither API is available: the error value is returned, but ec stays clear
+  const auto free = kErrVal;
+  const auto available = kErrVal;
+#endif /* unix || apple */
+
+  return space_info{capacity, free, available};
+}
+
+// Throwing overload: delegates to the std::error_code overload and turns a reported error into a
+// filesystem_error that carries the queried path.
+space_info space(const path path) {
+  std::error_code error;
+  const space_info info = space(path, error);  // a const local still qualifies for NRVO
+  if (!error) { return info; }
+  throw filesystem_error{error.message(), path, "", error};
+}
+
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
diff --git a/libminifi/test/unit/DiskSpaceWatchdogTests.cpp b/libminifi/test/unit/DiskSpaceWatchdogTests.cpp
new file mode 100644
index 0000000..604b3d6
--- /dev/null
+++ b/libminifi/test/unit/DiskSpaceWatchdogTests.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+
+#include "../TestBase.h"
+
+#include "properties/Configure.h"
+#include "DiskSpaceWatchdog.h"
+
+namespace minifi = org::apache::nifi::minifi;
+namespace dsg = minifi::disk_space_watchdog;
+namespace chr = std::chrono;
+
+TEST_CASE("disk_space_watchdog::read_config", "[dsg::read_config]") {
+  const auto mebibytes = 1024 * 1024;
+
+  SECTION("defaults are present") {  // nothing is configured: the returned values must still be consistent
+    const minifi::Configure configure;
+    const auto conf = dsg::read_config(configure);
+    REQUIRE(conf.interval >= chr::nanoseconds{0});
+    REQUIRE(conf.restart_threshold_bytes > conf.stop_threshold_bytes);
+  }
+
+  SECTION("basic") {  // thresholds given as plain byte counts, interval with an explicit "millis" unit
+    minifi::Configure configure;
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_stop_threshold, std::to_string(10 * mebibytes));
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_restart_threshold, std::to_string(25 * mebibytes));
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_interval, "2000 millis");
+    const auto conf = dsg::read_config(configure);
+    REQUIRE(conf.stop_threshold_bytes == 10 * mebibytes);
+    REQUIRE(conf.restart_threshold_bytes == 25 * mebibytes);
+    REQUIRE(conf.interval == chr::seconds{2});
+  }
+
+  SECTION("units") {  // thresholds with a data unit; the assertions expect "MB" to be read as 1024 * 1024 bytes
+    minifi::Configure configure;
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_stop_threshold, "100 MB");
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_restart_threshold, "250 MB");
+    configure.set(minifi::Configure::minifi_disk_space_watchdog_interval, "7 sec");
+    const auto conf = dsg::read_config(configure);
+    REQUIRE(conf.stop_threshold_bytes == 100 * mebibytes);
+    REQUIRE(conf.restart_threshold_bytes == 250 * mebibytes);
+    REQUIRE(conf.interval == chr::seconds{7});
+  }
+}
diff --git a/libminifi/test/unit/EnvironmentUtilsTests.cpp b/libminifi/test/unit/EnvironmentUtilsTests.cpp
index 5700a8e..78606d1 100644
--- a/libminifi/test/unit/EnvironmentUtilsTests.cpp
+++ b/libminifi/test/unit/EnvironmentUtilsTests.cpp
@@ -146,7 +146,7 @@ TEST_CASE("setcwd", "[setcwd]") {
   TestController testController;
   const std::string cwd = utils::Environment::getCurrentWorkingDirectory();
   char format[] = "/tmp/envtest.XXXXXX";
-  const std::string tempDir = utils::file::PathUtils::getFullPath(testController.createTempDirectory(format));
+  const std::string tempDir = utils::file::getFullPath(testController.createTempDirectory(format));
   REQUIRE(true == utils::Environment::setCurrentWorkingDirectory(tempDir.c_str()));
   REQUIRE(tempDir == utils::Environment::getCurrentWorkingDirectory());
   REQUIRE(true == utils::Environment::setCurrentWorkingDirectory(cwd.c_str()));
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 6ead1a8..3060299 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -28,7 +28,7 @@
 #include "utils/Environment.h"
 #include "utils/TimeUtil.h"
 
-using org::apache::nifi::minifi::utils::file::FileUtils;
+namespace FileUtils = org::apache::nifi::minifi::utils::file;
 
 TEST_CASE("TestFileUtils::concat_path", "[TestConcatPath]") {
   std::string child = "baz";
@@ -92,7 +92,7 @@ TEST_CASE("TestFilePath", "[TestGetFileNameAndPath]") {
   std::stringstream file;
   file << path.str() << FileUtils::get_separator() << "file";
   std::string filename, filepath;
-  REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(file.str(), filepath, filename) );
+  REQUIRE(true == utils::file::getFileNameAndPath(file.str(), filepath, filename) );
   REQUIRE(path.str() == filepath);
   REQUIRE("file" == filename);
 }
@@ -100,7 +100,7 @@ SECTION("NO FILE VALID PATH") {
   std::stringstream path;
   path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c" << FileUtils::get_separator();
   std::string filename, filepath;
-  REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+  REQUIRE(false == utils::file::getFileNameAndPath(path.str(), filepath, filename) );
   REQUIRE(filepath.empty());
   REQUIRE(filename.empty());
 }
@@ -110,14 +110,14 @@ SECTION("FILE NO PATH") {
   std::string filename, filepath;
   std::string expectedPath;
   expectedPath += FileUtils::get_separator();
-  REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+  REQUIRE(true == utils::file::getFileNameAndPath(path.str(), filepath, filename) );
   REQUIRE(expectedPath == filepath);
   REQUIRE("file" == filename);
 }
 SECTION("NO FILE NO PATH") {
   std::string path = "file";
   std::string filename, filepath;
-  REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+  REQUIRE(false == utils::file::getFileNameAndPath(path, filepath, filename) );
   REQUIRE(filepath.empty());
   REQUIRE(filename.empty());
 }
@@ -154,7 +154,7 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
   TestController testController;
 
   char format[] = "/tmp/gt.XXXXXX";
-  const std::string tempDir = utils::file::PathUtils::getFullPath(testController.createTempDirectory(format));
+  const std::string tempDir = utils::file::getFullPath(testController.createTempDirectory(format));
 
   const std::string cwd = utils::Environment::getCurrentWorkingDirectory();
 
@@ -168,15 +168,15 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
   REQUIRE(0 == utils::file::FileUtils::create_dir(tempDir1));
   REQUIRE(0 == utils::file::FileUtils::create_dir(tempDir2));
 
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(tempDir1));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("test1"));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("./test1"));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("././test1"));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath("./test2/../test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath(tempDir1));
+  REQUIRE(tempDir1 == utils::file::getFullPath("test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath("./test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath("././test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath("./test2/../test1"));
 #ifdef WIN32
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\test1"));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\.\\test1"));
-  REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\test2\\..\\test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath(".\\test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath(".\\.\\test1"));
+  REQUIRE(tempDir1 == utils::file::getFullPath(".\\test2\\..\\test1"));
 #endif
 }
 
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp b/libminifi/test/unit/GeneralUtilsTest.cpp
new file mode 100644
index 0000000..f7d12b7
--- /dev/null
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <functional>
+#include <string>
+#include <type_traits>
+
+#include "../TestBase.h"
+#include "utils/GeneralUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+static_assert(std::is_same<decltype(utils::make_unique<char16_t>()), std::unique_ptr<char16_t>>::value, "utils::make_unique type must be correct");  // compile-time return type check
+
+TEST_CASE("GeneralUtils::make_unique", "[make_unique]") {
+  const auto pstr = utils::make_unique<std::string>("test string");  // pointee is built from the literal
+  REQUIRE("test string" == *pstr);  // the pointee must hold the value passed at construction
+}
+
+TEST_CASE("GeneralUtils::intdiv_ceil", "[intdiv_ceil]") {
+  REQUIRE(0 == utils::intdiv_ceil(0, 1));  // zero numerator
+  REQUIRE(0 == utils::intdiv_ceil(0, 2));
+  REQUIRE(1 == utils::intdiv_ceil(1, 2));  // fractional quotients round up
+  REQUIRE(1 == utils::intdiv_ceil(1, 3));
+  REQUIRE(1 == utils::intdiv_ceil(3, 3));  // exact quotients are unchanged
+  REQUIRE(2 == utils::intdiv_ceil(4, 3));
+  REQUIRE(2 == utils::intdiv_ceil(6, 3));  // exact multiple at the top of the range mapping to 2 (was a duplicate of the previous check)
+  REQUIRE(0 == utils::intdiv_ceil(-1, 3));  // negative fractional quotients also round up: ceil(-1/3) == 0
+  REQUIRE(-1 == utils::intdiv_ceil(-3, 3));
+  REQUIRE(-1 == utils::intdiv_ceil(-4, 3));  // ceil(-4/3) == -1
+}
+
+TEST_CASE("GeneralUtils::exchange", "[exchange]") {
+  int a = 1;
+  int b = 2;
+  a = utils::exchange(b, 0);  // stores 0 into b and yields b's previous value
+  REQUIRE(2 == a);  // a received the old value of b
+  REQUIRE(0 == b);  // b holds the replacement value
+}
+
+static_assert(std::is_same<decltype(utils::void_t<char16_t>()), void>::value, "utils::void_t single arg must work");  // T() has type T, so this checks void_t<char16_t> is void
+static_assert(std::is_same<decltype(utils::void_t<int, double, bool, void, char16_t>()), void>::value, "utils::void_t multi arg must work");  // same check, several arguments
+
+TEST_CASE("GeneralUtils::invoke pointer to member function", "[invoke memfnptr]") {
+  const int result{0xc1ca};  // arbitrary sentinel that memfn returns
+  struct Tester {
+    bool called{};  // set by memfn so a named object can prove the call reached it
+    int memfn(const int arg) {
+      REQUIRE(42 == arg);  // the extra argument must arrive unchanged
+      called = true;
+      return result;
+    }
+  };
+
+  // object passed as a temporary; only the return value can be checked here
+  REQUIRE(result == utils::invoke(&Tester::memfn, Tester{}, 42));
+
+  // object passed through std::reference_wrapper; the call must reach t2 itself
+  Tester t2;
+  const auto ref_wrapper = std::ref(t2);
+  REQUIRE(result == utils::invoke(&Tester::memfn, ref_wrapper, 42));
+  REQUIRE(t2.called);
+
+  // object passed by pointer; the call must reach t3 itself
+  Tester t3;
+  REQUIRE(result == utils::invoke(&Tester::memfn, &t3, 42));
+  REQUIRE(t3.called);
+}
+
+TEST_CASE("GeneralUtils::invoke pointer to data member", "[invoke data member]") {
+  struct Times2 {
+    int value;
+    explicit Times2(const int i) :value{i * 2} {}  // stores twice the argument
+  };
+
+  // object passed as a temporary
+  REQUIRE(24 == utils::invoke(&Times2::value, Times2{12}));  // 12 * 2
+
+  // object passed through std::reference_wrapper
+  Times2 t2{42};
+  const auto ref_wrapper = std::ref(t2);
+  REQUIRE(84 == utils::invoke(&Times2::value, ref_wrapper));  // 42 * 2
+
+  // object passed by pointer
+  Times2 t3{0};
+  REQUIRE(0 == utils::invoke(&Times2::value, &t3));  // 0 * 2
+}
+
+namespace {
+bool free_function(const bool b) { return b; }  // returns its argument; invoked through a function pointer by the test below
+}  // namespace
+
+TEST_CASE("GeneralUtils::invoke FunctionObject", "[invoke function object]") {
+  REQUIRE(true == utils::invoke(&free_function, true));  // plain function pointer, both argument values
+  REQUIRE(false == utils::invoke(&free_function, false));
+
+  const auto n = 3;
+  const auto int_timesn = [n](const int i) { return n * i; };  // multiplies its argument by the captured n
+
+  // lambda with capture
+  REQUIRE(60 == utils::invoke(int_timesn, 20));  // 3 * 20
+}
diff --git a/libminifi/test/unit/IntervalSwitchTest.cpp b/libminifi/test/unit/IntervalSwitchTest.cpp
new file mode 100644
index 0000000..5bd0e62
--- /dev/null
+++ b/libminifi/test/unit/IntervalSwitchTest.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/IntervalSwitch.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+namespace {
+using State = utils::IntervalSwitchState;
+
+struct expected {  // the (state, switched) pair a call to the switch is expected to report
+  expected(State state, bool switched) :state{state}, switched{switched} {}
+
+  State state;
+  bool switched;  // in the tests below this is true exactly on the calls where the reported state changes
+};
+
+template<typename IntervalSwitchReturn>  // accepts any left-hand type exposing .state and .switched
+bool operator==(const IntervalSwitchReturn& lhs, const expected& c) {
+  return lhs.state == c.state && lhs.switched == c.switched;  // both fields must match
+}
+}  // namespace
+
+TEST_CASE("basic IntervalSwitch", "[intervalswitch.basic]") {
+  utils::IntervalSwitch<int> interval_switch{100, 150};  // below 100 switches to LOWER, reaching 150 switches back to UPPER
+  REQUIRE(interval_switch(120) == expected(State::UPPER, false));  // first call, between the thresholds: UPPER without a switch
+  REQUIRE(interval_switch(100) == expected(State::UPPER, false));  // exactly 100 is not yet below the lower threshold
+  REQUIRE(interval_switch(99) == expected(State::LOWER, true));  // below 100: switches to LOWER
+  REQUIRE(interval_switch(-20022202) == expected(State::LOWER, false));  // already LOWER, no switch reported
+  REQUIRE(interval_switch(120) == expected(State::LOWER, false));  // between the thresholds the previous state is kept
+  REQUIRE(interval_switch(149) == expected(State::LOWER, false));  // just under 150: still LOWER
+  REQUIRE(interval_switch(150) == expected(State::UPPER, true));  // reaching 150: switches to UPPER
+  REQUIRE(interval_switch(150) == expected(State::UPPER, false));  // repeated value: no second switch
+  REQUIRE(interval_switch(2000) == expected(State::UPPER, false));
+  REQUIRE(interval_switch(120) == expected(State::UPPER, false));  // between the thresholds the previous state is kept
+  REQUIRE(interval_switch(100) == expected(State::UPPER, false));
+}
+
+TEST_CASE("IntervalSwitch comparator", "[intervalswitch.comp]") {
+  utils::IntervalSwitch<uint32_t, std::greater<uint32_t>> interval_switch{250, 100};  // with std::greater: above 250 gives LOWER, down to 100 gives UPPER
+  REQUIRE(interval_switch(99) == expected(State::UPPER, false));  // first call: UPPER without a switch
+  REQUIRE(interval_switch(120) == expected(State::UPPER, false));
+  REQUIRE(interval_switch(250) == expected(State::UPPER, false));  // exactly 250 does not exceed the threshold
+  REQUIRE(interval_switch(251) == expected(State::LOWER, true));  // above 250: switches to LOWER
+  REQUIRE(interval_switch(150) == expected(State::LOWER, false));  // between the thresholds the previous state is kept
+  REQUIRE(interval_switch(101) == expected(State::LOWER, false));  // just above 100: still LOWER
+  REQUIRE(interval_switch(100) == expected(State::UPPER, true));  // reaching 100: switches to UPPER
+  REQUIRE(interval_switch(100) == expected(State::UPPER, false));  // repeated value: no second switch
+  REQUIRE(interval_switch(250) == expected(State::UPPER, false));
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp b/libminifi/test/unit/OptionalTest.cpp
new file mode 100644
index 0000000..311afec
--- /dev/null
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/OptionalUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+TEST_CASE("optional map", "[optional map]") {
+  const auto test1 = utils::make_optional(6) | utils::map([](const int i) { return i * 2; });  // optional holding 6 piped into a doubling function
+  REQUIRE(12 == test1.value());  // the function was applied to the contained value
+
+  const auto test2 = utils::optional<int>{} | utils::map([](const int i) { return i * 2; });  // empty optional piped into the same function
+  REQUIRE(!test2.has_value());  // an empty input gives an empty result
+}
+
+TEST_CASE("optional flatMap", "[optional flat map]") {
+  const auto make_intdiv_noremainder = [](const int denom) {  // builds a divider that yields num / denom only when the division is exact
+    return [denom](const int num) { return num % denom == 0 ? utils::make_optional(num / denom) : utils::optional<int>{}; };
+  };
+
+  const auto test1 = utils::make_optional(6) | utils::flatMap(make_intdiv_noremainder(3));  // callable passed as a temporary
+  REQUIRE(2 == test1.value());  // 6 / 3, with the function's optional result not nested in another optional
+
+  const auto const_lval_func = make_intdiv_noremainder(4);
+  const auto test2 = utils::optional<int>{} | utils::flatMap(const_lval_func);  // callable passed as a const lvalue
+  REQUIRE(!test2.has_value());  // an empty input gives an empty result
+
+  auto mutable_lval_func = make_intdiv_noremainder(3);
+  const auto test3 = utils::make_optional(7) | utils::flatMap(mutable_lval_func);  // callable passed as a non-const lvalue
+  REQUIRE(!test3.has_value());  // 7 is not divisible by 3, so the function's empty result is propagated
+}
diff --git a/libminifi/test/unit/PathUtilsTests.cpp b/libminifi/test/unit/PathUtilsTests.cpp
index 30cbb78..b907e1f 100644
--- a/libminifi/test/unit/PathUtilsTests.cpp
+++ b/libminifi/test/unit/PathUtilsTests.cpp
@@ -19,14 +19,32 @@
 #include <catch.hpp>
 #include "utils/file/PathUtils.h"
 
-using namespace org::apache::nifi::minifi::utils::file;
+namespace fs = org::apache::nifi::minifi::utils::file;
 
-TEST_CASE("PathUtils::globToRegex works", "[globToRegex]") {
-  REQUIRE(PathUtils::globToRegex("") == "");
-  REQUIRE(PathUtils::globToRegex("NoSpecialChars") == "NoSpecialChars");
-  REQUIRE(PathUtils::globToRegex("ReplaceDot.txt") == "ReplaceDot\\.txt");
-  REQUIRE(PathUtils::globToRegex("Replace.Multiple.Dots...txt") == "Replace\\.Multiple\\.Dots\\.\\.\\.txt");
-  REQUIRE(PathUtils::globToRegex("ReplaceAsterisk.*") == "ReplaceAsterisk\\..*");
-  REQUIRE(PathUtils::globToRegex("Replace*Multiple*Asterisks") == "Replace.*Multiple.*Asterisks");
-  REQUIRE(PathUtils::globToRegex("ReplaceQuestionMark?.txt") == "ReplaceQuestionMark.\\.txt");
+TEST_CASE("file::globToRegex works", "[globToRegex]") {
+  REQUIRE(fs::globToRegex("").empty());  // empty glob gives an empty pattern
+  REQUIRE(fs::globToRegex("NoSpecialChars") == "NoSpecialChars");  // ordinary characters pass through unchanged
+  REQUIRE(fs::globToRegex("ReplaceDot.txt") == "ReplaceDot\\.txt");  // '.' is escaped
+  REQUIRE(fs::globToRegex("Replace.Multiple.Dots...txt") == "Replace\\.Multiple\\.Dots\\.\\.\\.txt");  // every '.' is escaped
+  REQUIRE(fs::globToRegex("ReplaceAsterisk.*") == "ReplaceAsterisk\\..*");  // '*' becomes ".*"
+  REQUIRE(fs::globToRegex("Replace*Multiple*Asterisks") == "Replace.*Multiple.*Asterisks");
+  REQUIRE(fs::globToRegex("ReplaceQuestionMark?.txt") == "ReplaceQuestionMark.\\.txt");  // '?' becomes "."
+}
+
+TEST_CASE("path::isAbsolutePath", "[path::isAbsolutePath]") {
+#ifdef WIN32
+  REQUIRE(fs::isAbsolutePath("C:\\"));  // drive letter with backslash separators
+  REQUIRE(fs::isAbsolutePath("C:\\Program Files"));
+  REQUIRE(fs::isAbsolutePath("C:\\Program Files\\ApacheNiFiMiNiFi\\nifi-minifi-cpp\\conf\\minifi.properties"));
+  REQUIRE(fs::isAbsolutePath("C:/"));  // drive letter with forward-slash separators
+  REQUIRE(fs::isAbsolutePath("C:/Program Files"));
+  REQUIRE(fs::isAbsolutePath("C:/Program Files/ApacheNiFiMiNiFi/nifi-minifi-cpp/conf/minifi.properties"));
+  REQUIRE(!fs::isAbsolutePath("/"));  // a root without a drive letter is not absolute on Windows
+#else
+  REQUIRE(fs::isAbsolutePath("/"));  // paths starting at the filesystem root
+  REQUIRE(fs::isAbsolutePath("/etc"));
+  REQUIRE(fs::isAbsolutePath("/opt/minifi/conf/minifi.properties"));
+#endif /* WIN32 */
+
+  REQUIRE(!fs::isAbsolutePath("hello"));  // a bare name is relative on every platform
 }
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 3dba436..d90a37f 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -53,6 +53,7 @@
 #include "core/FlowConfiguration.h"
 #include "core/ConfigurationFactory.h"
 #include "core/RepositoryFactory.h"
+#include "DiskSpaceWatchdog.h"
 #include "utils/file/PathUtils.h"
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
@@ -142,7 +143,7 @@ int main(int argc, char **argv) {
   // initialize static functions that were defined apriori
   core::FlowConfiguration::initialize_static_functions();
 
-  std::string graceful_shutdown_seconds = "";
+  std::string graceful_shutdown_seconds;
   std::string prov_repo_class = "provenancerepository";
   std::string flow_repo_class = "flowfilerepository";
   std::string nifi_configuration_class_name = "yamlconfiguration";
@@ -204,7 +205,7 @@ int main(int argc, char **argv) {
   // Make a record of minifi home in the configured log file.
   logger->log_info("MINIFI_HOME=%s", minifiHome);
 
-  std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
+  const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>();
   configure->setHome(minifiHome);
   configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
 
@@ -286,8 +287,53 @@ int main(int argc, char **argv) {
 
   std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name);
 
-  std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>(
-    new minifi::FlowController(prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo));
+  const auto controller = std::make_shared<minifi::FlowController>(prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo);
+
+  const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v){ return v == "true"; })).value_or(true);
+  std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
+  if (disk_space_watchdog_enable) {
+    try {
+      const auto repo_paths = [&] {
+        std::vector<std::string> repo_paths;
+        repo_paths.reserve(3);
+        // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
+        const auto path_valid = [](const std::string& p) { return !p.empty() && p != REPOSITORY_DIRECTORY; };
+        auto prov_repo_path = prov_repo->getDirectory();
+        auto flow_repo_path = flow_repo->getDirectory();
+        auto content_repo_storage_path = content_repo->getStoragePath();
+        if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { repo_paths.push_back(std::move(prov_repo_path)); }
+        if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { repo_paths.push_back(std::move(flow_repo_path)); }
+        if (path_valid(content_repo_storage_path)) { repo_paths.push_back(std::move(content_repo_storage_path)); }
+        return repo_paths;
+      }();
+      const auto available_spaces = minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
+      const auto config = minifi::disk_space_watchdog::read_config(*configure);
+      const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
+        const auto it = std::min_element(std::begin(spaces), std::end(spaces));
+        return it != spaces.end() ? *it : (std::numeric_limits<std::uintmax_t>::max)();
+      };
+      if (min_space(available_spaces) <= config.stop_threshold_bytes) {
+        logger->log_error("Cannot start MiNiFi due to insufficient available disk space");
+        return -1;
+      }
+      auto interval_switch = minifi::disk_space_watchdog::disk_space_interval_switch(config);
+      disk_space_watchdog = utils::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, min_space, repo_paths, logger, &controller]() mutable {
+        const auto stop = [&]{ controller->stop(); controller->unload(); };
+        const auto restart = [&]{ controller->load(); controller->start(); };
+        const auto switch_state = interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get())));
+        if (switch_state.state == utils::IntervalSwitchState::LOWER && switch_state.switched) {
+          logger->log_warn("Stopping flow controller due to insufficient disk space");
+          stop();
+        } else if (switch_state.state == utils::IntervalSwitchState::UPPER && switch_state.switched) {
+          logger->log_info("Restarting flow controller");
+          restart();
+        }
+      });
+    } catch (const std::runtime_error& error) {
+      logger->log_error(error.what());
+      return -1;
+    }
+  }
 
   logger->log_info("Loading FlowController");
 
@@ -305,8 +351,10 @@ int main(int argc, char **argv) {
   }
 
   // Start Processing the flow
-
   controller->start();
+
+  if (disk_space_watchdog) { disk_space_watchdog->start(); }
+
   logger->log_info("MiNiFi started");
 
   /**
@@ -324,6 +372,8 @@ int main(int argc, char **argv) {
   while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR);
   if(ret_val == -1) perror("sem_unlink");
 
+  disk_space_watchdog = nullptr;
+
   /**
    * Trigger unload -- wait stop_wait_time
    */