You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/30 15:41:55 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1177 Add new matchesFullInput() function

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef4dd5  MINIFICPP-1177 Add new matchesFullInput() function
6ef4dd5 is described below

commit 6ef4dd5bab30fa7026b8da495d971d97765dadb5
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Thu May 14 19:36:44 2020 +0200

    MINIFICPP-1177 Add new matchesFullInput() function
    
    MINIFICPP-1177 Add file_size and computeChecksum
    
    MINIFICPP-1177 Fix log rotation handling in TailFile
    
    Instead of the fragile logic of processing one file at a time in onTrigger
    and trying to keep track of progress using the persisted state, we now
    follow the same logic as NiFi: only look at rolled-over files if the input
    file got truncated, only look at newly rotated files matching the pattern,
    and stream all new content in one onTrigger call.
    
    Also fix some bugs in Multiple file mode and in the Yield detection logic,
    and correct some unit tests (when checking the log output, they previously
    also picked up output from earlier stages of the test).
    
    Support some previously unsupported NiFi properties:
    - Recursive lookup
    - Lookup frequency
    - Rolling Filename Pattern
    
    Change the default delimiter to \n (the previous default was to always
    read to the current end of the file, even if it was in the middle of a
    line).  Also include the delimiter in the flow file, as NiFi does.
    
    MINIFICPP-1177 Make TailFile unit tests more robust
    
    These used to fail in a small percentage of cases due to timing issues.
    I have reordered some operations and added some sleeps, and the failures
    seem to be gone now.
    
    MINIFICPP-1177 Use ProcessSession::write() instead of import()
    
    MINIFICPP-1177 Make ProvenanceTestHelper thread-safe
    
    Also fix variable naming, remove C-style casts, change virtual to override,
    add missing overrides, and fix a bug in TestRepository::DeSerialize.
    
    MINIFICPP-1177 Log the state at startup
    
    Also remove some unused arguments and clean up some comments in the header.
    
    MINIFICPP-1177 Save last_read_time to the state manager, too
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #791
---
 .../tests/C2DescribeCoreComponentStateTest.cpp     |   12 +-
 .../standard-processors/processors/TailFile.cpp    |  813 ++++++++++-----
 .../standard-processors/processors/TailFile.h      |  142 ++-
 .../standard-processors/tests/CMakeLists.txt       |    8 +-
 .../tests/unit/TailFileTests.cpp                   | 1074 +++++++++++++++++---
 libminifi/include/core/ProcessSession.h            |    4 +-
 libminifi/include/core/Relationship.h              |    9 +
 libminifi/include/io/CRCStream.h                   |   14 +
 libminifi/include/utils/RegexUtils.h               |    2 +
 libminifi/include/utils/StringUtils.h              |    2 +
 libminifi/include/utils/file/FileUtils.h           |   21 +
 libminifi/include/utils/file/PathUtils.h           |    2 +
 libminifi/src/core/ProcessSession.cpp              |   24 +-
 libminifi/src/utils/RegexUtils.cpp                 |   10 +
 libminifi/src/utils/StringUtils.cpp                |   10 +
 .../PathUtils.h => src/utils/file/FileUtils.cpp}   |   47 +-
 libminifi/src/utils/file/PathUtils.cpp             |    8 +-
 libminifi/test/TestBase.cpp                        |    5 +
 libminifi/test/TestBase.h                          |    2 +
 libminifi/test/resources/TestTailFile.yml          |    1 +
 libminifi/test/resources/TestTailFileCron.yml      |    1 +
 libminifi/test/unit/CRCTests.cpp                   |   37 +
 libminifi/test/unit/FileUtilsTests.cpp             |  173 ++++
 libminifi/test/unit/PathUtilsTests.cpp             |   32 +
 libminifi/test/unit/ProcessSessionTests.cpp        |   76 ++
 libminifi/test/unit/ProvenanceTestHelper.h         |  148 +--
 libminifi/test/unit/RegexUtilsTests.cpp            |   10 +
 libminifi/test/unit/RelationshipTests.cpp          |   40 +
 libminifi/test/unit/StringUtilsTests.cpp           |   27 +
 29 files changed, 2231 insertions(+), 523 deletions(-)

diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
index b0e7d7d..9bd7573 100644
--- a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -35,11 +35,11 @@ class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {
     test_file_1_ = utils::file::FileUtils::concat_path(temp_dir_, "test1.txt");
     test_file_2_ = utils::file::FileUtils::concat_path(temp_dir_, "test2.txt");
 
-    std::ofstream f1(test_file_1_);
-    f1 << "foo";
+    std::ofstream f1(test_file_1_, std::ios::out | std::ios::binary);
+    f1 << "foo\n";
 
-    std::ofstream f2(test_file_2_);
-    f2 << "foobar";
+    std::ofstream f2(test_file_2_, std::ios::out | std::ios::binary);
+    f2 << "foobar\n";
   }
 
  protected:
@@ -81,8 +81,8 @@ class DescribeCoreComponentStateHandler: public HeartbeatHandler {
       assert(strlen(tf["file.0.current"].GetString()) > 0U);
     };
 
-    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "3");
-    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "6");
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "4");
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "7");
   }
 };
 
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index d7c8d05..426667e 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -17,83 +17,301 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <stdio.h>
-
-#include <limits.h>
-#ifndef WIN32
-#include <dirent.h>
-#include <unistd.h>
-#endif
 
 #include <algorithm>
+#include <cinttypes>
+#include <cstdint>
 #include <iostream>
-#include <queue>
+#include <limits>
 #include <map>
 #include <unordered_map>
 #include <memory>
 #include <set>
-#include <sstream>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include "io/CRCStream.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/RegexUtils.h"
-#ifdef HAVE_REGEX_CPP
-#include <regex>
-#else
-#include <regex.h>
-#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-#ifndef S_ISDIR
-#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
-#endif
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for state migration from the legacy state file.",
-                                   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
-                                   "from the incoming file."
-                                   "If none is specified, data will be ingested as it becomes available.",
-                                   "");
+core::Property TailFile::FileName(
+    core::PropertyBuilder::createProperty("File to Tail")
+        ->withDescription("Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode")
+        ->isRequired(true)
+        ->build());
+
+core::Property TailFile::StateFile(
+    core::PropertyBuilder::createProperty("State File")
+        ->withDescription("DEPRECATED. Only use it for state migration from the legacy state file.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("TailFileState")
+        ->build());
+
+core::Property TailFile::Delimiter(
+    core::PropertyBuilder::createProperty("Input Delimiter")
+        ->withDescription("Specifies the character that should be used for delimiting the data being tailed"
+         "from the incoming file. If none is specified, data will be ingested as it becomes available.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("\\n")
+        ->build());
 
 core::Property TailFile::TailMode(
-    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")->withDescription(
-        "Specifies the tail file mode. In 'Single file' mode only a single file will be watched. "
+    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+        ->withDescription("Specifies the tail file mode. In 'Single file' mode only a single file will be watched. "
         "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. "
         "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true)
-        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")->build());
-
-core::Property TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")->isRequired(false)->build());
+        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+        ->build());
+
+core::Property TailFile::BaseDirectory(
+    core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")
+        ->withDescription("Base directory used to look for files to tail. This property is required when using Multiple file mode.")
+        ->isRequired(false)
+        ->build());
+
+core::Property TailFile::RecursiveLookup(
+    core::PropertyBuilder::createProperty("Recursive lookup")
+        ->withDescription("When using Multiple file mode, this property determines whether files are tailed in "
+        "child directories of the Base Directory or not.")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(false)
+        ->build());
+
+core::Property TailFile::LookupFrequency(
+    core::PropertyBuilder::createProperty("Lookup frequency")
+        ->withDescription("When using Multiple file mode, this property specifies the minimum duration "
+        "the processor will wait between looking for new files to tail in the Base Directory.")
+        ->isRequired(false)
+        ->withDefaultValue<core::TimePeriodValue>("10 min")
+        ->build());
+
+core::Property TailFile::RollingFilenamePattern(
+    core::PropertyBuilder::createProperty("Rolling Filename Pattern")
+        ->withDescription("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
+        "identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. "
+        "This pattern supports the wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file "
+        "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("${filename}.*")
+        ->build());
 
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
 const char *TailFile::POSITION_STR = "POSITION.";
 
+namespace {
+template<typename Container, typename Key>
+bool containsKey(const Container &container, const Key &key) {
+  return container.find(key) != container.end();
+}
+
+template <typename Container, typename Key>
+int64_t readOptionalInt64(const Container &container, const Key &key) {
+  const auto it = container.find(key);
+  if (it != container.end()) {
+    return std::stoll(it->second);
+  } else {
+    return 0;
+  }
+}
+
+template <typename Container, typename Key>
+uint64_t readOptionalUint64(const Container &container, const Key &key) {
+  const auto it = container.find(key);
+  if (it != container.end()) {
+    return std::stoull(it->second);
+  } else {
+    return 0;
+  }
+}
+
+// the delimiter is the first character of the input, allowing some escape sequences
+std::string parseDelimiter(const std::string &input) {
+  if (input.empty()) return "";
+  if (input[0] != '\\') return std::string{ input[0] };
+  if (input.size() == std::size_t{1}) return "\\";
+  switch (input[1]) {
+    case 'r': return "\r";
+    case 't': return "\t";
+    case 'n': return "\n";
+    default: return std::string{ input[1] };
+  }
+}
+
+std::map<std::string, TailState> update_keys_in_legacy_states(const std::map<std::string, TailState> &legacy_tail_states) {
+  std::map<std::string, TailState> new_tail_states;
+  for (const auto &key_value_pair : legacy_tail_states) {
+    const TailState &state = key_value_pair.second;
+    std::string full_file_name = utils::file::FileUtils::concat_path(state.path_, state.file_name_);
+    new_tail_states.emplace(full_file_name, state);
+  }
+  return new_tail_states;
+}
+
+struct TailStateWithMtime {
+  using TimePoint = std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>;
+
+  TailStateWithMtime(TailState tail_state, TimePoint mtime)
+    : tail_state_(std::move(tail_state)), mtime_(mtime) {}
+
+  TailState tail_state_;
+  TimePoint mtime_;
+};
+
+void openFile(const std::string &file_name, uint64_t offset, std::ifstream &input_stream, const std::shared_ptr<logging::Logger> &logger) {
+  logger->log_debug("Opening %s", file_name);
+  input_stream.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
+  if (!input_stream.is_open() || !input_stream.good()) {
+    input_stream.close();
+    throw Exception(FILE_OPERATION_EXCEPTION, "Could not open file: " + file_name);
+  }
+  if (offset != 0U) {
+    input_stream.seekg(offset, std::ifstream::beg);
+    if (!input_stream.good()) {
+      logger->log_error("Seeking to %" PRIu64 " failed for file %s (does file/filesystem support seeking?)", offset, file_name);
+      throw Exception(FILE_OPERATION_EXCEPTION, "Could not seek file " + file_name + " to offset " + std::to_string(offset));
+    }
+  }
+}
+
+constexpr std::size_t BUFFER_SIZE = 4096;
+
+class FileReaderCallback : public OutputStreamCallback {
+ public:
+  FileReaderCallback(const std::string &file_name,
+                     uint64_t offset,
+                     char input_delimiter,
+                     uint64_t checksum)
+    : input_delimiter_(input_delimiter),
+      checksum_(checksum),
+      logger_(logging::LoggerFactory<TailFile>::getLogger()) {
+    openFile(file_name, offset, input_stream_, logger_);
+  }
+
+  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+    io::CRCStream<io::BaseStream> crc_stream{output_stream.get(), checksum_};
+
+    uint64_t num_bytes_written = 0;
+    bool found_delimiter = false;
+
+    while (hasMoreToRead() && !found_delimiter) {
+      if (begin_ == end_) {
+        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), buffer_.size());
+
+        const auto num_bytes_read = input_stream_.gcount();
+        logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
+
+        begin_ = buffer_.data();
+        end_ = begin_ + num_bytes_read;
+      }
+
+      char *delimiter_pos = std::find(begin_, end_, input_delimiter_);
+      found_delimiter = (delimiter_pos != end_);
+
+      ptrdiff_t zlen{std::distance(begin_, delimiter_pos)};
+      if (found_delimiter) {
+        zlen += 1;
+      }
+      const int len = gsl::narrow<int>(zlen);
+
+      crc_stream.write(reinterpret_cast<uint8_t*>(begin_), len);
+      num_bytes_written += len;
+      begin_ += len;
+    }
+
+    if (found_delimiter) {
+      checksum_ = crc_stream.getCRC();
+    } else {
+      latest_flow_file_ends_with_delimiter_ = false;
+    }
+
+    return num_bytes_written;
+  }
+
+  uint64_t checksum() const {
+    return checksum_;
+  }
+
+  bool hasMoreToRead() const {
+    return begin_ != end_ || input_stream_.good();
+  }
+
+  bool useLatestFlowFile() const {
+    return latest_flow_file_ends_with_delimiter_;
+  }
+
+ private:
+  char input_delimiter_;
+  uint64_t checksum_;
+  std::ifstream input_stream_;
+  std::shared_ptr<logging::Logger> logger_;
+
+  std::array<char, BUFFER_SIZE> buffer_;
+  char *begin_ = buffer_.data();
+  char *end_ = buffer_.data();
+
+  bool latest_flow_file_ends_with_delimiter_ = true;
+};
+
+class WholeFileReaderCallback : public OutputStreamCallback {
+ public:
+  WholeFileReaderCallback(const std::string &file_name,
+                          uint64_t offset,
+                          uint64_t checksum)
+    : checksum_(checksum),
+      logger_(logging::LoggerFactory<TailFile>::getLogger()) {
+    openFile(file_name, offset, input_stream_, logger_);
+  }
+
+  uint64_t checksum() const {
+    return checksum_;
+  }
+
+  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+    std::array<char, BUFFER_SIZE> buffer;
+
+    io::CRCStream<io::BaseStream> crc_stream{output_stream.get(), checksum_};
+
+    uint64_t num_bytes_written = 0;
+
+    while (input_stream_.good()) {
+      input_stream_.read(buffer.data(), buffer.size());
+
+      const auto num_bytes_read = input_stream_.gcount();
+      logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
+
+      const int len = gsl::narrow<int>(num_bytes_read);
+
+      crc_stream.write(reinterpret_cast<uint8_t*>(buffer.data()), len);
+      num_bytes_written += len;
+    }
+
+    checksum_ = crc_stream.getCRC();
+
+    return num_bytes_written;
+  }
+
+ private:
+  uint64_t checksum_;
+  std::ifstream input_stream_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+}  // namespace
+
 void TailFile::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
@@ -102,6 +320,9 @@ void TailFile::initialize() {
   properties.insert(Delimiter);
   properties.insert(TailMode);
   properties.insert(BaseDirectory);
+  properties.insert(RecursiveLookup);
+  properties.insert(LookupFrequency);
+  properties.insert(RollingFilenamePattern);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -112,9 +333,7 @@ void TailFile::initialize() {
 void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
 
-  // can perform these in notifyStop, but this has the same outcome
   tail_states_.clear();
-  state_recovered_ = false;
 
   state_manager_ = context->getStateManager();
   if (state_manager_ == nullptr) {
@@ -124,56 +343,51 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
-    delimiter_ = value;
+    delimiter_ = parseDelimiter(value);
   }
 
+  context->getProperty(FileName.getName(), file_to_tail_);
+
   std::string mode;
   context->getProperty(TailMode.getName(), mode);
 
-  std::string file = "";
-  if (!context->getProperty(FileName.getName(), file)) {
-    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail is a required property");
-  }
   if (mode == "Multiple file") {
-    // file is a regex
-    std::string base_dir;
-    if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+    tail_mode_ = Mode::MULTIPLE;
+
+    if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode.");
     }
 
-    auto fileRegexSelect = [&](const std::string& path, const std::string& filename) -> bool {
-      if (acceptFile(file, filename)) {
-        tail_states_.insert(std::make_pair(filename, TailState {path, filename, 0, 0}));
-      }
-      return true;
-    };
+    if (utils::file::FileUtils::is_directory(base_dir_.c_str()) == 0) {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory does not exist or is not a directory");
+    }
+
+    context->getProperty(RecursiveLookup.getName(), recursive_lookup_);
 
-    utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, false);
+    context->getProperty(LookupFrequency.getName(), lookup_frequency_);
+
+    // in multiple mode, we check for new/removed files in every onTrigger
 
   } else {
-    std::string fileLocation, fileName;
-    if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, fileName)) {
-      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
+    tail_mode_ = Mode::SINGLE;
+
+    std::string path, file_name;
+    if (utils::file::PathUtils::getFileNameAndPath(file_to_tail_, path, file_name)) {
+      // NOTE: position and checksum will be updated in recoverState() if there is a persisted state for this file
+      tail_states_.emplace(file_to_tail_, TailState{path, file_name});
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
     }
   }
-}
-
-bool TailFile::acceptFile(const std::string &fileFilter, const std::string &file) {
-  utils::Regex rgx(fileFilter);
-  return rgx.match(file);
-}
 
-std::string TailFile::trimLeft(const std::string& s) {
-  return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
-}
+  std::string rolling_filename_pattern_glob;
+  context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
+  rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
 
-std::string TailFile::trimRight(const std::string& s) {
-  return org::apache::nifi::minifi::utils::StringUtils::trimRight(s);
+  recoverState(context);
 }
 
-void TailFile::parseStateFileLine(char *buf) {
+void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
   char *line = buf;
 
   logger_->log_trace("Received line %s", buf);
@@ -187,7 +401,7 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   char *equal = strchr(line, '=');
-  if (equal == NULL) {
+  if (equal == nullptr) {
     return;
   }
 
@@ -204,33 +418,33 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
       logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
-      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
+      state.emplace(fileName, TailState{fileLocation, fileName});
     } else {
-      tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
+      state.emplace(value, TailState{fileLocation, value});
     }
   }
   if (key == "POSITION") {
     // for backwards compatibility
-    if (tail_states_.size() != 1) {
+    if (tail_states_.size() != std::size_t{1}) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
     }
     const auto position = std::stoull(value);
-    logger_->log_debug("Received position %d", position);
-    tail_states_.begin()->second.currentTailFilePosition_ = position;
+    logger_->log_debug("Received position %llu", position);
+    state.begin()->second.position_ = gsl::narrow<uint64_t>(position);
   }
   if (key.find(CURRENT_STR) == 0) {
     const auto file = key.substr(strlen(CURRENT_STR));
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
-      tail_states_[file].path_ = fileLocation;
-      tail_states_[file].current_file_name_ = fileName;
+      state[file].path_ = fileLocation;
+      state[file].file_name_ = fileName;
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
     }
@@ -238,18 +452,42 @@ void TailFile::parseStateFileLine(char *buf) {
 
   if (key.find(POSITION_STR) == 0) {
     const auto file = key.substr(strlen(POSITION_STR));
-    tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+    state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
+  std::map<std::string, TailState> new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+                            getStateFromLegacyStateFile(context, new_tail_states);
+  if (!state_load_success) {
+    return false;
+  }
 
+  if (tail_mode_ == Mode::SINGLE) {
+    if (tail_states_.size() == 1) {
+      auto state_it = tail_states_.begin();
+      const auto it = new_tail_states.find(state_it->first);
+      if (it != new_tail_states.end()) {
+        state_it->second = it->second;
+      }
+    } else {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "This should never happen: "
+          "in Single file mode, internal state size should be 1, but it is " + std::to_string(tail_states_.size()));
+    }
+  } else {
+    tail_states_ = std::move(new_tail_states);
+  }
 
-bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
-  bool state_load_success = false;
+  logState();
+  storeState();
+
+  return true;
+}
 
+bool TailFile::getStateFromStateManager(std::map<std::string, TailState> &new_tail_states) const {
   std::unordered_map<std::string, std::string> state_map;
   if (state_manager_->get(state_map)) {
-    std::map<std::string, TailState> new_tail_states;
     for (size_t i = 0U;; i++) {
       std::string name;
       try {
@@ -260,72 +498,79 @@ bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context
       try {
         const std::string& current = state_map.at("file." + std::to_string(i) + ".current");
         uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position"));
+        uint64_t checksum = readOptionalUint64(state_map, "file." + std::to_string(i) + ".checksum");
+        std::chrono::system_clock::time_point last_read_time{std::chrono::milliseconds{
+            readOptionalInt64(state_map, "file." + std::to_string(i) + ".last_read_time")
+        }};
 
         std::string fileLocation, fileName;
         if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
           logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
-          new_tail_states.emplace(fileName, TailState { fileLocation, fileName, position, 0 });
+          new_tail_states.emplace(current, TailState{fileLocation, fileName, position, last_read_time, checksum});
         } else {
-          new_tail_states.emplace(current, TailState { fileLocation, current, position, 0 });
+          new_tail_states.emplace(current, TailState{fileLocation, current, position, last_read_time, checksum});
         }
       } catch (...) {
         continue;
       }
     }
-    state_load_success = true;
-    tail_states_ = std::move(new_tail_states);
     for (const auto& s : tail_states_) {
-      logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_);
+      logger_->log_debug("TailState %s: %s, %s, %" PRIu64 ", %" PRIu64,
+                         s.first, s.second.path_, s.second.file_name_, s.second.position_, s.second.checksum_);
     }
+    return true;
   } else {
     logger_->log_info("Found no stored state");
   }
+  return false;
+}
 
-  /* We could not get the state from the StateManager, try to migrate the old state file if it exists */
-  if (!state_load_success) {
-    std::ifstream file(state_file_.c_str(), std::ifstream::in);
-    if (!file.good()) {
-      logger_->log_error("load state file failed %s", state_file_);
-      return false;
-    }
-    tail_states_.clear();
-    char buf[BUFFER_SIZE];
-    for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
-      parseStateFileLine(buf);
-    }
-  }
+bool TailFile::getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
+                                           std::map<std::string, TailState> &new_tail_states) const {
+  std::string state_file_name_property;
+  context->getProperty(StateFile.getName(), state_file_name_property);
+  std::string state_file = state_file_name_property + "." + getUUIDStr();
 
-  /**
-   * recover times and validate that we have paths
-   */
+  std::ifstream file(state_file.c_str(), std::ifstream::in);
+  if (!file.good()) {
+    logger_->log_info("Legacy state file %s not found (this is OK)", state_file);
+    return false;
+  }
 
-  for (auto &state : tail_states_) {
-    std::string fileLocation, fileName;
-    if (!utils::file::PathUtils::getFileNameAndPath(state.second.current_file_name_, fileLocation, fileName) && state.second.path_.empty()) {
-      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file does not contain a full path and file name");
-    }
-    struct stat sb;
-    const auto fileFullName = state.second.path_ + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    if (stat(fileFullName.c_str(), &sb) == 0) {
-      state.second.currentTailFileModificationTime_ = ((uint64_t) (sb.st_mtime) * 1000);
-    }
+  std::map<std::string, TailState> legacy_tail_states;
+  char buf[BUFFER_SIZE];
+  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
+    parseStateFileLine(buf, legacy_tail_states);
   }
 
-  logger_->log_debug("load state succeeded");
+  new_tail_states = update_keys_in_legacy_states(legacy_tail_states);
+  return true;
+}
 
-  /* Save the state to the state manager */
-  storeState(context);
+void TailFile::logState() {
+  logger_->log_info("State of the TailFile processor %s:", name_);
+  for (const auto& key_value_pair : tail_states_) {
+    logging::LOG_INFO(logger_) << key_value_pair.first << " => { " << key_value_pair.second << " }";
+  }
+}
 
-  return true;
+std::ostream& operator<<(std::ostream &os, const TailState &tail_state) {
+  os << "name: " << tail_state.file_name_
+      << ", position: " << tail_state.position_
+      << ", checksum: " << tail_state.checksum_
+      << ", last_read_time: " << tail_state.lastReadTimeInMilliseconds();
+  return os;
 }
 
-bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context) {
+bool TailFile::storeState() {
   std::unordered_map<std::string, std::string> state;
   size_t i = 0;
   for (const auto& tail_state : tail_states_) {
-    state["file." + std::to_string(i) + ".name"] = tail_state.first;
-    state["file." + std::to_string(i) + ".current"] = utils::file::FileUtils::concat_path(tail_state.second.path_, tail_state.second.current_file_name_);
-    state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.currentTailFilePosition_);
+    state["file." + std::to_string(i) + ".current"] = tail_state.first;
+    state["file." + std::to_string(i) + ".name"] = tail_state.second.file_name_;
+    state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.position_);
+    state["file." + std::to_string(i) + ".checksum"] = std::to_string(tail_state.second.checksum_);
+    state["file." + std::to_string(i) + ".last_read_time"] = std::to_string(tail_state.second.lastReadTimeInMilliseconds());
     ++i;
   }
   if (!state_manager_->set(state)) {
@@ -335,170 +580,206 @@ bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name) {
-  struct stat statbuf;
-  std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_;
-
-  if (stat(fullPath.c_str(), &statbuf) == 0) {
-    logger_->log_trace("Searching for files rolled over");
-    std::string pattern = file.current_file_name_;
-    std::size_t found = file.current_file_name_.find_last_of(".");
-    if (found != std::string::npos)
-      pattern = file.current_file_name_.substr(0, found);
-
-    // Callback, called for each file entry in the listed directory
-    // Return value is used to break (false) or continue (true) listing
-    auto lambda = [&](const std::string& path, const std::string& filename) -> bool {
-      struct stat sb;
-      std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename;
-      if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) {
-        uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-        if (candidateModTime >= file.currentTailFileModificationTime_) {
-          logging::LOG_TRACE(logger_) << "File " << filename << " (short name " << file.current_file_name_ <<
-          ") disk mod time " << candidateModTime << ", struct mod time " << file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", position " << file.currentTailFilePosition_;
-          if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ &&
-              sb.st_size == file.currentTailFilePosition_) {
-            return true;  // Skip the current file as a candidate in case it wasn't updated
+// Lists the rolled-over versions of the file described by `state`: files in state.path_
+// (other than the tailed file itself) whose names fully match rolling_filename_pattern_,
+// with "${filename}" replaced by the tailed file's name up to its last '.', and whose
+// mtime is not older than the last read time. The result is sorted by (mtime, name).
+// If the oldest candidate is at least state.position_ bytes long and its checksum over
+// those bytes equals state.checksum_, it is returned with position/checksum copied from
+// `state`, so reading resumes where we left off; all other candidates start at offset 0.
+std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) const {
+  logger_->log_debug("Searching for files rolled over; last read time is %" PRId64, state.lastReadTimeInMilliseconds());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  // NOTE(review): base_name is substituted unescaped; if matchesFullInput treats the pattern
+  // as a regex, characters like '.' in the file name act as regex metacharacters -- confirm.
+  std::string pattern = utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", base_name);
+
+  std::vector<TailStateWithMtime> matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string &path, const std::string &file_name) -> bool {
+    if (file_name != state.file_name_ && utils::Regex::matchesFullInput(pattern, file_name)) {
+      std::string full_file_name = path + utils::file::FileUtils::get_separator() + file_name;
+      TailStateWithMtime::TimePoint mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+      logger_->log_debug("File %s with mtime %" PRId64 " matches rolling filename pattern %s", file_name, int64_t{mtime.time_since_epoch().count()}, pattern);
+      // last_read_time_ is truncated to whole seconds before comparing, presumably because
+      // file mtimes may only have 1-second resolution.
+      if (mtime >= std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
+        logger_->log_debug("File %s has mtime >= last read time, so we are going to read it", file_name);
+        matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime);
+      }
+    }
+    return true;
+  };
+
+  // The last argument is the recursion flag (cf. checkForNewFiles), so only state.path_ itself is searched.
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), [](const TailStateWithMtime &left, const TailStateWithMtime &right) {
+    return std::tie(left.mtime_, left.tail_state_.file_name_) <
+           std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  // The oldest rotated file is probably the one we were reading: if its first state.position_
+  // bytes have the checksum we recorded, continue from state.position_ instead of re-reading it.
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+    TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
+    std::string full_file_name = first_rotated_file.fileNameWithPath();
+    if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+      uint64_t checksum = utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+      if (checksum == state.checksum_) {
+        first_rotated_file.position_ = state.position_;
+        first_rotated_file.checksum_ = state.checksum_;
       }
-      TailMatchedFileItem item;
-      item.fileName = filename;
-      item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-      matchedFiles.push_back(item);
     }
   }
-  return true;};
-
-    utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-    if (matchedFiles.size() < 1) {
-      logger_->log_debug("No newer files found in directory!");
-      return;
-    }
+  std::vector<TailState> matched_files;
+  matched_files.reserve(matched_files_with_mtime.size());
+  std::transform(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), std::back_inserter(matched_files),
+                 [](TailStateWithMtime &tail_state_with_mtime) { return std::move(tail_state_with_mtime.tail_state_); });
+  return matched_files;
+}
 
-    // Sort the list based on modified time
-    std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
-    TailMatchedFileItem item = matchedFiles[0];
-    logger_->log_info("TailFile File Roll Over from %s to %s", file.current_file_name_, item.fileName);
+// Runs on every trigger while holding tail_file_mutex_. In Multiple mode it re-scans for
+// removed and new files, at most once per lookup_frequency_. It then tails every tracked
+// file, and finally yields if nothing was transferred to Success in this session.
+void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession> &session) {
+  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
 
-    // Going ahead in the file rolled over
-    if (file.current_file_name_ != base_file_name) {
-      logger_->log_debug("Resetting posotion since %s != %s", base_file_name, file.current_file_name_);
-      file.currentTailFilePosition_ = 0;
+  if (tail_mode_ == Mode::MULTIPLE) {
+    // steady_clock is used so wall-clock adjustments do not affect the lookup interval.
+    if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) {
+      logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
+      checkForRemovedFiles();
+      checkForNewFiles();
+      last_multifile_lookup_ = std::chrono::steady_clock::now();
+    } else {
+      logger_->log_trace("Skipping multifile lookup");
     }
+  }
 
-    file.current_file_name_ = item.fileName;
+  // iterate over file states. may modify them
+  for (auto &state : tail_states_) {
+    processFile(session, state.first, state.second);
+  }
 
-    storeState(context);
+  // No new data in any tracked file: back off instead of being re-triggered immediately.
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
   }
 }
 
-void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-  std::string st_file;
-  if (context->getProperty(StateFile.getName(), st_file)) {
-    state_file_ = st_file + "." + getUUIDStr();
+// Tails one tracked file (the tail_states_ key `full_file_name` and its `state`).
+void TailFile::processFile(const std::shared_ptr<core::ProcessSession> &session,
+                           const std::string &full_file_name,
+                           TailState &state) {
+  // The file is now shorter than our saved read position, so it was presumably rotated or
+  // truncated: drain the rolled-over files first (this resets `state` to offset 0).
+  if (utils::file::FileUtils::file_size(full_file_name) < state.position_) {
+    processRotatedFiles(session, state);
   }
-  if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
-    this->recoverState(context);
+
+  processSingleFile(session, full_file_name, state);
+}
+
+// Called when the tailed file has shrunk below the saved read position (presumably rotated
+// or truncated). Emits the remaining data of the rolled-over files returned by
+// findRotatedFiles(), oldest first. Then resets `state` so the current file is read again
+// from its beginning.
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
+  std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+  for (TailState &file_state : rotated_file_states) {
+    processSingleFile(session, file_state.fileNameWithPath(), file_state);
+  }
+  state.position_ = 0;
+  state.checksum_ = 0;
+  // Persist the reset right away. If the new file is still empty, processSingleFile() returns
+  // before storing state, and the stale position/checksum would make a restart re-read the
+  // rotated file from that position and emit its data a second time.
+  storeState();
+}
+
+// Emits the data appended to `full_file_name` since state.position_ to Success and advances
+// `state` past it, persisting it with storeState().
+//  - With a delimiter: a FileReaderCallback is written into new flow files until
+//    hasMoreToRead() is false (presumably one flow file per delimiter-terminated chunk).
+//    Chunks the reader rejects (useLatestFlowFile() == false) are removed.
+//  - Without a delimiter: a single flow file with everything up to the current end of file.
+//    Nothing is emitted if there is no new data.
+// Empty or missing files are skipped with a warning.
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &session,
+                                 const std::string &full_file_name,
+                                 TailState &state) {
+  std::string fileName = state.file_name_;
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size zero", full_file_name);
+    return;
   }
+  logger_->log_debug("Tailing file %s from %" PRIu64, full_file_name, state.position_);
 
-  /**
-   * iterate over file states. may modify them
-   */
-  for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
-        logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, ffr->getSize());
-          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+  // NOTE(review): if fileName contains no '.', last_dot_position is npos and npos + 1 wraps
+  // to 0, so `extension` becomes the whole file name -- confirm this is acceptable.
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    // Go through unsigned char so bytes >= 0x80 are not sign-extended when printed with %X.
+    logger_->log_trace("Looking for delimiter 0x%X", static_cast<unsigned int>(static_cast<unsigned char>(delim)));
+
+    std::size_t num_flow_files = 0;
+    FileReaderCallback file_reader{full_file_name, state.position_, delim, state.checksum_};
+    // Accumulate progress in a copy and commit it to `state` once reading is finished.
+    TailState state_copy{state};
+
+    while (file_reader.hasMoreToRead()) {
+      auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+      session->write(flow_file, &file_reader);
+
+      if (file_reader.useLatestFlowFile()) {
+        updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
+        session->transfer(flow_file, Success);
+        updateStateAttributes(state_copy, flow_file->getSize(), file_reader.checksum());
+
+        ++num_flow_files;
 
       } else {
-        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, flowFile->getSize());
-          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + "."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
+        session->remove(flow_file);
       }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
     }
+
+    state = state_copy;
+    storeState();
+
+    logger_->log_info("%zu flowfiles were received from TailFile input", num_flow_files);
+
+  } else {
+    WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_};
+    auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+    session->write(flow_file, &file_reader);
+
+    if (flow_file->getSize() == 0u) {
+      // Nothing new after state.position_. Do not emit an empty flow file on every trigger;
+      // dropping it lets onTrigger yield, and the stored state is unchanged.
+      session->remove(flow_file);
+    } else {
+      updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
+      session->transfer(flow_file, Success);
+      updateStateAttributes(state, flow_file->getSize(), file_reader.checksum());
+
+      storeState();
+    }
+  }
+}
+
+// Sets the PATH, ABSOLUTE_PATH and FILENAME attributes of `flow_file`.
+// FILENAME has the form "<baseName>.<first offset>-<last offset>.<extension>", where both
+// offsets are inclusive byte positions in the tailed file and the range starts at state.position_.
+void TailFile::updateFlowFileAttributes(const std::string &full_file_name, const TailState &state,
+                                        const std::string &fileName, const std::string &baseName,
+                                        const std::string &extension,
+                                        std::shared_ptr<FlowFileRecord> &flow_file) const {
+  const uint64_t size = flow_file->getSize();
+  logger_->log_info("TailFile %s for %" PRIu64 " bytes", fileName, size);
+  // The end offset is inclusive. Guard size == 0 so "position + size - 1" cannot wrap
+  // around (e.g. to 2^64 - 1 when position is 0).
+  const uint64_t last_position = size > 0 ? state.position_ + size - 1 : state.position_;
+  std::string logName = baseName + "." + std::to_string(state.position_) + "-" +
+                        std::to_string(last_position) + "." + extension;
+  flow_file->updateKeyedAttribute(PATH, state.path_);
+  flow_file->addKeyedAttribute(ABSOLUTE_PATH, full_file_name);
+  flow_file->updateKeyedAttribute(FILENAME, logName);
+}
+
+// Records that `size` more bytes were consumed: stores the given checksum, stamps the
+// current wall-clock time as the last read time, and advances the read position.
+void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const {
+  state.checksum_ = checksum;
+  state.last_read_time_ = std::chrono::system_clock::now();
+  state.position_ = state.position_ + size;
+}
+
+// Stops tracking files that file_size() reports as 0 (missing or empty) or whose names no
+// longer fully match file_to_tail_.
+void TailFile::checkForRemovedFiles() {
+  auto it = tail_states_.begin();
+  while (it != tail_states_.end()) {
+    const TailState &state = it->second;
+    const bool should_remove = utils::file::FileUtils::file_size(state.fileNameWithPath()) == 0u ||
+                               !utils::Regex::matchesFullInput(file_to_tail_, state.file_name_);
+    if (should_remove) {
+      it = tail_states_.erase(it);  // map::erase returns the iterator following the erased entry
+    } else {
+      ++it;
+    }
   }
 }
 
+// Lists base_dir_ (passing recursive_lookup_ as list_dir's recursion flag) and starts tracking,
+// from offset 0, every not-yet-tracked file whose name fully matches file_to_tail_.
+void TailFile::checkForNewFiles() {
+  auto track_if_new_and_matching = [&](const std::string &path, const std::string &file_name) -> bool {
+    std::string full_file_name = path + utils::file::FileUtils::get_separator() + file_name;
+    if (containsKey(tail_states_, full_file_name)) {
+      return true;  // already tracked; returning true keeps the listing going
+    }
+    if (utils::Regex::matchesFullInput(file_to_tail_, file_name)) {
+      tail_states_.emplace(full_file_name, TailState{path, file_name});
+    }
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(base_dir_, track_if_new_and_matching, logger_, recursive_lookup_);
+}
+
+// Returns the minimum interval between file re-scans in Multiple tail mode.
+std::chrono::milliseconds TailFile::getLookupFrequency() const {
+  return this->lookup_frequency_;
+}
+
 }  // namespace processors
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
-#endif
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 958d2d5..080505c 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -22,7 +22,9 @@
 
 #include <map>
 #include <memory>
+#include <utility>
 #include <string>
+#include <vector>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -37,37 +39,47 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-// TailFile Class
+// Read progress of one tailed file, as kept in TailFile::tail_states_ and in processor state.
+struct TailState {
+  TailState(std::string path, std::string file_name, uint64_t position,
+            std::chrono::system_clock::time_point last_read_time,
+            uint64_t checksum)
+      : path_(std::move(path)), file_name_(std::move(file_name)), position_(position), last_read_time_(last_read_time), checksum_(checksum) {}
 
+  // A file that has not been read yet: position 0, epoch read time, checksum 0.
+  TailState(std::string path, std::string file_name)
+      : TailState{std::move(path), std::move(file_name), 0, std::chrono::system_clock::time_point{}, 0} {}
 
-typedef struct {
-  std::string path_;
-  std::string current_file_name_;
-  uint64_t currentTailFilePosition_;
-  uint64_t currentTailFileModificationTime_;
-} TailState;
+  TailState() = default;
+
+  // Directory and file name joined with the platform path separator.
+  std::string fileNameWithPath() const {
+    return path_ + utils::file::FileUtils::get_separator() + file_name_;
+  }
 
-// Matched File Item for Roll over check
-typedef struct {
-  std::string fileName;
-  uint64_t modifiedTime;
-} TailMatchedFileItem;
+  // last_read_time_ as milliseconds since the system_clock epoch.
+  int64_t lastReadTimeInMilliseconds() const {
+    return std::chrono::duration_cast<std::chrono::milliseconds>(last_read_time_.time_since_epoch()).count();
+  }
 
+  // Directory containing the file.
+  std::string path_;
+  // File name without the directory part.
+  std::string file_name_;
+  // Byte offset at which the next read starts.
+  uint64_t position_ = 0;
+  // Time of the last read that advanced position_.
+  std::chrono::system_clock::time_point last_read_time_;
+  // Presumably a checksum of the first position_ bytes (it is compared against
+  // computeChecksum(file, position_) when matching rotated files) -- confirm.
+  uint64_t checksum_ = 0;
+};
 
+std::ostream& operator<<(std::ostream &os, const TailState &tail_state);
+
+// Tail mode: one explicitly named file, or (MULTIPLE) every file under a base directory
+// matching a name pattern. UNDEFINED is the default value of TailFile::tail_mode_.
+enum class Mode { SINGLE, MULTIPLE, UNDEFINED };
 
 class TailFile : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
   explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
-      : core::Processor(name, uuid),
+      : core::Processor(std::move(name), uuid),
         logger_(logging::LoggerFactory<TailFile>::getLogger()) {
-    state_recovered_ = false;
   }
-  // Destructor
-  virtual ~TailFile() = default;
+
+  ~TailFile() override = default;
+
   // Processor Name
   static constexpr char const* ProcessorName = "TailFile";
   // Supported Properties
@@ -76,53 +88,95 @@ class TailFile : public core::Processor {
   static core::Property Delimiter;
   static core::Property TailMode;
   static core::Property BaseDirectory;
+  static core::Property RecursiveLookup;
+  static core::Property LookupFrequency;
+  static core::Property RollingFilenamePattern;
   // Supported Relationships
   static core::Relationship Success;
 
- public:
-  bool acceptFile(const std::string &filter, const std::string &file);
   /**
    * Function that's executed when the processor is scheduled.
-   * @param context process context.
-   * @param sessionFactory process session factory that is used when creating
-   * ProcessSession objects.
+   * @param context process context, provides eg. configuration.
+   * @param sessionFactory process session factory that is used when creating ProcessSession objects.
    */
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  // OnTrigger method, implemented by NiFi TailFile
+
+  /**
+   * Function that's executed on each invocation of the processor.
+   * @param context process context, provides eg. configuration.
+   * @param session session object, provides eg. ways to interact with flow files.
+   */
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>  &session) override;
-  // Initialize, over write by NiFi TailFile
-  void initialize(void) override;
-  // recoverState
+
+  void initialize() override;
+
   bool recoverState(const std::shared_ptr<core::ProcessContext>& context);
-  // storeState
-  bool storeState(const std::shared_ptr<core::ProcessContext>& context);
+
+  void logState();
+
+  bool storeState();
+
+  std::chrono::milliseconds getLookupFrequency() const;
 
  private:
   static const char *CURRENT_STR;
   static const char *POSITION_STR;
   std::mutex tail_file_mutex_;
-  // File to save state
-  std::string state_file_;
+
   // Delimiter for the data incoming from the tailed file.
   std::string delimiter_;
+
   // StateManager
   std::shared_ptr<core::CoreComponentStateManager> state_manager_;
-  // determine if state is recovered;
-  bool state_recovered_;
 
   std::map<std::string, TailState> tail_states_;
 
   static const int BUFFER_SIZE = 512;
 
-  // Utils functions for parse state file
-  std::string trimLeft(const std::string& s);
-  std::string trimRight(const std::string& s);
-  void parseStateFileLine(char *buf);
-  /**
-   * Check roll over for the provided file.
-   */
-  void checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name);
+  Mode tail_mode_ = Mode::UNDEFINED;
+
+  std::string file_to_tail_;
+
+  std::string base_dir_;
+
+  bool recursive_lookup_ = false;
+
+  std::chrono::milliseconds lookup_frequency_;
+
+  std::chrono::steady_clock::time_point last_multifile_lookup_;
+
+  std::string rolling_filename_pattern_;
+
   std::shared_ptr<logging::Logger> logger_;
+
+  void parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const;
+
+  void processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state);
+
+  std::vector<TailState> findRotatedFiles(const TailState &state) const;
+
+  void processFile(const std::shared_ptr<core::ProcessSession> &session,
+                   const std::string &full_file_name,
+                   TailState &state);
+
+  void processSingleFile(const std::shared_ptr<core::ProcessSession> &session,
+                         const std::string &full_file_name,
+                         TailState &state);
+
+  bool getStateFromStateManager(std::map<std::string, TailState> &state) const;
+
+  bool getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
+                                   std::map<std::string, TailState> &new_tail_states) const;
+
+  void checkForRemovedFiles();
+
+  void checkForNewFiles();
+
+  void updateFlowFileAttributes(const std::string &full_file_name, const TailState &state, const std::string &fileName,
+                                const std::string &baseName, const std::string &extension,
+                                std::shared_ptr<FlowFileRecord> &flow_file) const;
+
+  void updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const;
 };
 
 REGISTER_RESOURCE(TailFile, "\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual."
@@ -130,7 +184,7 @@ REGISTER_RESOURCE(TailFile, "\"Tails\" a file, or a list of files, ingesting dat
                   " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover"
                   " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds,"
                   " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
-                  " does not support ingesting files that have been compressed when 'rolled over'.");
+                  " does not support ingesting files that have been compressed when 'rolled over'.")
 
 }  // namespace processors
 }  // namespace minifi
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index bcc228d..f2af687 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -38,7 +38,10 @@ FOREACH(testfile ${PROCESSOR_UNIT_TESTS})
     target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
 	target_wholearchive_library(${testfilename} minifi-standard-processors)
 	target_wholearchive_library(${testfilename} minifi-civet-extensions)
-  	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+	if (NOT DISABLE_ROCKSDB)
+		target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+	endif()
+	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
 
 	MATH(EXPR PROCESSOR_INT_TEST_COUNT "${PROCESSOR_INT_TEST_COUNT}+1")
 ENDFOREACH()
@@ -59,6 +62,9 @@ FOREACH(testfile ${PROCESSOR_INTEGRATION_TESTS})
 	target_link_libraries(${testfilename})
 	target_wholearchive_library(${testfilename} minifi-standard-processors)
 	target_wholearchive_library(${testfilename} minifi-civet-extensions)
+	if (NOT DISABLE_ROCKSDB)
+		target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+	endif()
 
 	MATH(EXPR INT_TEST_COUNT "${INT_TEST_COUNT}+1")
 ENDFOREACH()
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index a255c92..66e84a1 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -47,7 +47,33 @@ static std::string NEWLINE_FILE = ""  // NOLINT
 static const char *TMP_FILE = "minifi-tmpfile.txt";
 static const char *STATE_FILE = "minifi-state-file.txt";
 
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
+namespace {
+// Returns "<directory><separator><file_name>".
+std::string pathInDirectory(const std::string &directory, const std::string &file_name) {
+  return directory + utils::file::FileUtils::get_separator() + file_name;
+}
+
+// Writes `contents` to directory/file_name (truncating by default) and returns the full path.
+std::string createTempFile(const std::string &directory, const std::string &file_name, const std::string &contents,
+    std::ios_base::openmode open_mode = std::ios::out | std::ios::binary) {
+  std::string full_file_name = pathInDirectory(directory, file_name);
+  std::ofstream tmpfile{full_file_name, open_mode};
+  tmpfile << contents;
+  return full_file_name;
+}
+
+// Appends `contents` to directory/file_name, creating the file if it does not exist.
+void appendTempFile(const std::string &directory, const std::string &file_name, const std::string &contents,
+    std::ios_base::openmode open_mode = std::ios::app | std::ios::binary) {
+  createTempFile(directory, file_name, contents, open_mode);
+}
+
+// Deletes directory/file_name; failures (e.g. a missing file) are ignored.
+void removeFile(const std::string &directory, const std::string &file_name) {
+  std::remove(pathInDirectory(directory, file_name).c_str());
+}
+
+// Renames directory/old_file_name to directory/new_file_name; failures are ignored.
+// std::rename is used because <cstdio> only guarantees the std:: declaration.
+void renameTempFile(const std::string &directory, const std::string &old_file_name, const std::string &new_file_name) {
+  std::rename(pathInDirectory(directory, old_file_name).c_str(), pathInDirectory(directory, new_file_name).c_str());
+}
+}  // namespace
+
+TEST_CASE("TailFile reads the file until the first delimiter", "[simple]") {
   // Create and write to the test file
 
   TestController testController;
@@ -78,17 +104,17 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
 
   testController.runSession(plan, false);
   auto records = plan->getProvenanceRecords();
-  REQUIRE(records.size() == 2);
+  REQUIRE(records.size() == 5);  // line 1: CREATE, MODIFY; line 2: CREATE, MODIFY, DROP
 
   testController.runSession(plan, false);
 
   REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
-  REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0"));
+  REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0"));
 
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
+TEST_CASE("TailFile picks up the second line if a delimiter is written between runs", "[state]") {
   // Create and write to the test file
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -118,21 +144,22 @@ TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
 
   plan->reset(true);  // start a new but with state file
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
   std::ofstream appendStream;
   appendStream.open(temp_file.str(), std::ios_base::app | std::ios_base::binary);
   appendStream << std::endl;
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.14-34.txt"));
 
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
+TEST_CASE("TailFile re-reads the file if the state is deleted between runs", "[state]") {
   // Create and write to the test file
 
   TestController testController;
@@ -163,7 +190,7 @@ TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
 
   plan->reset(true);  // start a new but with state file
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
@@ -173,10 +200,10 @@ TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
   testController.runSession(plan, true);
 
   // if we lose state we restart
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
 }
 
-TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
+TEST_CASE("TailFile picks up the state correctly if it is rewritten between runs", "[state]") {
   // Create and write to the test file
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -209,7 +236,7 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   testController.runSession(plan, true);
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt"));
 
   std::string filePath, fileName;
   REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file.str(), filePath, fileName));
@@ -217,6 +244,7 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
   // should stay the same
   for (int i = 0; i < 5; i++) {
     plan->reset(true);  // start a new but with state file
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->set({{"file.0.name", fileName},
                                                                                    {"file.0.position", "14"},
@@ -225,7 +253,7 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
     testController.runSession(plan, true);
 
     // if we lose state we restart
-    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.14-34.txt"));
   }
   for (int i = 14; i < 34; i++) {
     plan->reset(true);  // start a new but with state file
@@ -239,12 +267,11 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
 
   plan->runCurrentProcessor();
   for (int i = 14; i < 34; i++) {
-    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile." + std::to_string(i) + "-34.txt"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile." + std::to_string(i) + "-34.txt"));
   }
 }
 
-
-TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
+TEST_CASE("TailFile converts the old-style state file to the new-style state", "[state][migration]") {
   // Create and write to the test file
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -262,25 +289,13 @@ TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
   char format[] = "/tmp/gt.XXXXXX";
   auto dir = testController.createTempDirectory(format);
 
-  auto createTempFile = [&](const std::string& file_name) -> std::string /*file_path*/ {
-    std::stringstream temp_file;
-    temp_file << dir << utils::file::FileUtils::get_separator() << file_name;
-
-    std::ofstream tmpfile;
-    tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
-    tmpfile << NEWLINE_FILE << "\n";
-    tmpfile.close();
-
-    return temp_file.str();
-  };
-
   std::stringstream state_file;
   state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
 
   auto statefile = state_file.str() + "." + id;
 
   SECTION("single") {
-    const std::string temp_file = createTempFile(TMP_FILE);
+    const std::string temp_file = createTempFile(dir, TMP_FILE, NEWLINE_FILE + '\n');
 
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file);
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
@@ -300,7 +315,7 @@ TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
     newstatefile.close();
 
     testController.runSession(plan, true);
-    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.14-34.txt"));
 
     std::unordered_map<std::string, std::string> state;
     REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
@@ -309,19 +324,26 @@ TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
     REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file, filePath, fileName));
     std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName},
                                                                 {"file.0.position", "35"},
-                                                                {"file.0.current", temp_file}};
-    REQUIRE(expected_state == state);
+                                                                {"file.0.current", temp_file},
+                                                                {"file.0.checksum", "1404369522"}};
+    for (const auto& key_value_pair : expected_state) {
+      const auto it = state.find(key_value_pair.first);
+      REQUIRE(it != state.end());
+      REQUIRE(it->second == key_value_pair.second);
+    }
+    REQUIRE(state.find("file.0.last_read_time") != state.end());
   }
 
   SECTION("multiple") {
     const std::string file_name_1 = "bar.txt";
     const std::string file_name_2 = "foo.txt";
-    const std::string temp_file_1 = createTempFile(file_name_1);
-    const std::string temp_file_2 = createTempFile(file_name_2);
+    const std::string temp_file_1 = createTempFile(dir, file_name_1, NEWLINE_FILE + '\n');
+    const std::string temp_file_2 = createTempFile(dir, file_name_2, NEWLINE_FILE + '\n');
 
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
-    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*\\.txt");
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
@@ -348,17 +370,114 @@ TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
     std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName1},
                                                                 {"file.0.position", "35"},
                                                                 {"file.0.current", temp_file_1},
+                                                                {"file.0.checksum", "1404369522"},
                                                                 {"file.1.name", fileName2},
                                                                 {"file.1.position", "35"},
-                                                                {"file.1.current", temp_file_2}};
-    REQUIRE(expected_state == state);
+                                                                {"file.1.current", temp_file_2},
+                                                                {"file.1.checksum", "2289158555"}};
+    for (const auto& key_value_pair : expected_state) {
+      const auto it = state.find(key_value_pair.first);
+      REQUIRE(it != state.end());
+      REQUIRE(it->second == key_value_pair.second);
+    }
+    REQUIRE(state.find("file.0.last_read_time") != state.end());
+    REQUIRE(state.find("file.1.last_read_time") != state.end());
   }
 }
 
+TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs", "[state]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
 
-TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
-  // Create and write to the test file
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tail_file = plan->addProcessor("TailFile", "tail_file");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string directory = testController.createTempDirectory(format);
+
+  std::string first_test_file = createTempFile(directory, "first.log", "my first log line\n");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), first_test_file);
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:first.0-17.log"));
+  // Each SECTION points File to Tail at second.log and restarts from the persisted state.
+  SECTION("The new file gets picked up") {
+    std::string second_test_file = createTempFile(directory, "second.log", "my second log line\n");
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), second_test_file);
+    plan->reset(true);  // clear the memory, but keep the state file
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:second.0-18.log"));
+  }
+
+  SECTION("The old file will no longer be tailed") {
+    appendTempFile(directory, "first.log", "add some more stuff\n");
+    std::string second_test_file = createTempFile(directory, "second.log", "");
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), second_test_file);
+    plan->reset(true);  // clear the memory, but keep the state file
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));  // the line appended to first.log is not emitted; second.log is empty
+  }
+}
+
+TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs (multiple file mode)", "[state][multiple_file]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string directory = testController.createTempDirectory(format);
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tail_file = plan->addProcessor("TailFile", "tail_file");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), directory);
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "first\\..*\\.log");
+
+  std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  createTempFile(directory, "first.fruit.log", "apple\n");
+  createTempFile(directory, "second.fruit.log", "orange\n");  // not matched by the initial regex
+  createTempFile(directory, "first.animal.log", "hippopotamus\n");
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.0-5.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:first.animal.0-12.log"));
+  // Both tailed files grow before the restart; each SECTION then changes the File to Tail regex.
+  appendTempFile(directory, "first.fruit.log", "banana\n");
+  appendTempFile(directory, "first.animal.log", "hedgehog\n");
+
+  SECTION("If a file no longer matches the new regex, then we stop tailing it") {
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "first\\.f.*\\.log");
+    plan->reset(true);  // clear the memory, but keep the state file
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log"));
+  }
+
+  SECTION("If a new file matches the new regex, we start tailing it") {
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*\\.fruit\\.log");
+    plan->reset(true);  // clear the memory, but keep the state file
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:second.fruit.0-6.log"));
+  }
+}
 
+TEST_CASE("TailFile finds the single input file in both Single and Multiple mode", "[simple]") {
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
   LogTestController::getInstance().setDebug<core::ProcessSession>();
@@ -366,7 +485,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
-  auto id = tailfile->getUUIDStr();
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "");
 
   plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
 
@@ -388,6 +507,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
   }
 
   testController.runSession(plan, false);
@@ -402,14 +522,91 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
-  std::string line1("foo");
+TEST_CASE("TailFile picks up new files created between runs", "[multiple_file]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfile");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*\\.log");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+  plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  createTempFile(dir, "application.log", "line1\nline2\n");
+
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+
+  createTempFile(dir, "another.log", "some more content\n");
+  // Second run: application.log has no unread content, so the single flow file must come from another.log.
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:another.0-17.log"));
+
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("TailFile can handle input files getting removed", "[multiple_file]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfile");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*\\.log");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> logattribute = plan->addProcessor("LogAttribute", "logattribute",
+                                                                     core::Relationship("success", "description"),
+                                                                     true);
+  plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  createTempFile(dir, "one.log", "line one\n");
+  createTempFile(dir, "two.log", "some stuff\n");
+
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  // Between runs: append three lines to one.log and delete the already-tailed two.log.
+  appendTempFile(dir, "one.log", "line two\nline three\nline four\n");
+  removeFile(dir, "two.log");
+
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
+
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("TailFile processes a very long line correctly", "[simple]") {
+  std::string line1("foo\n");
   std::string line2(8050, 0);
-  std::mt19937 gen(std::random_device { }());
-  std::generate_n(line2.begin(), line2.size(), [&]() -> char {
+  std::mt19937 gen(std::random_device{}());  // NOLINT (linter wants a space before '{')
+  std::generate_n(line2.begin(), line2.size() - 1, [&]() -> char {
     return 32 + gen() % (127 - 32);
   });
-  std::string line3("bar");
+  line2.back() = '\n';
+  std::string line3("bar\n");
   std::string line4("buzz");
 
   TestController testController;
@@ -428,7 +625,7 @@ TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
   temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;
   std::ofstream tmpfile;
   tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
-  tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
+  tmpfile << line1 << line2 << line3 << line4;
   tmpfile.close();
 
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
@@ -476,15 +673,15 @@ TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
+TEST_CASE("TailFile processes a long line followed by multiple newlines correctly", "[simple][edge_case]") {
   // Test having two delimiters on the buffer boundary
-  std::string line1(4097, '\n');
+  std::string line1(4098, '\n');
   std::mt19937 gen(std::random_device { }());
   std::generate_n(line1.begin(), 4095, [&]() -> char {
   return 32 + gen() % (127 - 32);
   });
-  std::string line2("foo");
-  std::string line3("bar");
+  std::string line2("foo\n");
+  std::string line3("bar\n");
   std::string line4("buzz");
 
   // Create and write to the test file
@@ -506,7 +703,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
   temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;
   std::ofstream tmpfile;
   tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
-  tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
+  tmpfile << line1 << line2 << line3 << line4;
   tmpfile.close();
 
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
@@ -520,7 +717,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
   testController.runSession(plan, true);
 
   REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files"));
-  auto line1_hex = utils::StringUtils::to_hex(line1.substr(0, 4095));
+  auto line1_hex = utils::StringUtils::to_hex(line1.substr(0, 4096));
   std::stringstream line1_hex_lines;
   for (size_t i = 0; i < line1_hex.size(); i += 80) {
     line1_hex_lines << line1_hex.substr(i, 80) << '\n';
@@ -533,32 +730,77 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
   LogTestController::getInstance().reset();
 }
 
-TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
+TEST_CASE("TailFile onSchedule throws if file(s) to tail cannot be determined", "[configuration]") {
   TestController testController;
-  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+  LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
-  auto id = tailfile->getUUIDStr();
 
-  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+  SECTION("Single file mode by default") {
+    SECTION("No FileName") {  // FileName is intentionally left unset
+    }
 
-  char format[] = "/tmp/gt.XXXXXX";
-  auto dir = testController.createTempDirectory(format);
+    SECTION("FileName does not contain the path") {
+      plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-log.txt");
+    }
+  }
 
-  SECTION("No File and No base") {
-    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  SECTION("Explicit Single file mode") {
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Single file");
+
+    SECTION("No FileName") {  // FileName is intentionally left unset
+    }
+
+    SECTION("FileName does not contain the path") {
+      plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-log.txt");
+    }
   }
 
-  SECTION("No base") {
-    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+  SECTION("Multiple file mode") {
     plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+
+    SECTION("No FileName and no BaseDirectory") {  // neither property is set
+    }
+
+    SECTION("No BaseDirectory") {
+      plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+    }
   }
 
   REQUIRE_THROWS(plan->runNextProcessor());
 }
 
-TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
+TEST_CASE("TailFile onSchedule throws in Multiple mode if the Base Directory does not exist", "[configuration][multiple_file]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<minifi::processors::TailFile>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  plan->setProperty(tailfile, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tailfile, processors::TailFile::FileName.getName(), ".*\\.log");
+  // Running the processor must throw unless Base Directory names an existing directory.
+  SECTION("No Base Directory is set") {
+    REQUIRE_THROWS(plan->runNextProcessor());
+  }
+
+  SECTION("Base Directory is set, but does not exist") {
+    std::string nonexistent_directory{"/no-such-directory/688b01d0-9e5f-11ea-820d-f338c34d39a1/31d1a81a-9e5f-11ea-a77b-8b27d514a452"};
+    plan->setProperty(tailfile, processors::TailFile::BaseDirectory.getName(), nonexistent_directory);
+    REQUIRE_THROWS(plan->runNextProcessor());
+  }
+
+  SECTION("Base Directory is set and it exists") {
+    char format[] = "/tmp/gt.XXXXXX";
+    std::string directory = testController.createTempDirectory(format);
+
+    plan->setProperty(tailfile, processors::TailFile::BaseDirectory.getName(), directory);
+    plan->setProperty(tailfile, processors::TailFile::LookupFrequency.getName(), "0 sec");
+    REQUIRE_NOTHROW(plan->runNextProcessor());
+  }
+}
+
+TEST_CASE("TailFile finds and finishes the renamed file and continues with the new log file", "[rotation]") {
   TestController testController;
 
   const char DELIM = ',';
@@ -573,16 +815,9 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
   char format[] = "/tmp/gt.XXXXXX";
   auto dir = testController.createTempDirectory(format);
 
-  // Define test input file
-  std::string in_file(dir);
-#ifndef WIN32
-  in_file.append("/");
-#else
-  in_file.append("\\");
-#endif
-  in_file.append("testfifo.txt");
+  std::string in_file = dir + utils::file::FileUtils::get_separator() + "testfifo.txt";
 
-  std::ofstream in_file_stream(in_file);
+  std::ofstream in_file_stream(in_file, std::ios::out | std::ios::binary);
   in_file_stream << NEWLINE_FILE;
   in_file_stream.flush();
 
@@ -591,14 +826,13 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
 
   SECTION("single") {
-    plan->setProperty(
-        tail_file,
-        processors::TailFile::FileName.getName(), in_file);
+    plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
   }
   SECTION("Multiple") {
-    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "testfifo.txt");
     plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
     plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::LookupFrequency.getName(), "0 sec");
   }
 
   auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
@@ -611,6 +845,8 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
 
   REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(expected_pieces) + " flow files"));
 
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure the new file gets newer modification time
+
   in_file_stream << DELIM;
   in_file_stream.close();
 
@@ -618,28 +854,24 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
 
   REQUIRE(rename(in_file.c_str(), rotated_file.c_str()) == 0);
 
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
-
-  std::ofstream new_in_file_stream(in_file);
+  std::ofstream new_in_file_stream(in_file, std::ios::out | std::ios::binary);
   new_in_file_stream << "five" << DELIM << "six" << DELIM;
   new_in_file_stream.close();
 
   plan->reset();
-  plan->runNextProcessor();  // Tail
-  plan->runNextProcessor();  // Log
-
-  // Find the last flow file in the rotated file
-  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  plan->reset();
   plan->runNextProcessor();  // Tail
   plan->runNextProcessor();  // Log
 
-  // Two new files in the new flow file
-  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  // Find the last flow file in the rotated file, and then pick up the new file
+  REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:testfifo.txt.28-34.1"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:testfifo.0-4.txt"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:testfifo.5-8.txt"));
 }
 
-TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
+TEST_CASE("TailFile finds and finishes multiple rotated files and continues with the new log file", "[rotation]") {  // two rotations between runs
   TestController testController;
 
   const char DELIM = ':';
@@ -652,46 +884,660 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
   char format[] = "/tmp/gt.XXXXXX";
   auto dir = testController.createTempDirectory(format);
 
-  // Define test input file
-  std::string in_file(dir);
-  in_file.append("fruits.txt");
+  std::string test_file = dir + utils::file::FileUtils::get_separator() + "fruits.log";
 
-  for (int i = 2; 0 <= i; --i) {
-    if (i < 2) {
-      std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
-    }
-    std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
-    for (int j = 0; j <= i; j++) {
-      in_file_stream << "Apple" << DELIM;
-    }
-    in_file_stream.close();
-  }
+  std::ofstream test_file_stream_0(test_file, std::ios::binary);  // kept open: more data is appended after the first run
+  test_file_stream_0 << "Apple" << DELIM << "Orange" << DELIM;
+  test_file_stream_0.flush();  // push the buffered data to the file while the stream stays open
 
   // Build MiNiFi processing graph
   auto tail_file = plan->addProcessor("TailFile", "Tail");
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
-  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), test_file);
   auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
-  // Log as many FFs as it can to make sure exactly the expected amount is produced
 
-  // Each iteration should go through one file and log all flowfiles
-  for (int i = 2; 0 <= i; --i) {
-    plan->reset();
-    plan->runNextProcessor();  // Tail
-    plan->runNextProcessor();  // Log
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.0-5.log"));  // "Apple:" = bytes 0-5
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.6-12.log"));  // "Orange:" = bytes 6-12
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure the following writes get a newer modification time
+
+  test_file_stream_0 << "Pear" << DELIM;  // not seen yet; expected to be read later from the rotated fruits.0.log
+  test_file_stream_0.close();
+
+  std::string first_rotated_file = dir + utils::file::FileUtils::get_separator() + "fruits.0.log";
+  REQUIRE(rename(test_file.c_str(), first_rotated_file.c_str()) == 0);  // first rotation: fruits.log -> fruits.0.log
+
+  std::ofstream test_file_stream_1(test_file, std::ios::binary);
+  test_file_stream_1 << "Pineapple" << DELIM << "Kiwi" << DELIM;
+  test_file_stream_1.close();
+
+  std::string second_rotated_file = dir + utils::file::FileUtils::get_separator() + "fruits.1.log";
+  REQUIRE(rename(test_file.c_str(), second_rotated_file.c_str()) == 0);  // second rotation: fruits.log -> fruits.1.log
+
+  std::ofstream test_file_stream_2(test_file, std::ios::binary);
+  test_file_stream_2 << "Apricot" << DELIM;  // contents of the current fruits.log
+  test_file_stream_2.close();
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);  // only check output of the second run
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 4 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.0.13-17.log"));   // Pear, the rest of fruits.0.log
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.1.0-9.log"));     // Pineapple
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.1.10-14.log"));   // Kiwi
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruits.0-7.log"));       // Apricot, in the new fruits.log
+}
+
+TEST_CASE("TailFile ignores old rotated files", "[rotation]") {  // rotated files older than the tailed file must never be read
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  const std::string dir = testController.createTempDirectory(format);
+
+  std::string log_file_name = dir + utils::file::FileUtils::get_separator() + "test.log";
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfile");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), log_file_name);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  std::shared_ptr<core::Processor> logattribute = plan->addProcessor("LogAttribute", "logattribute",
+                                                                     core::Relationship("success", "description"),
+                                                                     true);
+  plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  createTempFile(dir, "test.2019-08-20", "line1\nline2\nline3\nline4\n");   // very old rotated file
+  std::this_thread::sleep_for(std::chrono::seconds(1));  // make the old rotated file clearly older than test.log
+
+  createTempFile(dir, "test.log", "line5\nline6\nline7\n");
+
+  testController.runSession(plan, true);
+  REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
+  REQUIRE(!LogTestController::getInstance().contains("key:filename value:test.0-5.2019-08-20"));  // pieces are named <base>.<start>-<end>.<ext>
+
+  std::string rotated_log_file_name = dir + utils::file::FileUtils::get_separator() + "test.2020-05-18";
+  REQUIRE(rename(log_file_name.c_str(), rotated_log_file_name.c_str()) == 0);  // rotate without adding new content
+
+  createTempFile(dir, "test.log", "line8\nline9\n");
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);  // only check output of the second run
+
+  testController.runSession(plan, true);
+  REQUIRE(!LogTestController::getInstance().contains("key:filename value:test.0-5.2019-08-20"));  // first line of the old file, if it were read
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));  // only line8 and line9 from the new test.log
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("TailFile rotation works with multiple input files", "[rotation][multiple_file]") {  // two of the three files rotate between runs
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  createTempFile(dir, "fruit.log", "apple\npear\nbanana\n");
+  createTempFile(dir, "animal.log", "bear\ngiraffe\n");
+  createTempFile(dir, "color.log", "red\nblue\nyellow\npurple\n");
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), ".*\\.log");
+  plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), dir);
+  plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");  // look for new files on every trigger
+
+  auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged " + std::to_string(3 + 2 + 4) + " flow files"));  // one flow file per line
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.0-5.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.6-10.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.11-17.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.0-4.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.5-12.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.0-3.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.4-8.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.9-15.log"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.16-22.log"));
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure the following writes get a newer modification time
+
+  appendTempFile(dir, "fruit.log", "orange\n");
+  appendTempFile(dir, "animal.log", "axolotl\n");
+  appendTempFile(dir, "color.log", "aquamarine\n");
+
+  renameTempFile(dir, "fruit.log", "fruit.0");  // rotate fruit.log and animal.log; color.log is not rotated
+  renameTempFile(dir, "animal.log", "animal.0");
+
+  createTempFile(dir, "fruit.log", "peach\n");
+  createTempFile(dir, "animal.log", "dinosaur\n");
+  appendTempFile(dir, "color.log", "turquoise\n");  // color.log just keeps growing
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 6 flow files"));  // two new lines per input file
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.18-24.0"));  // "orange\n" from the rotated fruit.0
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.0-5.log"));  // "peach\n" in the new fruit.log
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.13-20.0"));  // "axolotl\n" from the rotated animal.0
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.0-8.log"));  // "dinosaur\n" in the new animal.log
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.23-33.log"));  // "aquamarine\n"
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:color.34-43.log"));  // "turquoise\n"
+}
+
+TEST_CASE("TailFile handles the Rolling Filename Pattern property correctly", "[rotation]") {  // the pattern selects candidate rotated files
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  std::string test_file = createTempFile(dir, "test.log", "some stuff\n");
+
+  // Build MiNiFi processing graph
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), test_file);
+
+  std::vector<std::string> expected_log_lines;  // filled by the SECTION that runs; checked after the second run
+
+  SECTION("If no pattern is set, we use the default, which is ${filename}.*, so the unrelated file will be picked up") {
+    expected_log_lines = std::vector<std::string>{"Logged 2 flow files",
+                                                  "test.rolled.11-24.log",  // "one more line\n" from the rotated file
+                                                  "test.0-15.txt"};  // the unrelated test.txt also matches test.*
+  }
+
+  SECTION("If a pattern is set to exclude the unrelated file, we no longer pick it up") {
+    plan->setProperty(tail_file, processors::TailFile::RollingFilenamePattern.getName(), "${filename}.*.log");
 
-    REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(i + 1) + " flow files"));
+    expected_log_lines = std::vector<std::string>{"Logged 1 flow file",
+                                                  "test.rolled.11-24.log"};
   }
 
-  // Rrite some more data to the source file
-  std::ofstream in_file_stream(in_file);
-  in_file_stream << "Pear" << DELIM << "Cherry" << DELIM;
+  SECTION("We can also set the pattern to not include the file name") {
+    plan->setProperty(tail_file, processors::TailFile::RollingFilenamePattern.getName(), "other_roll??.log");  // matches other_rolled.log only
+
+    expected_log_lines = std::vector<std::string>{"Logged 1 flow file",
+                                                  "other_rolled.11-24.log"};  // resumes after the 11 bytes already read
+  }
+
+  auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-10.log"));  // "some stuff\n"
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure the following writes get a newer modification time
+
+  appendTempFile(dir, "test.log", "one more line\n");
+  renameTempFile(dir, "test.log", "test.rolled.log");  // rotation; no new test.log is created
+  createTempFile(dir, "test.txt", "unrelated stuff\n");
+  createTempFile(dir, "other_rolled.log", "some stuff\none more line\n");  // same contents as test.rolled.log
 
   plan->reset();
-  plan->runNextProcessor();  // Tail
-  plan->runNextProcessor();  // Log
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);  // only check output of the second run
+
+  testController.runSession(plan, true);
+
+  for (const auto &log_line : expected_log_lines) {
+    REQUIRE(LogTestController::getInstance().contains(log_line));
+  }
+}
+
+TEST_CASE("TailFile finds and finishes the renamed file and continues with the new log file after a restart", "[rotation][restart]") {  // two plans share state
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char log_dir_format[] = "/tmp/gt.XXXXXX";
+  auto log_dir = testController.createTempDirectory(log_dir_format);
+
+  std::string test_file_1 = createTempFile(log_dir, "test.1", "line one\nline two\nline three\n");  // old rotated file
+  std::this_thread::sleep_for(std::chrono::seconds(1));  // make test.1 clearly older than test.log
+  std::string test_file = createTempFile(log_dir, "test.log", "line four\nline five\nline six\n");  // current log file
 
-  REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
+  char state_dir_format[] = "/tmp/gt.XXXXXX";
+  auto state_dir = testController.createTempDirectory(state_dir_format);
+
+  utils::Identifier tail_file_uuid = utils::IdGenerator::getIdGenerator()->generate();  // reused by both plans, presumably so state is recovered
+  const core::Relationship success_relationship{"success", "everything is fine"};
+
+  {
+    auto test_plan = testController.createPlan(nullptr, state_dir.c_str());
+    auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
+    test_plan->setProperty(tail_file, processors::TailFile::FileName.getName(), test_file);
+    auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
+    test_plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+    test_plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+
+    testController.runSession(test_plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));  // lines four to six
+  }
+
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  appendTempFile(log_dir, "test.log", "line seven\n");  // written while "stopped"; expected from the rotated test.1
+  renameTempFile(log_dir, "test.1", "test.2");  // rotate: test.1 -> test.2, test.log -> test.1
+  renameTempFile(log_dir, "test.log", "test.1");
+  createTempFile(log_dir, "test.log", "line eight is the last line\n");
+
+  {
+    auto test_plan = testController.createPlan(nullptr, state_dir.c_str());
+    auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
+    test_plan->setProperty(tail_file, processors::TailFile::FileName.getName(), test_file);
+    auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
+    test_plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+    test_plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+
+    testController.runSession(test_plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:test.29-39.1"));  // "line seven\n" at the end of the old test.log
+    REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-27.log"));  // the whole new test.log
+  }
 }
 
+TEST_CASE("TailFile yields if no work is done", "[yield]") {  // the yield time is checked after each run
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto temp_directory = testController.createTempDirectory(format);
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), ".*\\.log");
+  plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), temp_directory);
+  plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");  // look for new files on every trigger
+
+  SECTION("Empty log file => yield") {
+    createTempFile(temp_directory, "first.log", "");
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() > 0);  // nothing to read
+
+    SECTION("No logging happened between onTrigger calls => yield") {
+      plan->reset();
+      tail_file->clearYield();  // so the check below only sees this run's yield
+
+      testController.runSession(plan, true);
+
+      REQUIRE(tail_file->getYieldTime() > 0);
+    }
+
+    SECTION("Some logging happened between onTrigger calls => don't yield") {
+      plan->reset();
+      tail_file->clearYield();  // so the check below only sees this run's yield
+
+      appendTempFile(temp_directory, "first.log", "stuff stuff\nand stuff\n");
+
+      testController.runSession(plan, true);
+
+      REQUIRE(tail_file->getYieldTime() == 0);
+    }
+  }
+
+  SECTION("Non-empty log file => don't yield") {
+    createTempFile(temp_directory, "second.log", "some content\n");
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() == 0);  // the initial content was read
+
+    SECTION("No logging happened between onTrigger calls => yield") {
+      plan->reset();
+      tail_file->clearYield();  // so the check below only sees this run's yield
+
+      testController.runSession(plan, true);
+
+      REQUIRE(tail_file->getYieldTime() > 0);
+    }
+
+    SECTION("Some logging happened between onTrigger calls => don't yield") {
+      plan->reset();
+      tail_file->clearYield();  // so the check below only sees this run's yield
+
+      appendTempFile(temp_directory, "second.log", "stuff stuff\nand stuff\n");
+
+      testController.runSession(plan, true);
+
+      REQUIRE(tail_file->getYieldTime() == 0);
+    }
+  }
+}
+
+TEST_CASE("TailFile yields if no work is done on any files", "[yield][multiple_file]") {  // one changed file is enough to avoid yielding
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto temp_directory = testController.createTempDirectory(format);
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), ".*\\.log");
+  plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), temp_directory);
+  plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");  // look for new files on every trigger
+
+  createTempFile(temp_directory, "first.log", "stuff\n");
+  createTempFile(temp_directory, "second.log", "different stuff\n");
+  createTempFile(temp_directory, "third.log", "stuff stuff\n");
+
+  testController.runSession(plan, true);  // consume the initial contents of all three files
+  plan->reset();
+  tail_file->clearYield();  // so each SECTION only sees the yield of its own run
+
+  SECTION("No file changed => yield") {
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() > 0);
+  }
+
+  SECTION("One file changed => don't yield") {
+    SECTION("first") { appendTempFile(temp_directory, "first.log", "more stuff\n"); }
+    SECTION("second") { appendTempFile(temp_directory, "second.log", "more stuff\n"); }
+    SECTION("third") { appendTempFile(temp_directory, "third.log", "more stuff\n"); }
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() == 0);
+  }
+
+  SECTION("More than one file changed => don't yield") {
+    SECTION("first and third") {
+      appendTempFile(temp_directory, "first.log", "more stuff\n");
+      appendTempFile(temp_directory, "third.log", "more stuff\n");
+    }
+    SECTION("all of them") {
+      appendTempFile(temp_directory, "first.log", "more stuff\n");
+      appendTempFile(temp_directory, "second.log", "more stuff\n");
+      appendTempFile(temp_directory, "third.log", "more stuff\n");
+    }
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() == 0);
+  }
+}
+
+TEST_CASE("TailFile doesn't yield if work was done on rotated files only", "[yield][rotation]") {  // new data in a rotated file counts as work
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto temp_directory = testController.createTempDirectory(format);
+  std::string full_file_name = createTempFile(temp_directory, "test.log", "stuff\n");
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), "\n");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), full_file_name);
+
+  testController.runSession(plan, true);  // consume the initial line
+
+  plan->reset();
+  tail_file->clearYield();  // so each SECTION only sees the yield of its own run
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));  // make sure the following writes get a newer modification time
+
+  SECTION("File rotated but not written => yield") {
+    renameTempFile(temp_directory, "test.log", "test.1");
+
+    SECTION("Don't create empty new log file") {  // test.log stays missing after the rename
+    }
+    SECTION("Create empty new log file") {
+      createTempFile(temp_directory, "test.log", "");
+    }
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() > 0);
+  }
+
+  SECTION("File rotated and new stuff is added => don't yield") {
+    SECTION("New content before rotation") {  // the new data ends up in the rotated test.1
+      appendTempFile(temp_directory, "test.log", "more stuff\n");
+    }
+
+    renameTempFile(temp_directory, "test.log", "test.1");
+
+    SECTION("New content after rotation") {  // the new data is in the new test.log
+      createTempFile(temp_directory, "test.log", "even more stuff\n");
+    }
+
+    testController.runSession(plan, true);
+
+    REQUIRE(tail_file->getYieldTime() == 0);
+  }
+}
+
+TEST_CASE("TailFile handles the Delimiter setting correctly", "[delimiter]") {  // escaped forms in the config are unescaped
+  std::vector<std::pair<std::string, std::string>> test_cases = {
+      // first = value of Delimiter in the config
+      // second = the expected delimiter char which will be used
+      {"", ""}, {",", ","}, {"\t", "\t"}, {"\\t", "\t"}, {"\n", "\n"}, {"\\n", "\n"}, {"\\", "\\"}, {"\\\\", "\\"}};
+  for (const auto &test_case : test_cases) {
+    TestController testController;
+
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setTrace<processors::TailFile>();
+    LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+    char format[] = "/tmp/gt.XXXXXX";
+    auto temp_directory = testController.createTempDirectory(format);
+
+    std::string delimiter = test_case.second;  // the file is written with the actual delimiter character
+    std::string full_file_name = createTempFile(temp_directory, "test.log", "one" + delimiter + "two" + delimiter);
+
+    auto plan = testController.createPlan();
+
+    auto tail_file = plan->addProcessor("TailFile", "Tail");
+    plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), test_case.first);
+    plan->setProperty(tail_file, processors::TailFile::FileName.getName(), full_file_name);
+
+    auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+    plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+    testController.runSession(plan, true);
+
+    if (delimiter.empty()) {  // no delimiter: the whole content becomes one flow file
+      REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+      REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-5.log"));
+    } else {  // one flow file per piece, each including its delimiter
+      REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+      REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-3.log"));
+      REQUIRE(LogTestController::getInstance().contains("key:filename value:test.4-7.log"));
+    }
+  }
+}
+
+TEST_CASE("TailFile handles Unix/Windows line endings correctly", "[simple]") {  // Delimiter is not set here, so the default is used
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  auto temp_directory = testController.createTempDirectory(format);
+  std::string full_file_name = createTempFile(temp_directory, "test.log", "line1\nline two\n", std::ios::out);  // write in text mode (CRLF on Windows)
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), full_file_name);
+
+  auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  testController.runSession(plan, true);
+
+#ifdef WIN32  // text mode wrote "\r\n" line endings
+  std::size_t line_ending_size = 2;
+#else
+  std::size_t line_ending_size = 1;
+#endif
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(5 + line_ending_size) + " Offset:0"));  // "line1" + line ending
+  REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(8 + line_ending_size) + " Offset:0"));  // "line two" + line ending
+}
+
+TEST_CASE("TailFile can tail all files in a directory recursively", "[multiple]") {  // Recursive lookup controls whether subdirectories are searched
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string base_directory = testController.createTempDirectory(format);
+  std::string directory1 = base_directory + utils::file::FileUtils::get_separator() + "one";
+  utils::file::FileUtils::create_dir(directory1);
+  std::string directory11 = directory1 + utils::file::FileUtils::get_separator() + "one_child";
+  utils::file::FileUtils::create_dir(directory11);
+  std::string directory2 = base_directory + utils::file::FileUtils::get_separator() + "two";
+  utils::file::FileUtils::create_dir(directory2);
+
+  createTempFile(base_directory, "test.orange.log", "orange juice\n");  // the only file directly in base_directory
+  createTempFile(directory1, "test.blue.log", "blue\n");
+  createTempFile(directory1, "test.orange.log", "orange autumn leaves\n");
+  createTempFile(directory11, "test.camel.log", "camel\n");
+  createTempFile(directory2, "test.triangle.log", "triangle\n");
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), base_directory);
+  plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), ".*\\.log");
+
+  auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  plan->setProperty(log_attribute, processors::LogAttribute::LogPayload.getName(), "true");
+
+  SECTION("Recursive lookup not set => defaults to false") {
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));  // only base_directory/test.orange.log
+  }
+
+  SECTION("Recursive lookup set to false") {
+    plan->setProperty(tail_file, processors::TailFile::RecursiveLookup.getName(), "false");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file"));  // only base_directory/test.orange.log
+  }
+
+  SECTION("Recursive lookup set to true") {
+    plan->setProperty(tail_file, processors::TailFile::RecursiveLookup.getName(), "true");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files"));  // all five .log files in the tree
+  }
+}
+
+TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multiple]") {  // Lookup Frequency throttles discovery of new files
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string directory = testController.createTempDirectory(format);
+
+  createTempFile(directory, "test.red.log", "cherry\n");
+
+  auto plan = testController.createPlan();
+
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), directory);
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), ".*\\.log");
+
+  auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+  testController.runSession(plan, true);  // initial lookup finds test.red.log
+
+  SECTION("Lookup frequency not set => defaults to 10 minutes") {
+    std::shared_ptr<processors::TailFile> tail_file_processor = std::dynamic_pointer_cast<processors::TailFile>(tail_file);
+    REQUIRE(tail_file_processor);
+    REQUIRE(tail_file_processor->getLookupFrequency() == std::chrono::minutes{10});
+  }
+
+  SECTION("Lookup frequency set to zero => new files are picked up immediately") {
+    plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");
+
+    plan->reset(true);
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+    createTempFile(directory, "test.blue.log", "sky\n");
+    createTempFile(directory, "test.green.log", "grass\n");
+
+    testController.runSession(plan, true);
+
+    REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  }
+
+  SECTION("Lookup frequency set to 100 ms => new files are only picked up after 100 ms") {
+    plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "100 ms");  // wide enough for slow CI machines
+
+    plan->reset(true);
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+    createTempFile(directory, "test.blue.log", "sky\n");
+    createTempFile(directory, "test.green.log", "grass\n");
+
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));  // still inside the lookup window
+
+    plan->reset(false);
+    LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(150));  // comfortably past the 100 ms lookup window
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+  }
+}
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index e13227b..b98eb85 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -113,7 +113,7 @@ class ProcessSession : public ReferenceContainer {
   // import from the data source.
   void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0);
   DEPRECATED(/*deprecated in*/ 0.7.0, /*will remove in */ 2.0) void import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter); // NOLINT
-  void import(const std::string& source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char inputDelimiter);
+  DEPRECATED(/*deprecated in*/ 0.8.0, /*will remove in */ 2.0) void import(const std::string& source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, uint64_t offset, char inputDelimiter);
 
   /**
    * Exports the data stream to a file
@@ -132,6 +132,8 @@ class ProcessSession : public ReferenceContainer {
   // Restore content previously stashed to a key
   void restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow);
 
+  bool existsFlowFileInRelationship(const Relationship &relationship);
+
 // Prevent default copy constructor and assignment operation
 // Only support pass by reference or pointer
   ProcessSession(const ProcessSession &parent) = delete;
diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h
index 94751ef..cf39f14 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -75,6 +75,15 @@ class Relationship {
   }
 
   Relationship &operator=(const Relationship &other) = default;
+
+  bool operator==(const Relationship &other) const {
+    return name_ == other.name_;
+  }
+
+  bool operator!=(const Relationship &other) const {
+    return !(*this == other);
+  }
+
   // Whether it is a undefined relationship
   bool isRelationshipUndefined() {
     return isRelationshipNameUndefined(name_);
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 56a5d7b..f95ff5e 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -52,6 +52,7 @@ class CRCStream : public BaseStream {
    * it will exceed our lifetime.
    */
   explicit CRCStream(T *child_stream);
+  CRCStream(T *child_stream, uint64_t initial_crc);
 
   CRCStream(CRCStream<T>&&) noexcept;
 
@@ -92,6 +93,8 @@ class CRCStream : public BaseStream {
    */
   int writeData(uint8_t *value, int size) override;
 
+  using BaseStream::write;
+
   /**
    * write 4 bytes to stream
    * @param base_value non encoded value
@@ -136,6 +139,10 @@ class CRCStream : public BaseStream {
    */
   int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE) override;
 
+  const uint64_t getSize() const override { return child_stream_->getSize(); }
+
+  void closeStream() override { child_stream_->closeStream(); }
+
   short initialize() override { // NOLINT
     child_stream_->initialize();
     reset();
@@ -189,6 +196,13 @@ CRCStream<T>::CRCStream(T *child_stream)
 }
 
 template<typename T>
+CRCStream<T>::CRCStream(T *child_stream, uint64_t initial_crc)
+    : crc_(initial_crc),
+      child_stream_(child_stream),
+      disable_encoding_(false) {
+}
+
+template<typename T>
 CRCStream<T>::CRCStream(CRCStream<T> &&move) noexcept
     : crc_(std::move(move.crc_)),
       child_stream_(std::move(move.child_stream_)),
diff --git a/libminifi/include/utils/RegexUtils.h b/libminifi/include/utils/RegexUtils.h
index 13c3890..f0e60fa 100644
--- a/libminifi/include/utils/RegexUtils.h
+++ b/libminifi/include/utils/RegexUtils.h
@@ -52,6 +52,8 @@ class Regex {
   const std::vector<std::string>& getResult() const;
   const std::string& getSuffix() const;
 
+  static bool matchesFullInput(const std::string &regex, const std::string &input);
+
  private:
   std::string pat_;
   std::string suffix_;
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 2536a9b..ef0a083 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -141,6 +141,8 @@ class StringUtils {
 
   static std::string replaceEnvironmentVariables(std::string& original_string);
 
+  static std::string replaceOne(const std::string &input, const std::string &from, const std::string &to);
+
   static std::string& replaceAll(std::string& source_string, const std::string &from_string, const std::string &to_string);
 
   inline static bool endsWithIgnoreCase(const std::string &value, const std::string & endString) {
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index f67539f..ebd2d07 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -257,6 +257,25 @@ class FileUtils {
     return 0;
   }
 
+  static std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds> last_write_time_point(const std::string &path) {
+    return std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>{std::chrono::seconds{last_write_time(path)}};
+  }
+
+  static uint64_t file_size(const std::string &path) {
+#ifdef WIN32
+    struct _stat result;
+    if (_stat(path.c_str(), &result) == 0) {
+      return result.st_size;
+    }
+#else
+    struct stat result;
+    if (stat(path.c_str(), &result) == 0) {
+      return result.st_size;
+    }
+#endif
+    return 0;
+  }
+
   static bool set_last_write_time(const std::string &path, uint64_t write_time) {
 #ifdef WIN32
     struct __utimbuf64 utim;
@@ -702,6 +721,8 @@ class FileUtils {
     return {};
   }
 #endif /* WIN32 */
+
+  static uint64_t computeChecksum(const std::string &file_name, uint64_t up_to_position);
 }; // NOLINT
 
 }  // namespace file
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h
index 965982c..22fafd6 100644
--- a/libminifi/include/utils/file/PathUtils.h
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -45,6 +45,8 @@ bool getFileNameAndPath(const std::string &path, std::string &filePath, std::str
  */
 std::string getFullPath(const std::string& path);
 
+std::string globToRegex(std::string glob);
+
 }  // namespace PathUtils
 }  // namespace file
 }  // namespace utils
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index eb93462..a1636d5 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -217,7 +217,9 @@ void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   } else {
     logger_->log_debug("Flow does not contain content. no resource claim to decrement.");
   }
-  process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
+  if (_addedFlowFiles.find(flow->getUUIDStr()) == _addedFlowFiles.end()) {
+    process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
+  }
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
   std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
@@ -648,7 +650,20 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 }
 
 void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) {
+// this function calls a deprecated function, but it is itself deprecated, so suppress warnings
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#elif defined(__GNUC__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#endif
   import(source, flows, offset, inputDelimiter);
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#elif defined(__GNUC__)
+#pragma GCC diagnostic pop
+#endif
   logger_->log_trace("Closed input %s, keeping source ? %i", source, keepSource);
   if (!keepSource) {
     std::remove(source.c_str());
@@ -972,6 +987,13 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
   return false;
 }
 
+bool ProcessSession::existsFlowFileInRelationship(const Relationship &relationship) {
+  return std::any_of(_transferRelationship.begin(), _transferRelationship.end(),
+      [&relationship](const std::map<std::string, Relationship>::value_type &key_value_pair) {
+        return relationship == key_value_pair.second;
+  });
+}
+
 }  // namespace core
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/src/utils/RegexUtils.cpp b/libminifi/src/utils/RegexUtils.cpp
index 9e88280..bf976e3 100644
--- a/libminifi/src/utils/RegexUtils.cpp
+++ b/libminifi/src/utils/RegexUtils.cpp
@@ -158,6 +158,16 @@ const std::vector<std::string>& Regex::getResult() const { return results_; }
 
 const std::string& Regex::getSuffix() const { return suffix_; }
 
+bool Regex::matchesFullInput(const std::string &regex, const std::string &input) {
+#ifdef NO_MORE_REGFREEE
+  std::regex re{regex};
+  return std::regex_match(input, re);
+#else
+  Regex rgx('^' + regex + '$');
+  return rgx.match(input);
+#endif
+}
+
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index eb15d27..6533422 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -120,6 +120,16 @@ std::string StringUtils::replaceEnvironmentVariables(std::string& original_strin
   return source_string;
 }
 
+std::string StringUtils::replaceOne(const std::string &input, const std::string &from, const std::string &to) {
+  std::size_t found_at_position = input.find(from);
+  if (found_at_position != std::string::npos) {
+    std::string input_copy = input;
+    return input_copy.replace(found_at_position, from.size(), to);
+  } else {
+    return input;
+  }
+}
+
 std::string& StringUtils::replaceAll(std::string& source_string, const std::string &from_string, const std::string &to_string) {
   std::size_t loc = 0;
   std::size_t lastFound;
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/src/utils/file/FileUtils.cpp
similarity index 53%
copy from libminifi/include/utils/file/PathUtils.h
copy to libminifi/src/utils/file/FileUtils.cpp
index 965982c..a010cd7 100644
--- a/libminifi/include/utils/file/PathUtils.h
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -14,10 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
-#define LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
 
-#include <string>
+#include "utils/file/FileUtils.h"
+
+#include <zlib.h>
+
+#include <algorithm>
+#include <iostream>
 
 namespace org {
 namespace apache {
@@ -25,32 +28,30 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 namespace file {
-namespace PathUtils {
 
-/**
- * Extracts the filename and path performing some validation of the path and output to ensure
- * we don't provide invalid results.
- * @param path input path
- * @param filePath output file path
- * @param fileName output file name
- * @return result of the operation.
- */
-bool getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName);
+uint64_t FileUtils::computeChecksum(const std::string &file_name, uint64_t up_to_position) {
+  constexpr uint64_t BUFFER_SIZE = 4096u;
+  std::array<char, std::size_t{BUFFER_SIZE}> buffer;
 
-/**
- * Resolves the supplied path to an absolute pathname using the native OS functions
- * (realpath(3) on *nix, GetFullPathNameA on Windows)
- * @param path the name of the file
- * @return the canonicalized absolute pathname on success, empty string on failure
- */
-std::string getFullPath(const std::string& path);
+  std::ifstream stream{file_name, std::ios::in | std::ios::binary};
+
+  uint64_t checksum = 0;
+  uint64_t remaining_bytes_to_be_read = up_to_position;
+
+  while (stream && remaining_bytes_to_be_read > 0) {
+    // () around std::min are needed because Windows.h defines min (and max) as a macro
+    stream.read(buffer.data(), (std::min)(BUFFER_SIZE, remaining_bytes_to_be_read));
+    uint64_t bytes_read = stream.gcount();
+    checksum = crc32(checksum, reinterpret_cast<unsigned char*>(buffer.data()), bytes_read);
+    remaining_bytes_to_be_read -= bytes_read;
+  }
+
+  return checksum;
+}
 
-}  // namespace PathUtils
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_FILE_PATHUTILS_H_
diff --git a/libminifi/src/utils/file/PathUtils.cpp b/libminifi/src/utils/file/PathUtils.cpp
index 18fb5be..fd1df13 100644
--- a/libminifi/src/utils/file/PathUtils.cpp
+++ b/libminifi/src/utils/file/PathUtils.cpp
@@ -84,10 +84,16 @@ std::string PathUtils::getFullPath(const std::string& path) {
 #endif
 }
 
+std::string PathUtils::globToRegex(std::string glob) {
+  utils::StringUtils::replaceAll(glob, ".", "\\.");
+  utils::StringUtils::replaceAll(glob, "*", ".*");
+  utils::StringUtils::replaceAll(glob, "?", ".");
+  return glob;
+}
+
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 51dbad5..4ddd6c5 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -303,6 +303,11 @@ std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
   return current_flowfile_;
 }
 
+
+std::shared_ptr<core::ProcessContext> TestPlan::getCurrentContext() {
+  return processor_contexts_.at(location);
+}
+
 std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) {
   std::stringstream connection_name;
   std::shared_ptr<core::Processor> last = processor;
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 03d7717..b94d124 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -266,6 +266,8 @@ class TestPlan {
 
   std::shared_ptr<core::FlowFile> getCurrentFlowFile();
 
+  std::shared_ptr<core::ProcessContext> getCurrentContext();
+
   std::shared_ptr<core::Repository> getFlowRepo() {
     return flow_repo_;
   }
diff --git a/libminifi/test/resources/TestTailFile.yml b/libminifi/test/resources/TestTailFile.yml
index e15a600..d82f13b 100644
--- a/libminifi/test/resources/TestTailFile.yml
+++ b/libminifi/test/resources/TestTailFile.yml
@@ -31,6 +31,7 @@ Processors:
       run duration nanos: 0
       auto-terminated relationships list:
       Properties:
+        File to Tail: test.log
         Input Delimiter: \n
     - name: la
       id: 2438e3c8-015a-1000-79ca-83af40ec1995
diff --git a/libminifi/test/resources/TestTailFileCron.yml b/libminifi/test/resources/TestTailFileCron.yml
index fdf63c0..e6ef593 100644
--- a/libminifi/test/resources/TestTailFileCron.yml
+++ b/libminifi/test/resources/TestTailFileCron.yml
@@ -31,6 +31,7 @@ Processors:
       run duration nanos: 0
       auto-terminated relationships list:
       Properties:
+        File to Tail: test.log
         Input Delimiter: \n
     - name: la
       id: 2438e3c8-015a-1000-79ca-83af40ec1995
diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp
index f7fbce6..5ace8e1 100644
--- a/libminifi/test/unit/CRCTests.cpp
+++ b/libminifi/test/unit/CRCTests.cpp
@@ -61,3 +61,40 @@ TEST_CASE("Test CRC5", "[testcrc5]") {
   test.write(number);
   REQUIRE(3753740124 == test.getCRC());
 }
+
+TEST_CASE("CRCStream with initial crc = 0 is the same as without initial crc", "[initial_crc_arg]") {
+  org::apache::nifi::minifi::io::BaseStream base1;
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_noinit(&base1);
+
+  org::apache::nifi::minifi::io::BaseStream base2;
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_initzero(&base2, 0);
+
+  const std::string textString = "The quick brown fox jumps over the lazy dog";
+  std::vector<uint8_t> textVector1(textString.begin(), textString.end());
+  std::vector<uint8_t> textVector2(textString.begin(), textString.end());
+
+  test_noinit.writeData(textVector1, textVector1.size());
+  test_initzero.writeData(textVector2, textVector2.size());
+  REQUIRE(test_noinit.getCRC() == test_initzero.getCRC());
+}
+
+TEST_CASE("CRCStream: one long write is the same as writing in two pieces", "[initial_crc_arg]") {
+  const std::string textString = "The quick brown fox jumps over the lazy dog";
+
+  org::apache::nifi::minifi::io::BaseStream base_full;
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_full(&base_full, 0);
+  std::vector<uint8_t> textVector_full(textString.begin(), textString.end());
+  test_full.writeData(textVector_full, textVector_full.size());
+
+  org::apache::nifi::minifi::io::BaseStream base_piece1;
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece1(&base_piece1, 0);
+  std::vector<uint8_t> textVector_piece1(textString.begin(), textString.begin() + 15);
+  test_piece1.writeData(textVector_piece1, textVector_piece1.size());
+
+  org::apache::nifi::minifi::io::BaseStream base_piece2;
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece2(&base_piece2, test_piece1.getCRC());
+  std::vector<uint8_t> textVector_piece2(textString.begin() + 15, textString.end());
+  test_piece2.writeData(textVector_piece2, textVector_piece2.size());
+
+  REQUIRE(test_full.getCRC() == test_piece2.getCRC());
+}
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 46b1f39..1cd4f82 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -26,6 +26,7 @@
 #include "utils/file/PathUtils.h"
 #include "utils/gsl.h"
 #include "utils/Environment.h"
+#include "utils/TimeUtil.h"
 
 using org::apache::nifi::minifi::utils::file::FileUtils;
 
@@ -178,3 +179,175 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
   REQUIRE(tempDir1 == utils::file::PathUtils::getFullPath(".\\test2\\..\\test1"));
 #endif
 }
+
+TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_write_time][last_write_time_point]") {
+  using namespace std::chrono;
+
+  uint64_t time_before_write = getTimeMillis() / 1000;
+  time_point<system_clock, seconds> time_point_before_write = time_point_cast<seconds>(system_clock::now());
+
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::last_write_time(test_file) == 0);
+  REQUIRE(FileUtils::last_write_time_point(test_file) == (time_point<system_clock, seconds>{}));
+
+  std::ofstream test_file_stream(test_file);
+  test_file_stream << "foo\n";
+  test_file_stream.flush();
+
+  uint64_t time_after_first_write = getTimeMillis() / 1000;
+  time_point<system_clock, seconds> time_point_after_first_write = time_point_cast<seconds>(system_clock::now());
+
+  uint64_t first_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(first_mtime >= time_before_write);
+  REQUIRE(first_mtime <= time_after_first_write);
+
+  time_point<system_clock, seconds> first_mtime_time_point = FileUtils::last_write_time_point(test_file);
+  REQUIRE(first_mtime_time_point >= time_point_before_write);
+  REQUIRE(first_mtime_time_point <= time_point_after_first_write);
+
+  test_file_stream << "bar\n";
+  test_file_stream.flush();
+
+  uint64_t time_after_second_write = getTimeMillis() / 1000;
+  time_point<system_clock, seconds> time_point_after_second_write = time_point_cast<seconds>(system_clock::now());
+
+  uint64_t second_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(second_mtime >= first_mtime);
+  REQUIRE(second_mtime >= time_after_first_write);
+  REQUIRE(second_mtime <= time_after_second_write);
+
+  time_point<system_clock, seconds> second_mtime_time_point = FileUtils::last_write_time_point(test_file);
+  REQUIRE(second_mtime_time_point >= first_mtime_time_point);
+  REQUIRE(second_mtime_time_point >= time_point_after_first_write);
+  REQUIRE(second_mtime_time_point <= time_point_after_second_write);
+
+  test_file_stream.close();
+  uint64_t third_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(third_mtime == second_mtime);
+
+  time_point<system_clock, seconds> third_mtime_time_point = FileUtils::last_write_time_point(test_file);
+  REQUIRE(third_mtime_time_point == second_mtime_time_point);
+}
+
+TEST_CASE("FileUtils::file_size works", "[file_size]") {
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::file_size(test_file) == 0);
+
+  std::ofstream test_file_stream(test_file, std::ios::out | std::ios::binary);
+  test_file_stream << "foo\n";
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::file_size(test_file) == 4);
+
+  test_file_stream << "foobar\n";
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::file_size(test_file) == 11);
+
+  test_file_stream.close();
+
+  REQUIRE(FileUtils::file_size(test_file) == 11);
+}
+
+TEST_CASE("FileUtils::computeChecksum works", "[computeChecksum]") {
+  constexpr uint64_t CHECKSUM_OF_0_BYTES = 0u;
+  constexpr uint64_t CHECKSUM_OF_4_BYTES = 2117232040u;
+  constexpr uint64_t CHECKSUM_OF_11_BYTES = 3461392622u;
+
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+
+  std::ofstream test_file_stream{test_file, std::ios::out | std::ios::binary};
+  test_file_stream << "foo\n";
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 4) == CHECKSUM_OF_4_BYTES);
+
+  test_file_stream << "foobar\n";
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 11) == CHECKSUM_OF_11_BYTES);
+
+  test_file_stream.close();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 4) == CHECKSUM_OF_4_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 11) == CHECKSUM_OF_11_BYTES);
+
+
+  std::string another_file = dir + FileUtils::get_separator() + "another_test.txt";
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+
+  std::ofstream another_file_stream{another_file, std::ios::out | std::ios::binary};
+  another_file_stream << "foo\nfoobar\nbaz\n";   // starts with the same bytes as test_file
+  another_file_stream.close();
+
+  REQUIRE(FileUtils::computeChecksum(another_file, 0) == CHECKSUM_OF_0_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 4) == CHECKSUM_OF_4_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 11) == CHECKSUM_OF_11_BYTES);
+}
+
+TEST_CASE("FileUtils::computeChecksum with large files", "[computeChecksum]") {
+  constexpr uint64_t CHECKSUM_OF_0_BYTES = 0u;
+  constexpr uint64_t CHECKSUM_OF_4095_BYTES = 1902799545u;
+  constexpr uint64_t CHECKSUM_OF_4096_BYTES = 1041266625u;
+  constexpr uint64_t CHECKSUM_OF_4097_BYTES = 1619129554u;
+  constexpr uint64_t CHECKSUM_OF_8192_BYTES = 305726917u;
+
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+
+  std::ofstream test_file_stream{test_file, std::ios::out | std::ios::binary};
+  test_file_stream << std::string(4096, 'x');
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 4095) == CHECKSUM_OF_4095_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 4096) == CHECKSUM_OF_4096_BYTES);
+
+  test_file_stream << 'x';
+  test_file_stream.flush();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 4097) == CHECKSUM_OF_4097_BYTES);
+
+  test_file_stream.close();
+
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 4095) == CHECKSUM_OF_4095_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 4096) == CHECKSUM_OF_4096_BYTES);
+  REQUIRE(FileUtils::computeChecksum(test_file, 4097) == CHECKSUM_OF_4097_BYTES);
+
+
+  std::string another_file = dir + FileUtils::get_separator() + "another_test.txt";
+  REQUIRE(FileUtils::computeChecksum(test_file, 0) == CHECKSUM_OF_0_BYTES);
+
+  std::ofstream another_file_stream{another_file, std::ios::out | std::ios::binary};
+  another_file_stream << std::string(8192, 'x');   // starts with the same bytes as test_file
+  another_file_stream.close();
+
+  REQUIRE(FileUtils::computeChecksum(another_file, 0) == CHECKSUM_OF_0_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 4095) == CHECKSUM_OF_4095_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 4096) == CHECKSUM_OF_4096_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 4097) == CHECKSUM_OF_4097_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 8192) == CHECKSUM_OF_8192_BYTES);
+  REQUIRE(FileUtils::computeChecksum(another_file, 9000) == CHECKSUM_OF_8192_BYTES);
+}
diff --git a/libminifi/test/unit/PathUtilsTests.cpp b/libminifi/test/unit/PathUtilsTests.cpp
new file mode 100644
index 0000000..30cbb78
--- /dev/null
+++ b/libminifi/test/unit/PathUtilsTests.cpp
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <catch.hpp>
+#include "utils/file/PathUtils.h"
+
+using namespace org::apache::nifi::minifi::utils::file;
+
+TEST_CASE("PathUtils::globToRegex works", "[globToRegex]") {
+  REQUIRE(PathUtils::globToRegex("") == "");
+  REQUIRE(PathUtils::globToRegex("NoSpecialChars") == "NoSpecialChars");
+  REQUIRE(PathUtils::globToRegex("ReplaceDot.txt") == "ReplaceDot\\.txt");
+  REQUIRE(PathUtils::globToRegex("Replace.Multiple.Dots...txt") == "Replace\\.Multiple\\.Dots\\.\\.\\.txt");
+  REQUIRE(PathUtils::globToRegex("ReplaceAsterisk.*") == "ReplaceAsterisk\\..*");
+  REQUIRE(PathUtils::globToRegex("Replace*Multiple*Asterisks") == "Replace.*Multiple.*Asterisks");
+  REQUIRE(PathUtils::globToRegex("ReplaceQuestionMark?.txt") == "ReplaceQuestionMark.\\.txt");
+}
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
new file mode 100644
index 0000000..0847a4c
--- /dev/null
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include <catch.hpp>
+#include "core/ProcessSession.h"
+#include "../TestBase.h"
+
+namespace {
+
+class DummyProcessor : public core::Processor {
+  using core::Processor::Processor;
+};
+
+REGISTER_RESOURCE(DummyProcessor, "A processor that does nothing.")
+
+class Fixture {
+ public:
+  Fixture();
+  core::ProcessSession &processSession() { return *process_session_; }
+
+ private:
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> test_plan_;
+  std::shared_ptr<core::Processor> dummy_processor_;
+  std::shared_ptr<core::ProcessContext> context_;
+  std::unique_ptr<core::ProcessSession> process_session_;
+};
+
+Fixture::Fixture() {
+  test_plan_ = test_controller_.createPlan();
+  dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+  test_plan_->runNextProcessor();  // set the dummy processor as current
+  context_ = test_plan_->getCurrentContext();
+  process_session_ = utils::make_unique<core::ProcessSession>(context_);
+}
+
+const core::Relationship Success{"success", "everything is fine"};
+const core::Relationship Failure{"failure", "something has gone awry"};
+
+}  // namespace
+
+TEST_CASE("ProcessSession::existsFlowFileInRelationship works", "[existsFlowFileInRelationship]") {
+  Fixture fixture;
+  core::ProcessSession &process_session = fixture.processSession();
+
+  REQUIRE_FALSE(process_session.existsFlowFileInRelationship(Failure));
+  REQUIRE_FALSE(process_session.existsFlowFileInRelationship(Success));
+
+  const auto flow_file_1 = process_session.create();
+  process_session.transfer(flow_file_1, Failure);
+
+  REQUIRE(process_session.existsFlowFileInRelationship(Failure));
+  REQUIRE_FALSE(process_session.existsFlowFileInRelationship(Success));
+
+  const auto flow_file_2 = process_session.create();
+  process_session.transfer(flow_file_2, Success);
+
+  REQUIRE(process_session.existsFlowFileInRelationship(Failure));
+  REQUIRE(process_session.existsFlowFileInRelationship(Success));
+}
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index ca69944..7f92bc7 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -23,6 +23,7 @@
 #include <iostream>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
@@ -32,6 +33,7 @@
 #include "FlowController.h"
 #include "properties/Configure.h"
 #include "provenance/Provenance.h"
+
 #if defined(__clang__)
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Woverloaded-virtual"
@@ -49,16 +51,16 @@ class TestRepository : public core::Repository {
       : core::SerializableComponent("repo_name"),
         Repository("repo_name", "./dir", 1000, 100, 0) {
   }
-  // initialize
-  bool initialize() {
+
+  bool initialize(const std::shared_ptr<minifi::Configure> &) override {  // the configuration is not used
     return true;
   }
 
-  void start() {
+  void start() override {
     running_ = true;
   }
 
-  void stop() {
+  void stop() override {
     running_ = false;
   }
 
@@ -66,19 +68,19 @@ class TestRepository : public core::Repository {
     repo_full_ = true;
   }
 
-  // Destructor
-  virtual ~TestRepository() = default;
+  ~TestRepository() override = default;
 
-  virtual bool isNoop() {
+  bool isNoop() override {
     return false;
   }
 
-  bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
-    repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
+  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {  // an existing key keeps its old value
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    repository_results_.emplace(std::move(key), std::string{reinterpret_cast<const char*>(buf), bufLen});
     return true;
   }
 
-  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) override {
     for (const auto& item: data) {
       if (!Put(item.first, item.second->getBuffer(), item.second->getSize())) {
         return false;
@@ -87,19 +89,20 @@ class TestRepository : public core::Repository {
     return true;
   }
 
-  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override {
     return Put(key, buffer, bufferSize);
   }
 
-  // Delete
-  bool Delete(std::string key) {
-    repositoryResults.erase(key);
+  bool Delete(std::string key) override {  // also returns true when the key is absent
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    repository_results_.erase(key);
     return true;
   }
-  // Get
-  bool Get(const std::string &key, std::string &value) {
-    auto result = repositoryResults.find(key);
-    if (result != repositoryResults.end()) {
+
+  bool Get(const std::string &key, std::string &value) override {
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    auto result = repository_results_.find(key);
+    if (result != repository_results_.end()) {
       value = result->second;
       return true;
     } else {
@@ -107,60 +110,65 @@ class TestRepository : public core::Repository {
     }
   }
 
-  virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) {
+  bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) override {
     return false;
   }
 
-  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override {
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
     max_size = 0;
-    for (auto entry : repositoryResults) {
-      std::shared_ptr<core::SerializableComponent> eventRead = store.at(max_size);
-
-      if (eventRead->DeSerialize((uint8_t*) entry.second.data(), entry.second.length())) {
-      }
-      if (+max_size >= store.size()) {
-        break;
-      }
+    for (const auto &entry : repository_results_) {
+      if (max_size >= store.size()) {
+        break;
+      }
+      // Each entry fills the next slot of store, whether or not it deserializes successfully.
+      const auto &component = store.at(max_size);
+      component->DeSerialize(reinterpret_cast<const uint8_t*>(entry.second.data()), entry.second.length());
+      ++max_size;
     }
     return true;
   }
 
-  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) {
+  bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) override {
     return false;
   }
 
-  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
+  bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) override {
     std::string value;
     Get(store->getUUIDStr(), value);
-    store->DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(value.c_str())), value.size());
+    store->DeSerialize(reinterpret_cast<const uint8_t*>(value.c_str()), value.size());
     return true;
   }
 
-  virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
+  bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) override {
     return false;
   }
 
-  const std::map<std::string, std::string> &getRepoMap() const {
-    return repositoryResults;
+  std::map<std::string, std::string> getRepoMap() const {  // a snapshot copied under the lock, not a live reference
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    return repository_results_;
   }
 
   void getProvenanceRecord(std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, int maxSize) {
-    for (auto entry : repositoryResults) {
-      if (records.size() >= (uint64_t)maxSize)
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    for (const auto &entry : repository_results_) {
+      if (records.size() >= static_cast<uint64_t>(maxSize))  // NOTE(review): a negative maxSize wraps to a huge limit
         break;
       std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = std::make_shared<provenance::ProvenanceEventRecord>();
 
-      if (eventRead->DeSerialize((uint8_t*) entry.second.data(), entry.second.length())) {
+      if (eventRead->DeSerialize(reinterpret_cast<const uint8_t*>(entry.second.data()), entry.second.length())) {
         records.push_back(eventRead);
       }
     }
   }
 
-  void run() {
+  void run() override {
     // do nothing
   }
+
  protected:
-  std::map<std::string, std::string> repositoryResults;
+  mutable std::mutex repository_results_mutex_;
+  std::map<std::string, std::string> repository_results_;
 };
 
 class TestFlowRepository : public core::Repository {
@@ -169,27 +177,29 @@ class TestFlowRepository : public core::Repository {
       : core::SerializableComponent("ff"),
         core::Repository("ff", "./dir", 1000, 100, 0) {
   }
-  // initialize
-  bool initialize() {
+
+  bool initialize(const std::shared_ptr<minifi::Configure> &) override {  // the configuration is not used
     return true;
   }
 
-  // Destructor
-  virtual ~TestFlowRepository() = default;
+  ~TestFlowRepository() override = default;
 
-  bool Put(std::string key, uint8_t *buf, int bufLen) {
-    repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
+  bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {  // an existing key keeps its old value
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    repository_results_.emplace(std::move(key), std::string{reinterpret_cast<const char*>(buf), bufLen});
     return true;
   }
-  // Delete
-  bool Delete(std::string key) {
-    repositoryResults.erase(key);
+
+  bool Delete(std::string key) override {  // also returns true when the key is absent
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    repository_results_.erase(key);
     return true;
   }
-  // Get
-  bool Get(std::string key, std::string &value) {
-    auto result = repositoryResults.find(key);
-    if (result != repositoryResults.end()) {
+
+  bool Get(const std::string &key, std::string &value) override {
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    auto result = repository_results_.find(key);
+    if (result != repository_results_.end()) {
       value = result->second;
       return true;
     } else {
@@ -197,30 +207,34 @@ class TestFlowRepository : public core::Repository {
     }
   }
 
-  const std::map<std::string, std::string> &getRepoMap() const {
-    return repositoryResults;
+  std::map<std::string, std::string> getRepoMap() const {  // a snapshot copied under the lock, not a live reference
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    return repository_results_;
   }
 
   void getProvenanceRecord(std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, int maxSize) {
-    for (auto entry : repositoryResults) {
-      if (records.size() >= (uint64_t)maxSize)
+    std::lock_guard<std::mutex> lock{repository_results_mutex_};
+    for (const auto &entry : repository_results_) {
+      if (records.size() >= static_cast<uint64_t>(maxSize))  // NOTE(review): a negative maxSize wraps to a huge limit
         break;
       std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = std::make_shared<provenance::ProvenanceEventRecord>();
 
-      if (eventRead->DeSerialize((uint8_t*) entry.second.data(), entry.second.length())) {
+      if (eventRead->DeSerialize(reinterpret_cast<const uint8_t*>(entry.second.data()), entry.second.length())) {
         records.push_back(eventRead);
       }
     }
   }
 
-  void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+  void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override {
   }
 
-  void run() {
+  void run() override {
     // do nothing
   }
+
  protected:
-  std::map<std::string, std::string> repositoryResults;
+  mutable std::mutex repository_results_mutex_;
+  std::map<std::string, std::string> repository_results_;
 };
 
 class TestFlowController : public minifi::FlowController {
@@ -229,37 +243,38 @@ class TestFlowController : public minifi::FlowController {
   TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo)
       : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) {
-  }
-  ~TestFlowController() = default;
-  void load() {
+  }  // NOTE(review): content_repo is unused; the base receives a new VolatileContentRepository instead. Intended?
 
+  ~TestFlowController() override = default;
+
+  void load(const std::shared_ptr<core::ProcessGroup> &root = nullptr, bool reload = false) override {
   }
 
-  int16_t start() {
+  int16_t start() override {
     running_.store(true);
     return 0;
   }
 
-  int16_t stop(bool force, uint64_t timeToWait = 0) {
+  int16_t stop(bool force, uint64_t timeToWait = 0) override {
     running_.store(false);
     return 0;
   }
-  void waitUnload(const uint64_t timeToWaitMs) {
+  void waitUnload(const uint64_t timeToWaitMs) override {  // ignores timeToWaitMs and just calls stop(true)
     stop(true);
   }
 
-  int16_t pause() {
+  int16_t pause() override {
     return -1;
   }
 
-  void unload() {
+  void unload() override {
     stop(true);
   }
 
-  void reload(std::string file) {
+  void reload(std::string file) override {
 
   }
 
-  bool isRunning() {
+  bool isRunning() override {  // always true, even after stop()
     return true;
   }
 
@@ -278,8 +293,9 @@ class TestFlowController : public minifi::FlowController {
   std::shared_ptr<minifi::Connection> createConnection(std::string name, utils::Identifier &  uuid) {
-    return 0;
+    return nullptr;
   }
+
  protected:
-  void initializePaths(const std::string &adjustedFilename) {
+  void initializePaths(const std::string &adjustedFilename) override {
   }
 };
 #if defined(__clang__)
diff --git a/libminifi/test/unit/RegexUtilsTests.cpp b/libminifi/test/unit/RegexUtilsTests.cpp
index c0d8cfc..d38f8b6 100644
--- a/libminifi/test/unit/RegexUtilsTests.cpp
+++ b/libminifi/test/unit/RegexUtilsTests.cpp
@@ -71,3 +71,14 @@ TEST_CASE("TestRegexUtils::check_mode", "[regex4]") {
   Regex r2(rgx1, mode);
   REQUIRE(r2.match(pat));
 }
+
+TEST_CASE("Regex::matchesFullInput works correctly", "[matchesFullInput]") {
+  // The whole input must match: "np" occurs inside "input" but only matches once wrapped in ".*".
+  REQUIRE(Regex::matchesFullInput("", ""));
+  REQUIRE_FALSE(Regex::matchesFullInput("", "input"));
+  REQUIRE(Regex::matchesFullInput(".*", "input"));
+  REQUIRE_FALSE(Regex::matchesFullInput("np", "input"));
+  REQUIRE(Regex::matchesFullInput(".*np.*", "input"));
+  REQUIRE(Regex::matchesFullInput("(in|out)put", "input"));
+  REQUIRE_FALSE(Regex::matchesFullInput("inpu[aeiou]*", "input"));
+}
diff --git a/libminifi/test/unit/RelationshipTests.cpp b/libminifi/test/unit/RelationshipTests.cpp
new file mode 100644
index 0000000..a44f00d
--- /dev/null
+++ b/libminifi/test/unit/RelationshipTests.cpp
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include <catch.hpp>
+#include "core/Relationship.h"
+
+using org::apache::nifi::minifi::core::Relationship;
+
+TEST_CASE("Relationship equality works", "[operator==]") {
+  const Relationship default_constructed;
+  const Relationship success{"success", "everything is fine"};
+  const Relationship failure{"failure", "something has gone awry"};
+
+  // Every relationship equals itself, and != agrees with ==.
+  REQUIRE(default_constructed == default_constructed);
+  REQUIRE_FALSE(default_constructed != default_constructed);
+  REQUIRE(success == success);
+  REQUIRE(failure == failure);
+
+  // Distinct relationships compare unequal, whichever operand comes first.
+  REQUIRE(success != failure);
+  REQUIRE(failure != success);
+  REQUIRE(success != default_constructed);
+  REQUIRE(default_constructed != success);
+}
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index de43029..f85abc1 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -338,3 +338,37 @@ TEST_CASE("TestStringUtils::testJoinPackNegative", "[test join_pack negative]")
               == "rvalue c string, c string, rval std::string, std::string, char array");
 }
  */
+
+TEST_CASE("StringUtils::replaceOne works correctly", "[replaceOne]") {
+  // No match: the input comes back unchanged.
+  REQUIRE(utils::StringUtils::replaceOne("", "x", "y") == "");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "x", "y") == "banana");
+  // Only the first occurrence is replaced, whether the replacement is shorter, as long, or longer.
+  REQUIRE(utils::StringUtils::replaceOne("banana", "a", "_") == "b_nana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "b", "_") == "_anana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "") == "bana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "AN") == "bANana");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "an", "***") == "b***ana");
+  // The pattern may cover the entire input.
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "kiwi") == "kiwi");
+  REQUIRE(utils::StringUtils::replaceOne("banana", "banana", "grapefruit") == "grapefruit");
+}
+
+TEST_CASE("StringUtils::replaceAll works correctly", "[replaceAll]") {
+  // Copies the input so literals can be passed; presumably replaceAll edits its argument in place (verify).
+  auto replace_all = [](std::string input, const std::string &from, const std::string &to) -> std::string {
+    return utils::StringUtils::replaceAll(input, from, to);
+  };
+  // No match: the input comes back unchanged.
+  REQUIRE(replace_all("", "x", "y") == "");
+  REQUIRE(replace_all("banana", "x", "y") == "banana");
+  // Every occurrence is replaced, whether the replacement is shorter, as long, or longer.
+  REQUIRE(replace_all("banana", "a", "_") == "b_n_n_");
+  REQUIRE(replace_all("banana", "b", "_") == "_anana");
+  REQUIRE(replace_all("banana", "an", "") == "ba");
+  REQUIRE(replace_all("banana", "an", "AN") == "bANANa");
+  REQUIRE(replace_all("banana", "an", "***") == "b******a");
+  // The pattern may cover the entire input.
+  REQUIRE(replace_all("banana", "banana", "kiwi") == "kiwi");
+  REQUIRE(replace_all("banana", "banana", "grapefruit") == "grapefruit");
+}