You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/06/13 01:37:02 UTC

[nifi-minifi-cpp] branch master updated (a88fc76 -> dedeeff)

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from a88fc76  MINIFICPP-913: Add configuration item
     new e28deb3  MINIFICPP-911 - Added ListSFTP and FetchSFTP processors
     new 848408d  MINIFICPP-917 Update TensorFlow extension to the latest minifi-cpp CMake best practices
     new dedeeff  MINIFICPP-918 Update TensorFlow processors to use property builder

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |    3 +
 NOTICE                                             |    1 +
 cmake/FindTensorFlow.cmake                         |    1 +
 extensions/sftp/CMakeLists.txt                     |    3 +
 extensions/sftp/SFTPLoader.h                       |    8 +-
 extensions/sftp/client/SFTPClient.cpp              |  173 ++-
 extensions/sftp/client/SFTPClient.h                |   40 +-
 extensions/sftp/processors/FetchSFTP.cpp           |  273 +++++
 extensions/sftp/processors/FetchSFTP.h             |  108 ++
 extensions/sftp/processors/ListSFTP.cpp            | 1173 ++++++++++++++++++++
 extensions/sftp/processors/ListSFTP.h              |  218 ++++
 extensions/sftp/processors/PutSFTP.cpp             |  506 ++-------
 extensions/sftp/processors/PutSFTP.h               |   57 +-
 extensions/sftp/processors/SFTPProcessorBase.cpp   |  475 ++++++++
 .../processors/{PutSFTP.h => SFTPProcessorBase.h}  |  127 +--
 extensions/sftp/tests/CMakeLists.txt               |    2 +-
 extensions/sftp/tests/FetchSFTPTests.cpp           |  425 +++++++
 extensions/sftp/tests/ListSFTPTests.cpp            |  907 +++++++++++++++
 extensions/sftp/tests/ListThenFetchSFTPTests.cpp   |  269 +++++
 extensions/sftp/tests/PutSFTPTests.cpp             |    8 +-
 extensions/sftp/tests/tools/SFTPTestServer.cpp     |    9 +-
 extensions/tensorflow/CMakeLists.txt               |   27 +-
 extensions/tensorflow/TFApplyGraph.cpp             |   24 +-
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  100 +-
 libminifi/include/core/Property.h                  |    2 +-
 libminifi/include/utils/TimeUtil.h                 |   22 +-
 libminifi/include/utils/file/FileUtils.h           |  139 ++-
 libminifi/src/core/ConfigurableComponent.cpp       |   12 +-
 libminifi/test/TestBase.cpp                        |   52 +-
 libminifi/test/TestBase.h                          |    6 +-
 libminifi/test/tensorflow-tests/CMakeLists.txt     |    3 +-
 libminifi/test/unit/FileUtilsTests.cpp             |   22 +
 32 files changed, 4463 insertions(+), 732 deletions(-)
 create mode 100644 extensions/sftp/processors/FetchSFTP.cpp
 create mode 100644 extensions/sftp/processors/FetchSFTP.h
 create mode 100644 extensions/sftp/processors/ListSFTP.cpp
 create mode 100644 extensions/sftp/processors/ListSFTP.h
 create mode 100644 extensions/sftp/processors/SFTPProcessorBase.cpp
 copy extensions/sftp/processors/{PutSFTP.h => SFTPProcessorBase.h} (56%)
 create mode 100644 extensions/sftp/tests/FetchSFTPTests.cpp
 create mode 100644 extensions/sftp/tests/ListSFTPTests.cpp
 create mode 100644 extensions/sftp/tests/ListThenFetchSFTPTests.cpp


[nifi-minifi-cpp] 01/03: MINIFICPP-911 - Added ListSFTP and FetchSFTP processors

Posted by ph...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e28deb3bc16fb3431fdf579dd0a1cac6b64ffbca
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Fri Jun 7 08:07:24 2019 +0200

    MINIFICPP-911 - Added ListSFTP and FetchSFTP processors
    
    This closes #586.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 NOTICE                                             |    1 +
 extensions/sftp/CMakeLists.txt                     |    3 +
 extensions/sftp/SFTPLoader.h                       |    8 +-
 extensions/sftp/client/SFTPClient.cpp              |  173 ++-
 extensions/sftp/client/SFTPClient.h                |   40 +-
 extensions/sftp/processors/FetchSFTP.cpp           |  273 +++++
 extensions/sftp/processors/FetchSFTP.h             |  108 ++
 extensions/sftp/processors/ListSFTP.cpp            | 1173 ++++++++++++++++++++
 extensions/sftp/processors/ListSFTP.h              |  218 ++++
 extensions/sftp/processors/PutSFTP.cpp             |  506 ++-------
 extensions/sftp/processors/PutSFTP.h               |   57 +-
 extensions/sftp/processors/SFTPProcessorBase.cpp   |  475 ++++++++
 .../processors/{PutSFTP.h => SFTPProcessorBase.h}  |  127 +--
 extensions/sftp/tests/CMakeLists.txt               |    2 +-
 extensions/sftp/tests/FetchSFTPTests.cpp           |  425 +++++++
 extensions/sftp/tests/ListSFTPTests.cpp            |  907 +++++++++++++++
 extensions/sftp/tests/ListThenFetchSFTPTests.cpp   |  269 +++++
 extensions/sftp/tests/PutSFTPTests.cpp             |    8 +-
 extensions/sftp/tests/tools/SFTPTestServer.cpp     |    9 +-
 libminifi/include/core/Property.h                  |    2 +-
 libminifi/include/utils/TimeUtil.h                 |   22 +-
 libminifi/include/utils/file/FileUtils.h           |  139 ++-
 libminifi/src/core/ConfigurableComponent.cpp       |   12 +-
 libminifi/test/TestBase.cpp                        |   52 +-
 libminifi/test/TestBase.h                          |    6 +-
 libminifi/test/unit/FileUtilsTests.cpp             |   22 +
 26 files changed, 4371 insertions(+), 666 deletions(-)

diff --git a/NOTICE b/NOTICE
index 8984cd0..252860b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -22,6 +22,7 @@ The derived and original works are listed below:
     JniProcessSession extends and is based on ProcessSession
     JniProcessSessionFactory extends and is based on ProcessSessionFactory
     JniProvenanceReporter extends and is based on ProvenanceReporter
+    SFTPTestServer extends and is based on SSHTestServer
 
 This includes derived works from the cURL (MIT/X-style licensed) project (https://github.com/curl/curl):
 Copyright (c) 1996 - 2019, Daniel Stenberg, <da...@haxx.se>, and many contributors, see the THANKS file.
diff --git a/extensions/sftp/CMakeLists.txt b/extensions/sftp/CMakeLists.txt
index 89837aa..2536ee1 100644
--- a/extensions/sftp/CMakeLists.txt
+++ b/extensions/sftp/CMakeLists.txt
@@ -70,6 +70,9 @@ find_package(ZLIB REQUIRED)
 include_directories(${ZLIB_INCLUDE_DIRS})
 target_link_libraries(minifi-sftp ${ZLIB_LIBRARIES})
 
+# Include RapidJSON
+include_directories(thirdparty/rapidjson-1.1.0/include)
+
 if (WIN32)
 message("${OPENSSL_LIBRARIES}")
 	set (WIN32_ARCHIVES "")
diff --git a/extensions/sftp/SFTPLoader.h b/extensions/sftp/SFTPLoader.h
index dd36e24..0ab0286 100644
--- a/extensions/sftp/SFTPLoader.h
+++ b/extensions/sftp/SFTPLoader.h
@@ -18,8 +18,10 @@
 #ifndef EXTENSION_SFTPLOADER_H
 #define EXTENSION_SFTPLOADER_H
 
-#include "processors/PutSFTP.h"
 #include "core/ClassLoader.h"
+#include "processors/PutSFTP.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
 
 class SFTPFactoryInitializer : public core::ObjectFactoryInitializer {
  public:
@@ -57,6 +59,10 @@ class SFTPFactory : public core::ObjectFactory {
   virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
     if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSFTP")) {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PutSFTP>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "FetchSFTP")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::FetchSFTP>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListSFTP")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ListSFTP>());
     } else {
       return nullptr;
     }
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 6d4254b..bc27d5a 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -65,8 +65,77 @@ static const char* sftp_strerror(unsigned long err) {
   }
 }
 
+static SFTPError libssh2_sftp_error_to_sftp_error(unsigned long libssh2_sftp_error) {
+  switch (libssh2_sftp_error) {
+    case LIBSSH2_FX_OK:
+      return SFTPError::SFTP_ERROR_OK;
+    case LIBSSH2_FX_NO_SUCH_FILE:
+    case LIBSSH2_FX_NO_SUCH_PATH:
+      return SFTPError::SFTP_ERROR_FILE_NOT_EXISTS;
+    case LIBSSH2_FX_FILE_ALREADY_EXISTS:
+      return SFTPError::SFTP_ERROR_FILE_ALREADY_EXISTS;
+    case LIBSSH2_FX_PERMISSION_DENIED:
+    case LIBSSH2_FX_WRITE_PROTECT:
+    case LIBSSH2_FX_LOCK_CONFLICT:
+      return SFTPError::SFTP_ERROR_PERMISSION_DENIED;
+    case LIBSSH2_FX_NO_CONNECTION:
+    case LIBSSH2_FX_CONNECTION_LOST:
+      return SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE;
+    case LIBSSH2_FX_EOF:
+    case LIBSSH2_FX_FAILURE:
+    case LIBSSH2_FX_BAD_MESSAGE:
+    case LIBSSH2_FX_OP_UNSUPPORTED:
+    case LIBSSH2_FX_INVALID_HANDLE:
+    case LIBSSH2_FX_NO_MEDIA:
+    case LIBSSH2_FX_NO_SPACE_ON_FILESYSTEM:
+    case LIBSSH2_FX_QUOTA_EXCEEDED:
+    case LIBSSH2_FX_UNKNOWN_PRINCIPAL:
+    case LIBSSH2_FX_DIR_NOT_EMPTY:
+    case LIBSSH2_FX_NOT_A_DIRECTORY:
+    case LIBSSH2_FX_INVALID_FILENAME:
+    case LIBSSH2_FX_LINK_LOOP:
+    default:
+      return SFTPError::SFTP_ERROR_UNEXPECTED;
+  }
+}
+
 constexpr size_t SFTPClient::MAX_BUFFER_SIZE;
 
+LastSFTPError::LastSFTPError()
+    : sftp_error_set_(false)
+    , libssh2_sftp_error_(LIBSSH2_FX_OK)
+    , sftp_error_(SFTPError::SFTP_ERROR_OK) {
+}
+
+LastSFTPError& LastSFTPError::operator=(unsigned long libssh2_sftp_error) {
+  sftp_error_set_ = false;
+  libssh2_sftp_error_ = libssh2_sftp_error;
+  return *this;
+}
+
+LastSFTPError& LastSFTPError::operator=(const SFTPError& sftp_error) {
+  sftp_error_set_ = true;
+  sftp_error_ = sftp_error;
+  return *this;
+}
+
+LastSFTPError::operator unsigned long() const {
+  if (sftp_error_set_) {
+    return LIBSSH2_FX_OK;
+  } else {
+    return libssh2_sftp_error_;
+  }
+}
+
+LastSFTPError::operator SFTPError() const {
+  if (sftp_error_set_) {
+    return sftp_error_;
+  } else {
+    return libssh2_sftp_error_to_sftp_error(libssh2_sftp_error_);
+  }
+}
+
+
 SFTPClient::SFTPClient(const std::string &hostname, uint16_t port, const std::string& username)
     : logger_(logging::LoggerFactory<SFTPClient>::getLogger()),
       hostname_(hostname),
@@ -82,7 +151,8 @@ SFTPClient::SFTPClient(const std::string &hostname, uint16_t port, const std::st
       easy_(nullptr),
       ssh_session_(nullptr),
       sftp_session_(nullptr),
-      connected_(false) {
+      connected_(false),
+      last_error_() {
   easy_ = curl_easy_init();
   if (easy_ == nullptr) {
     throw std::runtime_error("Cannot create curl easy handle");
@@ -403,10 +473,39 @@ bool SFTPClient::sendKeepAliveIfNeeded(int &seconds_to_next) {
   return true;
 }
 
+SFTPError SFTPClient::getLastError() const {
+  return last_error_;
+}
+
 bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_t expected_size /*= -1*/) {
-  LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0);
+  /**
+   * SFTP servers should not set the mode of an existing file on open
+   * (see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-13, Page 33
+   * "The 'attrs' field is ignored if an existing file is opened."
+   * Unfortunately this is a later SFTP version specification than implemented by most servers.)
+   * But because this is the intuitively correct behaviour (especially when opening a file for read only),
+   * most servers (OpenSSH for example) implement it this way.
+   * mina-sshd, the server we use for testing, however did not until recently,
+   * causing all files we read to be set to 0000.
+   * The fix to make it behave correctly has been merged back to master, but not yet released:
+   * https://github.com/apache/mina-sshd/commit/19adb39e4706929b6e5a1b2df056a2b2a29fac4d
+   * If we encounter real servers that behave like this, a workaround would be to stat before opening the file
+   * and "re-setting" the mode we read earlier on open.
+   * An another option would be to patch libssh2 to not send permissions in attrs when opening a file for read only.
+   */
+  LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0 /*mode*/);
   if (file_handle == nullptr) {
-    logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    int ssh_errno = libssh2_session_last_errno(ssh_session_);
+    /* We can only get the sftp error in this case if the ssh error is a protocol error */
+    if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+    } else {
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      char *err_msg = nullptr;
+      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+    }
     return false;
   }
   utils::ScopeGuard guard([&file_handle]() {
@@ -419,7 +518,8 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
   do {
     ssize_t read_ret = libssh2_sftp_read(file_handle, reinterpret_cast<char*>(buf.data()), buf.size());
     if (read_ret < 0) {
-      logger_->log_error("Failed to read remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      logger_->log_error("Failed to read remote file \"%s\"", path.c_str());
       return false;
     } else if (read_ret == 0) {
       logger_->log_trace("EOF while reading remote file \"%s\"", path.c_str());
@@ -429,8 +529,9 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
     total_read += read_ret;
     int remaining = read_ret;
     while (remaining > 0) {
-      int write_ret = output.writeData(buf.data() + (buf.size() - remaining), remaining);
+      int write_ret = output.writeData(buf.data() + (read_ret - remaining), remaining);
       if (write_ret < 0) {
+        last_error_ = LIBSSH2_FX_OK;
         logger_->log_error("Failed to write output");
         return false;
       }
@@ -439,6 +540,7 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
   } while (true);
 
   if (expected_size >= 0 && total_read != expected_size) {
+    last_error_ = LIBSSH2_FX_OK;
     logger_->log_error("Remote file \"%s\" has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
   }
@@ -451,8 +553,17 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
   logger_->log_trace("Opening remote file \"%s\"", path.c_str());
   LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), flags, 0644);
   if (file_handle == nullptr) {
-    logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
-    return false;
+    int ssh_errno = libssh2_session_last_errno(ssh_session_);
+    /* We can only get the sftp error in this case if the ssh error is a protocol error */
+    if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+    } else {
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      char *err_msg = nullptr;
+      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+    }
   }
   utils::ScopeGuard guard([this, &file_handle, &path]() {
     logger_->log_trace("Closing remote file \"%s\"", path.c_str());
@@ -470,8 +581,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
   do {
     int read_ret = input.readData(buf.data(), buf.size());
     if (read_ret < 0) {
-      char *err_msg = nullptr;
-      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      last_error_ = LIBSSH2_FX_OK;
       logger_->log_error("Error while reading input");
       return false;
     } else if (read_ret == 0) {
@@ -484,7 +594,8 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     while (remaining > 0) {
       int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
-        logger_->log_error("Failed to write remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+        last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+        logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
         return false;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
@@ -493,6 +604,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
   } while (true);
 
   if (expected_size >= 0 && total_read != expected_size) {
+    last_error_ = LIBSSH2_FX_OK;
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
   }
@@ -524,10 +636,11 @@ bool SFTPClient::rename(const std::string& source_path, const std::string& targe
       }
       continue;
     }
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
     logger_->log_error("Failed to rename remote file \"%s\" to \"%s\", error: %s",
         source_path.c_str(),
         target_path.c_str(),
-        sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+        sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -535,6 +648,7 @@ bool SFTPClient::rename(const std::string& source_path, const std::string& targe
 
 bool SFTPClient::createDirectoryHierarchy(const std::string& path) {
   if (path.empty()) {
+    last_error_ = LIBSSH2_FX_OK;
     return false;
   }
   bool absolute = path[0] == '/';
@@ -552,7 +666,8 @@ bool SFTPClient::createDirectoryHierarchy(const std::string& path) {
       if (err != LIBSSH2_FX_FILE_ALREADY_EXISTS &&
           err != LIBSSH2_FX_FAILURE &&
           err != LIBSSH2_FX_PERMISSION_DENIED) {
-        logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
+        last_error_ = err;
+        logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(last_error_));
         return false;
       } else {
         logger_->log_debug("Non-fatal failure to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
@@ -564,7 +679,8 @@ bool SFTPClient::createDirectoryHierarchy(const std::string& path) {
 
 bool SFTPClient::removeFile(const std::string& path) {
   if (libssh2_sftp_unlink(sftp_session_, path.c_str()) != 0) {
-    logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -572,7 +688,8 @@ bool SFTPClient::removeFile(const std::string& path) {
 
 bool SFTPClient::removeDirectory(const std::string& path) {
   if (libssh2_sftp_rmdir(sftp_session_, path.c_str()) != 0) {
-    logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -587,7 +704,8 @@ bool SFTPClient::listDirectory(const std::string& path, bool follow_symlinks,
                                                           0 /* mode */,
                                                           LIBSSH2_SFTP_OPENDIR);
   if (dir_handle == nullptr) {
-    logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   utils::ScopeGuard guard([&dir_handle]() {
@@ -605,16 +723,17 @@ bool SFTPClient::listDirectory(const std::string& path, bool follow_symlinks,
                                       longentry.size(),
                                       &attrs);
     if (ret < 0) {
-      logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
       return false;
     } else if (ret == 0) {
       break;
     }
     if (follow_symlinks && attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && LIBSSH2_SFTP_S_ISLNK(attrs.permissions)) {
+      std::stringstream new_path;
+      new_path << path << "/" << filename.data();
       auto orig_attrs = attrs;
-      bool file_not_exists;
-      if (!this->stat(path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-        logger_->log_debug("Failed to stat directory child \"%s/%s\", error: %s", path.c_str(), filename.data(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      if (!this->stat(new_path.str(), true /*follow_symlinks*/, attrs)) {
         attrs = orig_attrs;
       }
     }
@@ -623,18 +742,14 @@ bool SFTPClient::listDirectory(const std::string& path, bool follow_symlinks,
   return true;
 }
 
-bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists) {
-  file_not_exists = false;
+bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result) {
   if (libssh2_sftp_stat_ex(sftp_session_,
                             path.c_str(),
                             path.length(),
                             follow_symlinks ? LIBSSH2_SFTP_STAT : LIBSSH2_SFTP_LSTAT,
                             &result) != 0) {
-    auto error = libssh2_sftp_last_error(sftp_session_);
-    if (error == LIBSSH2_FX_NO_SUCH_FILE) {
-      file_not_exists = true;
-    }
-    logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(error));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -646,8 +761,7 @@ bool SFTPClient::setAttributes(const std::string& path, const SFTPAttributes& in
   if ((!!(input.flags & SFTP_ATTRIBUTE_UID) != !!(input.flags & SFTP_ATTRIBUTE_GID)) ||
       (!!(input.flags & SFTP_ATTRIBUTE_MTIME) != !!(input.flags & SFTP_ATTRIBUTE_ATIME))) {
     /* Because we can only set these attributes in pairs, we must stat first to learn the other */
-    bool file_not_exists;
-    if (!this->stat(path, false /*follow_symlinks*/, attrs, file_not_exists)) {
+    if (!this->stat(path, false /*follow_symlinks*/, attrs)) {
       return false;
     }
   }
@@ -678,7 +792,8 @@ bool SFTPClient::setAttributes(const std::string& path, const SFTPAttributes& in
                            path.length(),
                            LIBSSH2_SFTP_SETSTAT,
                            &attrs) != 0) {
-    logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
 
diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h
index 4d6ce49..8e6c234 100644
--- a/extensions/sftp/client/SFTPClient.h
+++ b/extensions/sftp/client/SFTPClient.h
@@ -40,6 +40,36 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+enum class SFTPError : uint8_t {
+  SFTP_ERROR_OK = 0,
+  SFTP_ERROR_PERMISSION_DENIED,
+  SFTP_ERROR_FILE_NOT_EXISTS,
+  SFTP_ERROR_FILE_ALREADY_EXISTS,
+  SFTP_ERROR_COMMUNICATIONS_FAILURE,
+  SFTP_ERROR_IO_ERROR,
+  SFTP_ERROR_UNEXPECTED
+};
+
+class LastSFTPError {
+ public:
+  LastSFTPError();
+
+  LastSFTPError(const LastSFTPError&) = delete;
+  LastSFTPError(LastSFTPError&&) = delete;
+  LastSFTPError& operator=(const LastSFTPError&) = delete;
+  LastSFTPError& operator=(LastSFTPError&&) = delete;
+
+  LastSFTPError& operator=(unsigned long libssh2_sftp_error);
+  LastSFTPError& operator=(const SFTPError& sftp_error);
+  operator unsigned long() const;
+  operator SFTPError() const;
+
+ private:
+  bool sftp_error_set_;
+  unsigned long libssh2_sftp_error_;
+  SFTPError sftp_error_;
+};
+
 class SFTPClient {
  public:
 
@@ -77,6 +107,13 @@ class SFTPClient {
 
   bool sendKeepAliveIfNeeded(int &seconds_to_next);
 
+  /**
+   * If any function below this returns false, this function provides the last SFTP-related error.
+   * If a function did not fail because of an SFTP-related error, this function will return SFTP_ERROR_OK.
+   * If this function is called after a function returns true, the return value is UNDEFINED.
+   */
+  SFTPError getLastError() const;
+
   bool getFile(const std::string& path, io::BaseStream& output, int64_t expected_size = -1);
 
   bool putFile(const std::string& path, io::BaseStream& input, bool overwrite, int64_t expected_size = -1);
@@ -92,7 +129,7 @@ class SFTPClient {
   bool listDirectory(const std::string& path, bool follow_symlinks,
       std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>>& children_result);
 
-  bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists);
+  bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result);
 
   static const uint32_t SFTP_ATTRIBUTE_PERMISSIONS = 0x00000001;
   static const uint32_t SFTP_ATTRIBUTE_UID         = 0x00000002;
@@ -148,6 +185,7 @@ class SFTPClient {
 
   bool connected_;
 
+  LastSFTPError last_error_;
 };
 
 } /* namespace utils */
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
new file mode 100644
index 0000000..3bc156c
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property FetchSFTP::RemoteFile(
+    core::PropertyBuilder::createProperty("Remote File")->withDescription("The fully qualified filename on the remote system")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")->withDescription("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({COMPLETION_STRATEGY_NONE,
+                                            COMPLETION_STRATEGY_MOVE_FILE,
+                                            COMPLETION_STRATEGY_DELETE_FILE})
+        ->withDefaultValue(COMPLETION_STRATEGY_NONE)->build());
+core::Property FetchSFTP::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")->withDescription("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+                                                                                         "This property is ignored unless the Completion Strategy is set to 'Move File'. "
+                                                                                         "The specified directory must already exist on the remote system if 'Create Directory' is disabled, or the rename will fail.")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CreateDirectory(
+    core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::DisableDirectoryListing(
+    core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("Control how 'Move Destination Directory' is created when 'Completion Strategy' is 'Move File' and 'Create Directory' is enabled. "
+                                                                                        "If set to 'true', directory listing is not performed prior to create missing directories. "
+                                                                                        "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
+                                                                                        "However, there are situations that you might need to disable the directory listing such as the following. "
+                                                                                        "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
+                                                                                        "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
+                                                                                        "then an error is returned because the directory already exists.")
+        ->isRequired(false)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::UseCompression(
+    core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+
+core::Relationship FetchSFTP::Success("success", "All FlowFiles that are received are routed to success");
+core::Relationship FetchSFTP::CommsFailure("comms.failure", "Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.");
+core::Relationship FetchSFTP::NotFound("not.found", "Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.");
+core::Relationship FetchSFTP::PermissionDenied("permission.denied", "Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.");
+
+void FetchSFTP::initialize() {
+  logger_->log_trace("Initializing FetchSFTP");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  addSupportedCommonProperties(properties);
+  properties.insert(RemoteFile);
+  properties.insert(CompletionStrategy);
+  properties.insert(MoveDestinationDirectory);
+  properties.insert(CreateDirectory);
+  properties.insert(DisableDirectoryListing);
+  properties.insert(UseCompression);
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(CommsFailure);
+  relationships.insert(NotFound);
+  relationships.insert(PermissionDenied);
+  setSupportedRelationships(relationships);
+}
+
+FetchSFTP::FetchSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : SFTPProcessorBase(name, uuid),
+      create_directory_(false),
+      disable_directory_listing_(false) {
+  logger_ = logging::LoggerFactory<FetchSFTP>::getLogger();
+}
+
+FetchSFTP::~FetchSFTP() {
+}
+
+void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
+  std::string value;
+  context->getProperty(CompletionStrategy.getName(), completion_strategy_);
+  if (!context->getProperty(CreateDirectory.getName(), value)) {
+    logger_->log_error("Create Directory attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, create_directory_);
+  }
+  if (!context->getProperty(DisableDirectoryListing.getName(), value)) {
+    logger_->log_error("Disable Directory Listing attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, disable_directory_listing_);
+  }
+  if (!context->getProperty(UseCompression.getName(), value)) {
+    logger_->log_error("Use Compression attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, use_compression_);
+  }
+
+  startKeepaliveThreadIfNeeded();
+}
+
+FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
+                                    utils::SFTPClient& client)
+    : logger_(logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger())
+    , remote_file_(remote_file)
+    , client_(client) {
+}
+
+FetchSFTP::WriteCallback::~WriteCallback() {
+}
+
+int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  if (!client_.getFile(remote_file_, *stream)) {
+    throw client_.getLastError();
+  }
+  return stream->getSize();
+}
+
+void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+  if (flow_file == nullptr) {
+    return;
+  }
+
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+    context->yield();
+    return;
+  }
+
+  /* Parse processor-specific properties */
+  std::string remote_file;
+  std::string move_destination_directory;
+
+  context->getProperty(RemoteFile, remote_file, flow_file);
+  context->getProperty(MoveDestinationDirectory, move_destination_directory, flow_file);
+
+  /* Get SFTPClient from cache or create it */
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
+  if (client == nullptr) {
+    context->yield();
+    return;
+  }
+
+  /*
+   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
+   * So we will only call this when we're sure that the connection is OK.
+   */
+  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
+    addConnectionToCache(connection_cache_key, std::move(client));
+  };
+
+  /* Download file */
+  WriteCallback write_callback(remote_file, *client);
+  try {
+    session->write(flow_file, &write_callback);
+  } catch (const utils::SFTPError& error) {
+    switch (error) {
+      case utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED:
+        session->transfer(flow_file, PermissionDenied);
+        put_connection_back_to_cache();
+        return;
+      case utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS:
+        session->transfer(flow_file, NotFound);
+        put_connection_back_to_cache();
+        return;
+      case utils::SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE:
+      case utils::SFTPError::SFTP_ERROR_IO_ERROR:
+        session->transfer(flow_file, CommsFailure);
+        return;
+      default:
+        session->transfer(flow_file, PermissionDenied);
+        return;
+    }
+  }
+
+  /* Set attributes */
+  std::string parent_path;
+  std::string child_path;
+  std::tie(parent_path, child_path) = utils::file::FileUtils::split_path(remote_file, true /*force_posix*/);
+
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname);
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port));
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file);
+  flow_file->updateKeyedAttribute(FILENAME, child_path);
+  if (!parent_path.empty()) {
+    flow_file->updateKeyedAttribute(PATH, parent_path);
+  }
+
+  /* Execute completion strategy */
+  if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) {
+    if (!client->removeFile(remote_file)) {
+      logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"%s\"", remote_file);
+    }
+  } else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) {
+    bool should_move = true;
+    if (create_directory_) {
+      auto res = createDirectoryHierarchy(*client, move_destination_directory, disable_directory_listing_);
+      if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) {
+        should_move = false;
+      }
+    }
+    if (!should_move) {
+      logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"%s\"", move_destination_directory);
+    } else {
+      auto target_path = utils::file::FileUtils::concat_path(move_destination_directory, child_path);
+      if (!client->rename(remote_file, target_path, false /*overwrite*/)) {
+        logger_->log_warn("Completion Strategy is Move File, but failed to move file \"%s\" to \"%s\"", remote_file, target_path);
+      }
+    }
+  }
+
+  session->transfer(flow_file, Success);
+  put_connection_back_to_cache();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
new file mode 100644
index 0000000..749b12e
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FETCH_SFTP_H__
+#define __FETCH_SFTP_H__
+
+#include <memory>
+#include <string>
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FetchSFTP : public SFTPProcessorBase {
+ public:
+
+  static constexpr char const *COMPLETION_STRATEGY_NONE = "None";
+  static constexpr char const *COMPLETION_STRATEGY_MOVE_FILE = "Move File";
+  static constexpr char const *COMPLETION_STRATEGY_DELETE_FILE = "Delete File";
+
+  static constexpr char const* ProcessorName = "FetchSFTP";
+
+
+  /*!
+   * Create a new processor
+   */
+  FetchSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+  virtual ~FetchSFTP();
+
+  // Supported Properties
+  static core::Property RemoteFile;
+  static core::Property CompletionStrategy;
+  static core::Property MoveDestinationDirectory;
+  static core::Property CreateDirectory;
+  static core::Property DisableDirectoryListing;
+  static core::Property UseCompression;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship CommsFailure;
+  static core::Relationship NotFound;
+  static core::Relationship PermissionDenied;
+
+  // Writes Attributes
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT= "sftp.remote.port";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_FILENAME = "sftp.remote.filename";
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  virtual void initialize() override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(const std::string& remote_file,
+                 utils::SFTPClient& client);
+    ~WriteCallback();
+    virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+    const std::string remote_file_;
+    utils::SFTPClient& client_;
+  };
+
+ private:
+
+  std::string completion_strategy_;
+  bool create_directory_;
+  bool disable_directory_listing_;
+};
+
+REGISTER_RESOURCE(FetchSFTP, "Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
new file mode 100644
index 0000000..e4b56be
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -0,0 +1,1173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <list>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/ostreamwrapper.h"
+#include "rapidjson/istreamwrapper.h"
+#include "rapidjson/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ListSFTP::ListingStrategy(
+    core::PropertyBuilder::createProperty("Listing Strategy")->withDescription("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({LISTING_STRATEGY_TRACKING_TIMESTAMPS,
+                                            LISTING_STRATEGY_TRACKING_ENTITIES})
+        ->withDefaultValue(LISTING_STRATEGY_TRACKING_TIMESTAMPS)->build());
+core::Property ListSFTP::RemotePath(
+    core::PropertyBuilder::createProperty("Remote Path")->withDescription("The fully qualified filename on the remote system")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property ListSFTP::SearchRecursively(
+    core::PropertyBuilder::createProperty("Search Recursively")->withDescription("If true, will pull files from arbitrarily nested subdirectories; "
+                                                                                 "otherwise, will not traverse subdirectories")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FollowSymlink(
+    core::PropertyBuilder::createProperty("Follow symlink")->withDescription("If true, will pull even symbolic files and also nested symbolic subdirectories; "
+                                                                             "otherwise, will not read symbolic files and will not traverse symbolic link subdirectories")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FileFilterRegex(
+    core::PropertyBuilder::createProperty("File Filter Regex")->withDescription("Provides a Java Regular Expression for filtering Filenames; "
+                                                                                "if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+        ->isRequired(false)->build());
+core::Property ListSFTP::PathFilterRegex(
+    core::PropertyBuilder::createProperty("Path Filter Regex")->withDescription("When Search Recursively is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+        ->isRequired(false)->build());
+core::Property ListSFTP::IgnoreDottedFiles(
+    core::PropertyBuilder::createProperty("Ignore Dotted Files")->withDescription("If true, files whose names begin with a dot (\".\") will be ignored")
+        ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property ListSFTP::TargetSystemTimestampPrecision(
+    core::PropertyBuilder::createProperty("Target System Timestamp Precision")->withDescription("Specify timestamp precision at the target system. "
+                                                                                                "Since this processor uses timestamp of entities to decide which should be listed, "
+                                                                                                "it is crucial to use the right timestamp precision.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES})
+        ->withDefaultValue(TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)->build());
+core::Property ListSFTP::EntityTrackingTimeWindow(
+    core::PropertyBuilder::createProperty("Entity Tracking Time Window")->withDescription("Specify how long this processor should track already-listed entities. "
+                                                                                          "'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. "
+                                                                                          "For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. "
+                                                                                          "A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: "
+                                                                                          "1. does not exist in the already-listed entities, "
+                                                                                          "2. has newer timestamp than the cached entity, "
+                                                                                          "3. has different size than the cached entity. "
+                                                                                          "If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. "
+                                                                                          "Used by 'Tracking Entities' strategy.")
+        ->isRequired(false)->build());
+core::Property ListSFTP::EntityTrackingInitialListingTarget(
+    core::PropertyBuilder::createProperty("Entity Tracking Initial Listing Target")->withDescription("Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.")
+        ->withAllowableValues<std::string>({ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW,
+                                            ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE})
+        ->isRequired(false)->withDefaultValue(ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)->build());
+core::Property ListSFTP::MinimumFileAge(
+    core::PropertyBuilder::createProperty("Minimum File Age")->withDescription("The minimum age that a file must be in order to be pulled; "
+                                                                               "any file younger than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("0 sec")->build());
+core::Property ListSFTP::MaximumFileAge(
+    core::PropertyBuilder::createProperty("Maximum File Age")->withDescription("The maximum age that a file must be in order to be pulled; "
+                                                                               "any file older than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(false)->build());
+core::Property ListSFTP::MinimumFileSize(
+    core::PropertyBuilder::createProperty("Minimum File Size")->withDescription("The minimum size that a file must be in order to be pulled")
+        ->isRequired(true)->withDefaultValue<core::DataSizeValue>("0 B")->build());
+core::Property ListSFTP::MaximumFileSize(
+    core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file must be in order to be pulled")
+        ->isRequired(false)->build());
+core::Property ListSFTP::StateFile(
+    core::PropertyBuilder::createProperty("State File")->withDescription("Specifies the file that should be used for storing state about"
+                                                                         " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
+        ->isRequired(true)->withDefaultValue("ListSFTP")->build());
+
+core::Relationship ListSFTP::Success("success", "All FlowFiles that are received are routed to success");
+
+const std::map<std::string, uint64_t> ListSFTP::LISTING_LAG_MAP = {
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS, 1000},
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES, 60000},
+};
+
+void ListSFTP::initialize() {
+  logger_->log_trace("Initializing FetchSFTP");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  addSupportedCommonProperties(properties);
+  properties.insert(ListingStrategy);
+  properties.insert(RemotePath);
+  properties.insert(SearchRecursively);
+  properties.insert(FollowSymlink);
+  properties.insert(FileFilterRegex);
+  properties.insert(PathFilterRegex);
+  properties.insert(IgnoreDottedFiles);
+  properties.insert(TargetSystemTimestampPrecision);
+  properties.insert(EntityTrackingTimeWindow);
+  properties.insert(EntityTrackingInitialListingTarget);
+  properties.insert(MinimumFileAge);
+  properties.insert(MaximumFileAge);
+  properties.insert(MinimumFileSize);
+  properties.insert(MaximumFileSize);
+  properties.insert(StateFile);
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+ListSFTP::ListSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : SFTPProcessorBase(name, uuid)
+    , search_recursively_(false)
+    , follow_symlink_(false)
+    , file_filter_regex_set_(false)
+    , path_filter_regex_set_(false)
+    , ignore_dotted_files_(false)
+    , minimum_file_age_(0U)
+    , maximum_file_age_(0U)
+    , minimum_file_size_(0U)
+    , maximum_file_size_(0U)
+    , already_loaded_from_cache_(false)
+    , last_listed_latest_entry_timestamp_(0U)
+    , last_processed_latest_entry_timestamp_(0U)
+    , initial_listing_complete_(false) {
+  logger_ = logging::LoggerFactory<ListSFTP>::getLogger();
+}
+
+ListSFTP::~ListSFTP() {
+#ifndef WIN32
+  if (file_filter_regex_set_) {
+    regfree(&compiled_file_filter_regex_);
+  }
+  if (path_filter_regex_set_) {
+    regfree(&compiled_path_filter_regex_);
+  }
+#endif
+}
+
+void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
+  std::string value;
+  context->getProperty(ListingStrategy.getName(), listing_strategy_);
+  if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
+    invalidateCache();
+  }
+  last_listing_strategy_ = listing_strategy_;
+  if (!context->getProperty(SearchRecursively.getName(), value)) {
+    logger_->log_error("Search Recursively attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, search_recursively_);
+  }
+  if (!context->getProperty(FollowSymlink.getName(), value)) {
+    logger_->log_error("Follow symlink attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, follow_symlink_);
+  }
+  if (context->getProperty(FileFilterRegex.getName(), file_filter_regex_)) {
+#ifndef WIN32
+    if (file_filter_regex_set_) {
+      regfree(&compiled_file_filter_regex_);
+    }
+    int ret = regcomp(&compiled_file_filter_regex_, file_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    } else {
+      file_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_file_filter_regex_ = std::regex(file_filter_regex_);
+      file_filter_regex_set_ = true;
+    } catch (std::regex_error&) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    }
+#endif
+  } else {
+    file_filter_regex_set_ = false;
+  }
+  if (context->getProperty(PathFilterRegex.getName(), path_filter_regex_)) {
+#ifndef WIN32
+    if (path_filter_regex_set_) {
+      regfree(&compiled_path_filter_regex_);
+    }
+    int ret = regcomp(&compiled_path_filter_regex_, path_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    } else {
+      path_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_path_filter_regex_ = std::regex(path_filter_regex_);
+      path_filter_regex_set_ = true;
+    } catch (std::regex_error&) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+      path_filter_regex_set_ = false;
+    }
+#endif
+  } else {
+    path_filter_regex_set_ = false;
+  }
+  if (!context->getProperty(IgnoreDottedFiles.getName(), value)) {
+    logger_->log_error("Ignore Dotted Files attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, ignore_dotted_files_);
+  }
+  context->getProperty(TargetSystemTimestampPrecision.getName(), target_system_timestamp_precision_);
+  context->getProperty(EntityTrackingInitialListingTarget.getName(), entity_tracking_initial_listing_target_);
+  if (!context->getProperty(MinimumFileAge.getName(), value)) {
+    logger_->log_error("Minimum File Age attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, minimum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(minimum_file_age_, unit, minimum_file_age_)) {
+      logger_->log_error("Minimum File Age attribute is invalid");
+    }
+  }
+  if (context->getProperty(MaximumFileAge.getName(), value)) {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, maximum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(maximum_file_age_, unit, maximum_file_age_)) {
+      logger_->log_error("Maximum File Age attribute is invalid");
+    }
+  }
+  if (!context->getProperty(MinimumFileSize.getName(), minimum_file_size_)) {
+    logger_->log_error("Minimum File Size attribute is invalid");
+  }
+  if (context->getProperty(MaximumFileSize.getName(), value)) {
+    if (!core::DataSizeValue::StringToInt(value, maximum_file_size_)) {
+      logger_->log_error("Maximum File Size attribute is invalid");
+    }
+  }
+  context->getProperty(StateFile.getName(), value);
+  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
+    auto new_tracking_timestamps_state_filename = ss.str();
+    if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {
+      if (!tracking_timestamps_state_filename_.empty()) {
+        if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
+                             tracking_timestamps_state_filename_.c_str());
+        }
+      }
+    }
+    tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
+  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingEntities";
+    auto new_tracking_entities_state_filename = ss.str();
+    ss << ".json";
+    auto new_tracking_entities_state_json_filename = ss.str();
+    if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {
+      if (!tracking_entities_state_filename_.empty()) {
+        if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
+                             tracking_entities_state_filename_.c_str());
+        }
+      }
+      if (!tracking_entities_state_json_filename_.empty()) {
+        if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
+                             tracking_entities_state_json_filename_.c_str());
+        }
+      }
+    }
+    tracking_entities_state_filename_ = new_tracking_entities_state_filename;
+    tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
+  } else {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+  }
+
+  startKeepaliveThreadIfNeeded();
+}
+
+void ListSFTP::invalidateCache() {
+  logger_->log_warn("Important properties have been reconfigured, invalidating in-memory cache");
+
+  already_loaded_from_cache_ = false;
+
+  last_run_time_ = std::chrono::time_point<std::chrono::steady_clock>();
+  last_listed_latest_entry_timestamp_ = 0U;
+  last_processed_latest_entry_timestamp_ = 0U;
+  latest_identifiers_processed_.clear();
+
+  initial_listing_complete_ = false;
+  already_listed_entities_.clear();
+}
+
+ListSFTP::Child::Child()
+    :directory(false) {
+  memset(&attrs, 0x00, sizeof(attrs));
+}
+
+ListSFTP::Child::Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child) {
+  parent_path = parent_path_;
+  std::tie(filename, std::ignore, attrs) = std::move(sftp_child);
+  directory = LIBSSH2_SFTP_S_ISDIR(attrs.permissions);
+}
+
+std::string ListSFTP::Child::getPath() const {
+  return utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);
+}
+
+bool ListSFTP::filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child) {
+  const std::string& filename = std::get<0>(sftp_child);
+  const LIBSSH2_SFTP_ATTRIBUTES& attrs = std::get<2>(sftp_child);
+  /* This should not happen */
+  if (filename.empty()) {
+    logger_->log_error("Listing directory \"%s\" returned an empty child", parent_path.c_str());
+    return false;
+  }
+  /* Ignore current dir and parent dir */
+  if (filename == "." || filename == "..") {
+    return false;
+  }
+  /* Dotted files */
+  if (ignore_dotted_files_ && filename[0] == '.') {
+    logger_->log_debug("Ignoring \"%s/%s\" because Ignore Dotted Files is true", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+  if (!(attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS)) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get permissions in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+  if (LIBSSH2_SFTP_S_ISREG(attrs.permissions)) {
+    return filterFile(parent_path, filename, attrs);
+  } else if (LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+    return filterDirectory(parent_path, filename, attrs);
+  } else {
+    logger_->log_debug("Skipping non-regular, non-directory file \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+}
+
+bool ListSFTP::filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME)) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get all attributes in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+
+  /* Age */
+  time_t now = time(nullptr);
+  int64_t file_age = (now - attrs.mtime) * 1000;
+  if (file_age < minimum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is younger than the Minimum File Age: %ld ms < %lu ms",
+        parent_path.c_str(),
+        filename.c_str(),
+        file_age,
+        minimum_file_age_);
+    return false;
+  }
+  if (maximum_file_age_ != 0U && file_age > maximum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %ld ms > %lu ms",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       file_age,
+                       maximum_file_age_);
+    return false;
+  }
+
+  /* Size */
+  if (attrs.filesize < minimum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is smaller than the Minimum File Size: %lu B < %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       minimum_file_size_);
+    return false;
+  }
+  if (maximum_file_size_ != 0U && attrs.filesize > maximum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is larger than the Maximum File Size: %lu B > %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       maximum_file_size_);
+    return false;
+  }
+
+  /* File Filter Regex */
+  if (file_filter_regex_set_) {
+    bool match = false;
+#ifndef WIN32
+    int ret = regexec(&compiled_file_filter_regex_, filename.c_str(), static_cast<size_t>(0), nullptr, 0);
+    match = ret == 0;
+#else
+    match = std::regex_match(filename, compiled_file_filter_regex_);
+#endif
+    if (!match) {
+      logger_->log_debug("Ignoring \"%s/%s\" because it did not match the File Filter Regex \"%s\"",
+                         parent_path.c_str(),
+                         filename.c_str(),
+                         file_filter_regex_);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool ListSFTP::filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!search_recursively_) {
+    return false;
+  }
+
+  /* Path Filter Regex */
+  if (path_filter_regex_set_) {
+    std::string dir_path = utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);
+    bool match = false;
+#ifndef WIN32
+    int ret = regexec(&compiled_path_filter_regex_, dir_path.c_str(), static_cast<size_t>(0), nullptr, 0);
+    match = ret == 0;
+#else
+    match = std::regex_match(dir_path, compiled_path_filter_regex_);
+#endif
+    if (!match) {
+      logger_->log_debug("Not recursing into \"%s\" because it did not match the Path Filter Regex \"%s\"",
+                         dir_path.c_str(),
+                         path_filter_regex_);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool ListSFTP::createAndTransferFlowFileFromChild(
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const ListSFTP::Child& child) {
+  /* Convert mtime to string */
+  if (child.attrs.mtime > std::numeric_limits<int64_t>::max()) {
+    logger_->log_error("Modification date %lu of \"%s/%s\" larger than int64_t max", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+  std::string mtime_str;
+  if (!getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), mtime_str)) {
+    logger_->log_error("Failed to convert modification date %lu of \"%s/%s\" to string", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+
+  /* Create FlowFile */
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+  if (flow_file == nullptr) {
+    logger_->log_error("Failed to create FlowFileRecord");
+    return false;
+  }
+
+  /* Set attributes */
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, hostname);
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(port));
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_LISTING_USER, username);
+
+  /* uid and gid */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_OWNER, std::to_string(child.attrs.uid));
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_GROUP, std::to_string(child.attrs.gid));
+
+  /* permissions */
+  std::stringstream ss;
+  ss << std::setfill('0') << std::setw(4) << std::oct << (child.attrs.permissions & 0777);
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_PERMISSIONS, ss.str());
+
+  /* filesize */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_SIZE, std::to_string(child.attrs.filesize));
+
+  /* mtime */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_LASTMODIFIEDTIME, mtime_str);
+
+  flow_file->updateKeyedAttribute(FILENAME, child.filename);
+  flow_file->updateKeyedAttribute(PATH, child.parent_path);
+
+  session->transfer(flow_file, Success);
+
+  return true;
+}
+
+ListSFTP::ListedEntity::ListedEntity()
+    : timestamp(0U)
+    , size(0U) {
+}
+
+ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
+    : timestamp(timestamp_)
+    , size(size_) {
+}
+
+bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
+  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+  size_t i = 0;
+  for (const auto& identifier : latest_identifiers_processed_) {
+    file << "id." << i << "=" << identifier << "\n";
+    ++i;
+  }
+  return true;
+}
+
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  uint64_t state_listing_timestamp;
+  uint64_t state_processed_timestamp;
+  std::set<std::string> state_ids;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
+    }
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "listing.timestamp") {
+      try {
+        state_listing_timestamp = stoull(value);
+      } catch (...) {
+        logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key == "processed.timestamp") {
+      try {
+        state_processed_timestamp = stoull(value);
+      } catch (...) {
+        logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key.compare(0, strlen("id."), "id.") == 0) {
+      state_ids.emplace(std::move(value));
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_timestamps_state_filename_.c_str(),
+                       state_hostname, hostname,
+                       state_username, username,
+                       state_remote_path, remote_path);
+    return false;
+  }
+
+  last_listed_latest_entry_timestamp_ = state_listing_timestamp;
+  last_processed_latest_entry_timestamp_ = state_processed_timestamp;
+  latest_identifiers_processed_ = std::move(state_ids);
+
+  return true;
+}
+
+void ListSFTP::listByTrackingTimestamps(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    std::vector<Child>&& files) {
+  uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
+
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty()) {
+    if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    } else {
+      logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  std::chrono::time_point<std::chrono::steady_clock> current_run_time = std::chrono::steady_clock::now();
+  time_t now = time(nullptr);
+
+  /* Order children by timestamp and try to detect timestamp precision if needed  */
+  std::map<uint64_t /*timestamp*/, std::list<Child>> ordered_files;
+  bool target_system_has_seconds = false;
+  for (auto&& file : files) {
+    uint64_t timestamp = file.attrs.mtime * 1000;
+    target_system_has_seconds |= timestamp % 60000 != 0;
+
+    bool new_file = min_timestamp_to_list == 0U || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
+    if (new_file) {
+      auto& files_for_timestamp = ordered_files[timestamp];
+      files_for_timestamp.emplace_back(std::move(file));
+    } else {
+      logger_->log_trace("Skipping \"%s\", because it is not new.", file.getPath().c_str());
+    }
+  }
+
+  uint64_t latest_listed_entry_timestamp_this_cycle = 0U;
+  size_t flow_files_created = 0U;
+  if (ordered_files.size() > 0) {
+    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;
+
+    std::string remote_system_timestamp_precision;
+    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
+      if (target_system_has_seconds) {
+        logger_->log_debug("Precision auto detection detected second precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+      } else {
+        logger_->log_debug("Precision auto detection detected minute precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+      }
+    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+    } else {
+      /*
+       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
+       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
+       */
+      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+    }
+    uint64_t listing_lag = LISTING_LAG_MAP.at(remote_system_timestamp_precision);
+    logger_->log_debug("The listing lag is %lu ms", listing_lag);
+
+    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
+    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_) {
+      const auto& latest_files = ordered_files.at(latest_listed_entry_timestamp_this_cycle);
+      uint64_t elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_).count();
+      /* If a precision-specific listing lag has not yet elapsed since out last execution, we wait. */
+      if (elapsed_time < listing_lag) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and the listing lag has not yet elapsed (%lu ms < % lu ms). Yielding.",
+                           latest_listed_entry_timestamp_this_cycle,
+                           elapsed_time,
+                           listing_lag);
+        context->yield();
+        return;
+      }
+      /*
+       * If we have already processed the entities with the newest timestamp,
+       * and there are no new entities with that timestamp, there is nothing to do.
+       */
+      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
+          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
+            return latest_identifiers_processed_.count(child.getPath()) == 1U;
+          })) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and all files for that timestamp has been processed. Yielding.", latest_listed_entry_timestamp_this_cycle);
+        context->yield();
+        return;
+      }
+    } else {
+      /* Determine the minimum reliable timestamp based on precision */
+      uint64_t minimum_reliable_timestamp = now * 1000 - listing_lag;
+      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 1000;
+      } else {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 60000;
+      }
+      /* If the latest timestamp is not old enough, we wait another cycle */
+      if (minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
+        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: %lu ms >= %lu ms",
+            latest_listed_entry_timestamp_this_cycle,
+            minimum_reliable_timestamp);
+        ordered_files.erase(latest_listed_entry_timestamp_this_cycle);
+      }
+    }
+
+    for (auto& files_for_timestamp : ordered_files) {
+      if (files_for_timestamp.first == last_processed_latest_entry_timestamp_) {
+        /* Filter out previously processed entities. */
+        for (auto it = files_for_timestamp.second.begin(); it != files_for_timestamp.second.end();) {
+          if (latest_identifiers_processed_.count(it->getPath()) != 0U) {
+            it = files_for_timestamp.second.erase(it);
+          } else {
+            ++it;
+          }
+        }
+      }
+      for (const auto& file : files_for_timestamp.second) {
+        /* Create the FlowFile for this path */
+        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
+          flow_files_created++;
+        } else {
+          logger_->log_error("Failed to emit FlowFile for \"%s\"", file.filename);
+          context->yield();
+          return;
+        }
+      }
+    }
+  }
+
+  /* If we have a listing timestamp, it is worth persisting the state */
+  if (latest_listed_entry_timestamp_this_cycle != 0U) {
+    bool processed_new_files = flow_files_created > 0U;
+    if (processed_new_files) {
+      auto last_files_it = ordered_files.crbegin();
+      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
+        latest_identifiers_processed_.clear();
+      }
+
+      for (const auto& last_file : last_files_it->second) {
+        latest_identifiers_processed_.insert(last_file.getPath());
+      }
+
+      last_processed_latest_entry_timestamp_ = last_files_it->first;
+    }
+
+    last_run_time_ = current_run_time;
+
+    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
+      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
+      if (!tracking_timestamps_state_filename_.empty()) {
+        persistTrackingTimestampsCache(hostname, username, remote_path);
+      }
+    }
+  } else {
+    logger_->log_debug("There are no files to list. Yielding.");
+    context->yield();
+    return;
+  }
+}
+
+bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
+  file.close();
+
+  std::ofstream json_file(tracking_entities_state_json_filename_);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
+    return false;
+  }
+
+  rapidjson::Document entities(rapidjson::kObjectType);
+  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    rapidjson::Value entity(rapidjson::kObjectType);
+    entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc);
+    entity.AddMember("size", already_listed_entity.second.size, alloc);
+    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc);
+  }
+
+  rapidjson::OStreamWrapper osw(json_file);
+  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
+  entities.Accept(writer);
+
+  return true;
+}
+
+bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  std::string state_json_state_file;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
+      continue;
+    }
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "json_state_file") {
+      state_json_state_file = std::move(value);
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_entities_state_filename_.c_str(),
+                       state_hostname, hostname,
+                       state_username, username,
+                       state_remote_path, remote_path);
+    return false;
+  }
+
+  if (state_json_state_file.empty()) {
+    logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+
+  std::ifstream json_file(state_json_state_file);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to open entities Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+    return false;
+  }
+
+  try {
+    rapidjson::IStreamWrapper isw(json_file);
+    rapidjson::Document d;
+    rapidjson::ParseResult res = d.ParseStream(isw);
+    if (!res) {
+      logger_->log_error("Failed to parse Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+      return false;
+    }
+    if (!d.IsObject()) {
+      logger_->log_error("Tracking Entities state json file \"%s\" root is not an object", state_json_state_file.c_str());
+      return false;
+    }
+
+    std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
+    for (const auto &already_listed_entity : d.GetObject()) {
+      auto it = already_listed_entity.value.FindMember("timestamp");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" timestamp missing or malformatted for entity \"%s\"",
+            state_json_state_file.c_str(),
+            already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t timestamp = it->value.GetUint64();
+      it = already_listed_entity.value.FindMember("size");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" size missing or malformatted for entity \"%s\"",
+                           state_json_state_file.c_str(),
+                           already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t size = it->value.GetUint64();
+      new_already_listed_entities.emplace(std::piecewise_construct,
+                                          std::forward_as_tuple(already_listed_entity.name.GetString()),
+                                          std::forward_as_tuple(timestamp, size));
+    }
+    already_listed_entities_ = std::move(new_already_listed_entities);
+  } catch (std::exception& e) {
+    logger_->log_error("Exception while parsing Tracking Entities state json file \"%s\": %s", state_json_state_file.c_str(), e.what());
+    return false;
+  }
+
+  return true;
+}
+
+void ListSFTP::listByTrackingEntities(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    uint64_t entity_tracking_time_window,
+    std::vector<Child>&& files) {
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_entities_state_filename_.empty()) {
+    if (updateFromTrackingEntitiesCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+      initial_listing_complete_ = true;
+    } else {
+      logger_->log_debug("Failed to load Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  time_t now = time(nullptr);
+  uint64_t min_timestamp_to_list = (!initial_listing_complete_ && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
+      ? 0U : (now * 1000 - entity_tracking_time_window);
+
+  /* Skip files not in the tracking window */
+  for (auto it = files.begin(); it != files.end(); ) {
+    if (it->attrs.mtime * 1000 < min_timestamp_to_list) {
+      logger_->log_trace("Skipping \"%s\" because it has an older timestamp than the minimum timestamp to list: %lu < %lu",
+          it->getPath(), it->attrs.mtime * 1000, min_timestamp_to_list);
+      it = files.erase(it);
+    } else {
+      ++it;
+    }
+  }
+
+  if (files.empty()) {
+    logger_->log_debug("No entities to list within the tracking time window");
+    context->yield();
+    return;
+  }
+
+  /* Find files that have been updated */
+  std::vector<Child> updated_entities;
+  std::copy_if(std::make_move_iterator(files.begin()),
+               std::make_move_iterator(files.end()),
+               std::back_inserter(updated_entities),
+               [&](const Child& child) {
+     auto already_listed_it = already_listed_entities_.find(child.getPath());
+     if (already_listed_it == already_listed_entities_.end()) {
+       logger_->log_trace("Found new file \"%s\"", child.getPath());
+       return true;
+     }
+
+     if (child.attrs.mtime * 1000 > already_listed_it->second.timestamp) {
+       logger_->log_trace("Found file \"%s\" with newer timestamp: %lu -> %lu",
+           child.getPath(),
+           already_listed_it->second.timestamp,
+           child.attrs.mtime * 1000);
+       return true;
+     }
+
+     if (child.attrs.filesize != already_listed_it->second.size) {
+       logger_->log_trace("Found file \"%s\" with different size: %lu -> %lu",
+                          child.getPath(),
+                          already_listed_it->second.size,
+                          child.attrs.filesize);
+       return true;
+     }
+
+     logger_->log_trace("Skipping file \"%s\" because it has not changed", child.getPath());
+     return false;
+  });
+
+  /* Find entities in the tracking cache that are no longer in the tracking window */
+  std::vector<std::string> old_entity_ids;
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
+      old_entity_ids.emplace_back(already_listed_entity.first);
+    }
+  }
+
+  /* If we have no new files and no expired tracked entities, we have nothing to do */
+  if (updated_entities.empty() && old_entity_ids.empty()) {
+    context->yield();
+    return;
+  }
+
+  /* Remove expired entities */
+  for (const auto& old_entity_id : old_entity_ids) {
+    already_listed_entities_.erase(old_entity_id);
+  }
+
+  for (const auto& updated_entity : updated_entities) {
+    /* Create the FlowFile for this path */
+    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
+      logger_->log_error("Failed to emit FlowFile for \"%s\"", updated_entity.getPath());
+      context->yield();
+      return;
+    }
+    already_listed_entities_[updated_entity.getPath()] = ListedEntity(updated_entity.attrs.mtime * 1000, updated_entity.attrs.filesize);
+  }
+
+  initial_listing_complete_ = true;
+
+  if (!tracking_entities_state_filename_.empty()) {
+    persistTrackingEntitiesCache(hostname, username, remote_path);
+  }
+}
+
+void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
+    context->yield();
+    return;
+  }
+
+  /* Parse processor-specific properties */
+  std::string remote_path;
+  uint64_t entity_tracking_time_window = 0U;
+
+  std::string value;
+  context->getProperty(RemotePath.getName(), remote_path);
+  /* Remove trailing slashes */
+  while (remote_path.size() > 1U && remote_path.back() == '/') {
+    remote_path.resize(remote_path.size() - 1);
+  }
+  if (context->getProperty(EntityTrackingTimeWindow.getName(), value)) {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, entity_tracking_time_window, unit) ||
+        !core::Property::ConvertTimeUnitToMS(entity_tracking_time_window, unit, entity_tracking_time_window)) {
+      /* The default is 3 hours	*/
+      entity_tracking_time_window = 3 * 3600 * 1000;
+      logger_->log_error("Entity Tracking Time Window attribute is invalid");
+    }
+  } else {
+    /* The default is 3 hours	*/
+    entity_tracking_time_window = 3 * 3600 * 1000;
+  }
+
+  /* Check whether we need to invalidate the cache based on the new properties */
+  if ((!last_hostname_.empty() && last_hostname_ != common_properties.hostname) ||
+      (!last_username_.empty() && last_username_ != common_properties.username) ||
+      (!last_remote_path_.empty() && last_remote_path_ != remote_path)) {
+    invalidateCache();
+  }
+  last_hostname_ = common_properties.hostname;
+  last_username_ = common_properties.username;
+  last_remote_path_ = remote_path;
+
+  /* Get SFTPClient from cache or create it */
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
+  if (client == nullptr) {
+    context->yield();
+    return;
+  }
+
+  /*
+   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
+   * So we will only call this when we're sure that the connection is OK.
+   */
+  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
+    addConnectionToCache(connection_cache_key, std::move(client));
+  };
+
+  std::deque<Child> directories;
+  std::vector<Child> files;
+
+  /* Add initial directory */
+  Child root;
+  std::tie(root.parent_path, root.filename) = utils::file::FileUtils::split_path(remote_path, true /*force_posix*/);
+  root.directory = true;
+  directories.emplace_back(std::move(root));
+
+  /* Process directories */
+  while (!directories.empty()) {
+    auto directory = std::move(directories.front());
+    directories.pop_front();
+
+    std::string new_parent_path;
+    if (directory.parent_path.empty()) {
+      new_parent_path = directory.filename;
+    } else {
+      new_parent_path = directory.getPath();
+    }
+    std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>> dir_children;
+    if (!client->listDirectory(new_parent_path, follow_symlink_, dir_children)) {
+      continue;
+    }
+    for (auto&& dir_child : dir_children) {
+      if (filter(new_parent_path, dir_child)) {
+        Child child(new_parent_path, std::move(dir_child));
+        if (child.directory) {
+          directories.emplace_back(std::move(child));
+        } else {
+          files.emplace_back(std::move(child));
+        }
+      }
+    }
+  }
+
+  /* Process the files with the appropriate tracking strategy */
+  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+    listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, std::move(files));
+  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+    listByTrackingEntities(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, entity_tracking_time_window, std::move(files));
+  } else {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+    context->yield();
+    return;
+  }
+
+  put_connection_back_to_cache();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
new file mode 100644
index 0000000..4fe32e2
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LIST_SFTP_H__
+#define __LIST_SFTP_H__
+
+#include <memory>
+#include <string>
+#include <map>
+#include <chrono>
+#include <cstdint>
+#ifndef WIN32
+#include <regex.h>
+#else
+#include <regex>
+#endif
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ListSFTP : public SFTPProcessorBase {
+ public:
+
+  static constexpr char const *LISTING_STRATEGY_TRACKING_TIMESTAMPS = "Tracking Timestamps";
+  static constexpr char const *LISTING_STRATEGY_TRACKING_ENTITIES = "Tracking Entities";
+
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT = "Auto Detect";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS = "Milliseconds";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS = "Seconds";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES = "Minutes";
+
+  static constexpr char const *ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW = "Tracking Time Window";
+  static constexpr char const *ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE = "All Available";
+
+  static constexpr char const* ProcessorName = "ListSFTP";
+
+
+  /*!
+   * Create a new processor
+   */
+  ListSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+  virtual ~ListSFTP();
+
+  // Supported Properties
+  static core::Property ListingStrategy;
+  static core::Property RemotePath;
+  static core::Property SearchRecursively;
+  static core::Property FollowSymlink;
+  static core::Property FileFilterRegex;
+  static core::Property PathFilterRegex;
+  static core::Property IgnoreDottedFiles;
+  static core::Property TargetSystemTimestampPrecision;
+  static core::Property EntityTrackingTimeWindow;
+  static core::Property EntityTrackingInitialListingTarget;
+  static core::Property MinimumFileAge;
+  static core::Property MaximumFileAge;
+  static core::Property MinimumFileSize;
+  static core::Property MaximumFileSize;
+  static core::Property StateFile;
+
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Writes Attributes
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT = "sftp.remote.port";
+  static constexpr char const* ATTRIBUTE_SFTP_LISTING_USER = "sftp.listing.user";
+  static constexpr char const* ATTRIBUTE_FILE_OWNER = "file.owner";
+  static constexpr char const* ATTRIBUTE_FILE_GROUP = "file.group";
+  static constexpr char const* ATTRIBUTE_FILE_PERMISSIONS = "file.permissions";
+  static constexpr char const* ATTRIBUTE_FILE_SIZE = "file.size";
+  static constexpr char const* ATTRIBUTE_FILE_LASTMODIFIEDTIME = "file.lastModifiedTime";
+
+  static const std::map<std::string, uint64_t> LISTING_LAG_MAP;
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  virtual void initialize() override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ private:
+
+  std::string listing_strategy_;
+  bool search_recursively_;
+  bool follow_symlink_;
+  std::string file_filter_regex_;
+  std::string path_filter_regex_;
+  bool file_filter_regex_set_;
+  bool path_filter_regex_set_;
+#ifndef WIN32
+  regex_t compiled_file_filter_regex_;
+  regex_t compiled_path_filter_regex_;
+#else
+  std::regex compiled_file_filter_regex_;
+  std::regex compiled_path_filter_regex_;
+#endif
+  bool ignore_dotted_files_;
+  std::string target_system_timestamp_precision_;
+  std::string entity_tracking_initial_listing_target_;
+  uint64_t minimum_file_age_;
+  uint64_t maximum_file_age_;
+  uint64_t minimum_file_size_;
+  uint64_t maximum_file_size_;
+
+  std::string last_listing_strategy_;
+  std::string last_hostname_;
+  std::string last_username_;
+  std::string last_remote_path_;
+
+  struct Child {
+    Child();
+    Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child);
+    std::string getPath() const;
+
+    bool directory;
+    std::string parent_path;
+    std::string filename;
+    LIBSSH2_SFTP_ATTRIBUTES attrs;
+  };
+
+  bool already_loaded_from_cache_;
+
+  std::string tracking_timestamps_state_filename_;
+  std::chrono::time_point<std::chrono::steady_clock> last_run_time_;
+  uint64_t last_listed_latest_entry_timestamp_;
+  uint64_t last_processed_latest_entry_timestamp_;
+  std::set<std::string> latest_identifiers_processed_;
+
+  bool initial_listing_complete_;
+  std::string tracking_entities_state_filename_;
+  std::string tracking_entities_state_json_filename_;
+  struct ListedEntity {
+    uint64_t timestamp;
+    uint64_t size;
+
+    ListedEntity();
+    ListedEntity(uint64_t timestamp, uint64_t size);
+  };
+  std::unordered_map<std::string, ListedEntity> already_listed_entities_;
+
+  void invalidateCache();
+
+  bool filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child);
+  bool filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+  bool filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+
+  bool createAndTransferFlowFileFromChild(
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const Child& child);
+
+  bool persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  bool persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  void listByTrackingTimestamps(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      std::vector<Child>&& files);
+
+  void listByTrackingEntities(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      uint64_t entity_tracking_time_window,
+      std::vector<Child>&& files);
+};
+
+REGISTER_RESOURCE(ListSFTP, "Performs a listing of the files residing on an SFTP server. "
+                            "For each file that is found on the remote server, a new FlowFile will be created with "
+                            "the filename attribute set to the name of the file on the remote server. "
+                            "This can then be used in conjunction with FetchSFTP in order to fetch those files.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index e1b589d..3262feb 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -42,6 +42,7 @@
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
 #include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
 
 namespace org {
 namespace apache {
@@ -49,30 +50,12 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property PutSFTP::Hostname(
-    core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Username(
-    core::PropertyBuilder::createProperty("Username")->withDescription("Username")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Password(
-    core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPath(
-    core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPassphrase(
-    core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
 core::Property PutSFTP::RemotePath(
     core::PropertyBuilder::createProperty("Remote Path")->withDescription("The path on the remote system from which to pull or push files")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::CreateDirectory(
     core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
-        ->withDefaultValue<bool>(false)->build());
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
 core::Property PutSFTP::DisableDirectoryListing(
     core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("If set to 'true', directory listing is not performed prior to create missing directories. "
                                                                                         "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
@@ -80,18 +63,13 @@ core::Property PutSFTP::DisableDirectoryListing(
                                                                                         "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
                                                                                         "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
                                                                                         "then an error is returned because the directory already exists.")
-                                                                                        ->isRequired(false)->withDefaultValue<bool>(false)->build());
+        ->isRequired(false)->withDefaultValue<bool>(false)->build());
 core::Property PutSFTP::BatchSize(
     core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of FlowFiles to send in a single connection")
-        ->withDefaultValue<uint64_t>(500)->build());
-core::Property PutSFTP::ConnectionTimeout(
-    core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
-        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
-core::Property PutSFTP::DataTimeout(
-    core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
-        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+        ->isRequired(true)->withDefaultValue<uint64_t>(500)->build());
 core::Property PutSFTP::ConflictResolution(
     core::PropertyBuilder::createProperty("Conflict Resolution")->withDescription("Determines how to handle the problem of filename collisions")
+        ->isRequired(true)
         ->withAllowableValues<std::string>({CONFLICT_RESOLUTION_REPLACE,
                                             CONFLICT_RESOLUTION_IGNORE,
                                             CONFLICT_RESOLUTION_RENAME,
@@ -109,63 +87,34 @@ core::Property PutSFTP::DotRename(
 core::Property PutSFTP::TempFilename(
     core::PropertyBuilder::createProperty("Temporary Filename")->withDescription("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. "
                                                                                  "If this value is set, the Dot Rename property is ignored.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HostKeyFile(
-    core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
-        ->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::LastModifiedTime(
     core::PropertyBuilder::createProperty("Last Modified Time")->withDescription("The lastModifiedTime to assign to the file after transferring it. "
                                                                                   "If not set, the lastModifiedTime will not be changed. "
                                                                                   "Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
                                                                                   "You may also use expression language such as ${file.lastModifiedTime}. "
                                                                                   "If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::Permissions(
     core::PropertyBuilder::createProperty("Permissions")->withDescription("The permissions to assign to the file after transferring it. "
                                                                           "Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). "
                                                                           "If not set, the permissions will not be changed. "
                                                                           "You may also use expression language such as ${file.permissions}. "
                                                                           "If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::RemoteOwner(
     core::PropertyBuilder::createProperty("Remote Owner")->withDescription("Integer value representing the User ID to set on the file after transferring it. "
                                                                            "If not set, the owner will not be set. You may also use expression language such as ${file.owner}. "
                                                                            "If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::RemoteGroup(
     core::PropertyBuilder::createProperty("Remote Group")->withDescription("Integer value representing the Group ID to set on the file after transferring it. "
                                                                            "If not set, the group will not be set. You may also use expression language such as ${file.group}. "
                                                                            "If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::StrictHostKeyChecking(
-    core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
-        ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::UseKeepaliveOnTimeout(
-    core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
-        ->withDefaultValue<bool>(true)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::UseCompression(
     core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
-        ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::ProxyType(
-    core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
-                                                                         "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
-        ->isRequired(false)
-        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
-                                            PROXY_TYPE_HTTP,
-                                            PROXY_TYPE_SOCKS})
-        ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
-core::Property PutSFTP::ProxyHost(
-    core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::ProxyPort(
-    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyUsername(
-    core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyPassword(
-    core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
 
 core::Relationship PutSFTP::Success("success", "FlowFiles that are successfully sent will be routed to success");
 core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by the destination system");
@@ -176,35 +125,20 @@ void PutSFTP::initialize() {
 
   // Set the supported properties
   std::set<core::Property> properties;
-  properties.insert(Hostname);
-  properties.insert(Port);
-  properties.insert(Username);
-  properties.insert(Password);
-  properties.insert(PrivateKeyPath);
-  properties.insert(PrivateKeyPassphrase);
+  addSupportedCommonProperties(properties);
   properties.insert(RemotePath);
   properties.insert(CreateDirectory);
   properties.insert(DisableDirectoryListing);
   properties.insert(BatchSize);
-  properties.insert(ConnectionTimeout);
-  properties.insert(DataTimeout);
   properties.insert(ConflictResolution);
   properties.insert(RejectZeroByte);
   properties.insert(DotRename);
   properties.insert(TempFilename);
-  properties.insert(HostKeyFile);
   properties.insert(LastModifiedTime);
   properties.insert(Permissions);
   properties.insert(RemoteOwner);
   properties.insert(RemoteGroup);
-  properties.insert(StrictHostKeyChecking);
-  properties.insert(UseKeepaliveOnTimeout);
   properties.insert(UseCompression);
-  properties.insert(ProxyType);
-  properties.insert(ProxyHost);
-  properties.insert(ProxyPort);
-  properties.insert(HttpProxyUsername);
-  properties.insert(HttpProxyPassword);
   setSupportedProperties(properties);
   
   // Set the supported relationships
@@ -215,143 +149,21 @@ void PutSFTP::initialize() {
   setSupportedRelationships(relationships);
 }
 
-constexpr size_t PutSFTP::CONNECTION_CACHE_MAX_SIZE;
-
 PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
-  : Processor(name, uuid),
-    logger_(logging::LoggerFactory<PutSFTP>::getLogger()),
+  : SFTPProcessorBase(name, uuid),
     create_directory_(false),
     batch_size_(0),
-    connection_timeout_(0),
-    data_timeout_(0),
     reject_zero_byte_(false),
-    dot_rename_(false),
-    strict_host_checking_(false),
-    use_keepalive_on_timeout_(false),
-    use_compression_(false),
-    running_(true) {
+    dot_rename_(false) {
+  logger_ = logging::LoggerFactory<PutSFTP>::getLogger();
 }
 
 PutSFTP::~PutSFTP() {
-  if (keepalive_thread_.joinable()) {
-    {
-      std::lock_guard<std::mutex> lock(connections_mutex_);
-      running_ = false;
-      keepalive_cv_.notify_one();
-    }
-    keepalive_thread_.join();
-  }
-}
-
-bool PutSFTP::ConnectionCacheKey::operator<(const PutSFTP::ConnectionCacheKey& other) const {
-  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) <
-          std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-bool PutSFTP::ConnectionCacheKey::operator==(const PutSFTP::ConnectionCacheKey& other) const {
-  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) ==
-         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-std::unique_ptr<utils::SFTPClient> PutSFTP::getConnectionFromCache(const PutSFTP::ConnectionCacheKey& key) {
-  std::lock_guard<std::mutex> lock(connections_mutex_);
-
-  auto it = connections_.find(key);
-  if (it == connections_.end()) {
-    return nullptr;
-  }
-
-  logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
-      key.username,
-      key.hostname,
-      key.port);
-
-  auto lru_it = std::find(lru_.begin(), lru_.end(), key);
-  if (lru_it == lru_.end()) {
-    logger_->log_trace("Assertion error: can't find key in LRU cache");
-  } else {
-    lru_.erase(lru_it);
-  }
-
-  auto connection = std::move(it->second);
-  connections_.erase(it);
-  return connection;
-}
-
-void PutSFTP::addConnectionToCache(const PutSFTP::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
-  std::lock_guard<std::mutex> lock(connections_mutex_);
-
-  while (connections_.size() >= PutSFTP::CONNECTION_CACHE_MAX_SIZE) {
-    const auto& lru_key = lru_.back();
-    logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu",
-        lru_key.username,
-        lru_key.hostname,
-        lru_key.port);
-    connections_.erase(lru_key);
-    lru_.pop_back();
-  }
-
-  logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool",
-      key.username,
-      key.hostname,
-      key.port);
-  connections_.emplace(key, std::move(connection));
-  lru_.push_front(key);
-  keepalive_cv_.notify_one();
-}
-
-void PutSFTP::keepaliveThreadFunc() {
-  std::unique_lock<std::mutex> lock(connections_mutex_);
-
-  while (true) {
-    if (connections_.empty()) {
-      keepalive_cv_.wait(lock, [this] {
-        return !running_ || !connections_.empty();
-      });
-    }
-    if (!running_) {
-      logger_->log_trace("Stopping keepalive thread");
-      lock.unlock();
-      return;
-    }
-
-    int min_wait = 10;
-    for (auto &connection : connections_) {
-      int seconds_to_next = 0;
-      if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {
-        logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
-           connection.first.username,
-           connection.first.hostname,
-           connection.first.port,
-           seconds_to_next);
-        if (seconds_to_next < min_wait) {
-          min_wait = seconds_to_next;
-        }
-      } else {
-        logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
-                           connection.first.username,
-                           connection.first.hostname,
-                           connection.first.port);
-      }
-    }
-
-    /* Avoid busy loops */
-    if (min_wait < 1) {
-      min_wait = 1;
-    }
-
-    logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
-    keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {
-      return !running_;
-    });
-    if (!running_) {
-      lock.unlock();
-      return;
-    }
-  }
 }
 
 void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
   std::string value;
   if (!context->getProperty(CreateDirectory.getName(), value)) {
     logger_->log_error("Create Directory attribute is missing or invalid");
@@ -363,22 +175,6 @@ void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, c
   } else {
     core::Property::StringToInt(value, batch_size_);
   }
-  if (!context->getProperty(ConnectionTimeout.getName(), value)) {
-    logger_->log_error("Connection Timeout attribute is missing or invalid");
-  } else {
-    core::TimeUnit unit;
-    if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
-      logger_->log_error("Connection Timeout attribute is invalid");
-    }
-  }
-  if (!context->getProperty(DataTimeout.getName(), value)) {
-    logger_->log_error("Data Timeout attribute is missing or invalid");
-  } else {
-    core::TimeUnit unit;
-    if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
-      logger_->log_error("Data Timeout attribute is invalid");
-    }
-  }
   context->getProperty(ConflictResolution.getName(), conflict_resolution_);
   if (context->getProperty(RejectZeroByte.getName(), value)) {
     utils::StringUtils::StringToBool(value, reject_zero_byte_);
@@ -386,43 +182,13 @@ void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, c
   if (context->getProperty(DotRename.getName(), value)) {
     utils::StringUtils::StringToBool(value, dot_rename_);
   }
-  context->getProperty(HostKeyFile.getName(), host_key_file_);
-  if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
-    logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
-  } else {
-    utils::StringUtils::StringToBool(value, strict_host_checking_);
-  }
-  if (!context->getProperty(UseKeepaliveOnTimeout.getName(), value)) {
-    logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
-  } else {
-    utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
-  }
   if (!context->getProperty(UseCompression.getName(), value)) {
     logger_->log_error("Use Compression attribute is missing or invalid");
   } else {
     utils::StringUtils::StringToBool(value, use_compression_);
   }
-  context->getProperty(ProxyType.getName(), proxy_type_);
-
-  if (use_keepalive_on_timeout_ && !keepalive_thread_.joinable()) {
-    running_ = true;
-    keepalive_thread_ = std::thread(&PutSFTP::keepaliveThreadFunc, this);
-  }
-}
 
-void PutSFTP::notifyStop() {
-  logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
-  if (keepalive_thread_.joinable()) {
-    {
-      std::lock_guard<std::mutex> lock(connections_mutex_);
-      running_ = false;
-      keepalive_cv_.notify_one();
-    }
-    keepalive_thread_.join();
-  }
-  /* The thread is no longer running, we don't have to lock */
-  connections_.clear();
-  lru_.clear();
+  startKeepaliveThreadIfNeeded();
 }
 
 PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
@@ -443,30 +209,26 @@ int64_t PutSFTP::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
       *stream,
       conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
       stream->getSize() /*expected_size*/)) {
-    return -1;
+    throw client_.getLastError();
   }
-  write_succeeded_ = true;
   return stream->getSize();
 }
 
-bool PutSFTP::ReadCallback::commit() {
-  return write_succeeded_;
-}
-
 bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
   if (flow_file == nullptr) {
     return false;
   }
 
-  /* Parse possibly FlowFile-dependent properties */
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+    context->yield();
+    return false;
+  }
+
+  /* Parse processor-specific properties */
   std::string filename;
-  std::string hostname;
-  uint16_t port = 0U;
-  std::string username;
-  std::string password;
-  std::string private_key_path;
-  std::string private_key_passphrase;
   std::string remote_path;
   bool disable_directory_listing = false;
   std::string temp_file_name;
@@ -478,50 +240,11 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   uint64_t remote_owner = 0U;
   bool remote_group_set = false;
   uint64_t remote_group = 0U;
-  std::string proxy_host;
-  uint16_t proxy_port = 0U;
-  std::string proxy_username;
-  std::string proxy_password;
 
   flow_file->getKeyedAttribute(FILENAME, filename);
 
   std::string value;
-  if (!context->getProperty(Hostname, hostname, flow_file)) {
-    logger_->log_error("Hostname attribute is missing");
-    context->yield();
-    return false;
-  }
-  if (!context->getProperty(Port, value, flow_file)) {
-    logger_->log_error("Port attribute is missing or invalid");
-    context->yield();
-    return false;
-  } else {
-    int port_tmp;
-    if (!core::Property::StringToInt(value, port_tmp) ||
-        port_tmp < std::numeric_limits<uint16_t>::min() ||
-        port_tmp > std::numeric_limits<uint16_t>::max()) {
-      logger_->log_error("Port attribute \"%s\" is invalid", value);
-      context->yield();
-      return false;
-    } else {
-      port = static_cast<uint16_t>(port_tmp);
-    }
-  }
-  if (!context->getProperty(Username, username, flow_file)) {
-    logger_->log_error("Username attribute is missing");
-    context->yield();
-    return false;
-  }
-  context->getProperty(Password, password, flow_file);
-  context->getProperty(PrivateKeyPath, private_key_path, flow_file);
-  context->getProperty(PrivateKeyPassphrase, private_key_passphrase, flow_file);
-  context->getProperty(Password, password, flow_file);
   context->getProperty(RemotePath, remote_path, flow_file);
-  if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
-    utils::StringUtils::StringToBool(value, disable_directory_listing);
-  } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
-    utils::StringUtils::StringToBool(value, disable_directory_listing);
-  }
   /* Remove trailing slashes */
   while (remote_path.size() > 1U && remote_path.back() == '/') {
     remote_path.resize(remote_path.size() - 1);
@@ -530,6 +253,11 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   if (remote_path.empty()) {
     remote_path = ".";
   }
+  if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
+    utils::StringUtils::StringToBool(value, disable_directory_listing);
+  } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
+    utils::StringUtils::StringToBool(value, disable_directory_listing);
+  }
   context->getProperty(TempFilename, temp_file_name, flow_file);
   if (context->getProperty(LastModifiedTime, value, flow_file)) {
     if (core::Property::StringToDateTime(value, last_modified_time)) {
@@ -551,21 +279,6 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
       remote_group_set = true;
     }
   }
-  context->getProperty(ProxyHost, proxy_host, flow_file);
-  if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {
-    int port_tmp;
-    if (!core::Property::StringToInt(value, port_tmp) ||
-        port_tmp < std::numeric_limits<uint16_t>::min() ||
-        port_tmp > std::numeric_limits<uint16_t>::max()) {
-      logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
-      context->yield();
-      return false;
-    } else {
-      proxy_port = static_cast<uint16_t>(port_tmp);
-    }
-  }
-  context->getProperty(HttpProxyUsername, proxy_username, flow_file);
-  context->getProperty(HttpProxyPassword, proxy_password, flow_file);
 
   /* Reject zero-byte files if needed */
   if (reject_zero_byte_ && flow_file->getSize() == 0U) {
@@ -575,56 +288,21 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   }
 
   /* Get SFTPClient from cache or create it */
-  const PutSFTP::ConnectionCacheKey connection_cache_key = {hostname, port, username, proxy_type_, proxy_host, proxy_port};
-  auto client = getConnectionFromCache(connection_cache_key);
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
   if (client == nullptr) {
-    client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(hostname, port, username));
-    if (!IsNullOrEmpty(host_key_file_)) {
-      if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
-        logger_->log_error("Cannot set host key file");
-        context->yield();
-        return false;
-      }
-    }
-    if (!IsNullOrEmpty(password)) {
-      client->setPasswordAuthenticationCredentials(password);
-    }
-    if (!IsNullOrEmpty(private_key_path)) {
-      client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
-    }
-    if (proxy_type_ != PROXY_TYPE_DIRECT) {
-      utils::HTTPProxy proxy;
-      proxy.host = proxy_host;
-      proxy.port = proxy_port;
-      proxy.username = proxy_username;
-      proxy.password = proxy_password;
-      if (!client->setProxy(
-          proxy_type_ == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,
-          proxy)) {
-        logger_->log_error("Cannot set proxy");
-        context->yield();
-        return false;
-      }
-    }
-    if (!client->setConnectionTimeout(connection_timeout_)) {
-      logger_->log_error("Cannot set connection timeout");
-      context->yield();
-      return false;
-    }
-    client->setDataTimeout(data_timeout_);
-    client->setSendKeepAlive(use_keepalive_on_timeout_);
-    if (!client->setUseCompression(use_compression_)) {
-      logger_->log_error("Cannot set compression");
-      context->yield();
-      return false;
-    }
-
-    /* Connect to SFTP server */
-    if (!client->connect()) {
-      logger_->log_error("Cannot connect to SFTP server");
-      context->yield();
-      return false;
-    }
+    context->yield();
+    return false;
   }
 
   /*
@@ -638,13 +316,10 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   /* Try to detect conflicts if needed */
   std::string resolved_filename = filename;
   if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
-    std::stringstream target_path_ss;
-    target_path_ss << remote_path << "/" << filename;
-    auto target_path = target_path_ss.str();
+    std::string target_path = utils::file::FileUtils::concat_path(remote_path, filename, true /*force_posix*/);
     LIBSSH2_SFTP_ATTRIBUTES attrs;
-    bool file_not_exists;
-    if (!client->stat(target_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-      if (!file_not_exists) {
+    if (!client->stat(target_path, true /*follow_symlinks*/, attrs)) {
+      if (client->getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
         logger_->log_error("Failed to stat %s", target_path.c_str());
         session->transfer(flow_file, Failure);
         return true;
@@ -679,10 +354,9 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
           std::stringstream possible_resolved_filename_ss;
           possible_resolved_filename_ss << i << "." << filename;
           possible_resolved_filename = possible_resolved_filename_ss.str();
-          auto possible_resolved_path = remote_path + "/" + possible_resolved_filename;
-          bool file_not_exists;
-          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-            if (file_not_exists) {
+          std::string possible_resolved_path = utils::file::FileUtils::concat_path(remote_path, possible_resolved_filename, true /*force_posix*/);
+          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
+            if (client->getLastError() == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
               unique_name_generated = true;
               break;
             } else {
@@ -707,50 +381,26 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
 
   /* Create remote directory if needed */
   if (create_directory_) {
-    bool should_create_directory = disable_directory_listing;
-    if (!disable_directory_listing) {
-      LIBSSH2_SFTP_ATTRIBUTES attrs;
-      bool file_not_exists;
-      if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-        if (!file_not_exists) {
-          logger_->log_error("Failed to stat %s", remote_path.c_str());
-        }
-        should_create_directory = true;
-      } else {
-        if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
-          logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
-          session->transfer(flow_file, Failure);
-          put_connection_back_to_cache();
-          return true;
-        }
-        logger_->log_debug("Found remote directory %s", remote_path.c_str());
-      }
-    }
-    if (should_create_directory) {
-      (void) client->createDirectoryHierarchy(remote_path);
-      if (!disable_directory_listing) {
-        LIBSSH2_SFTP_ATTRIBUTES attrs;
-        bool file_not_exists;
-        if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-          if (file_not_exists) {
-            logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
-            session->transfer(flow_file, Failure);
-            put_connection_back_to_cache();
-            return true;
-          } else {
-            logger_->log_error("Failed to stat %s", remote_path.c_str());
-            context->yield();
-            return false;
-          }
-        } else {
-          if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
-            logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
-            session->transfer(flow_file, Failure);
-            put_connection_back_to_cache();
-            return true;
-          }
-        }
-      }
+    auto res = createDirectoryHierarchy(*client, remote_path, disable_directory_listing);
+    switch (res) {
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
+        break;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
+        context->yield();
+        return false;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
+        session->transfer(flow_file, Failure);
+        put_connection_back_to_cache();
+        return true;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
+        session->transfer(flow_file, Failure);
+        put_connection_back_to_cache();
+        return true;
+      default:
+        logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
+        context->yield();
+        return false;
     }
   }
 
@@ -765,15 +415,13 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
     target_path_ss << resolved_filename;
   }
   auto target_path = target_path_ss.str();
-  std::stringstream final_target_path_ss;
-  final_target_path_ss << remote_path << "/" << resolved_filename;
-  auto final_target_path = final_target_path_ss.str();
+  std::string final_target_path = utils::file::FileUtils::concat_path(remote_path, resolved_filename, true /*force_posix*/);
   logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
 
   ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
-  session->read(flow_file, &read_callback);
-
-  if (!read_callback.commit()) {
+  try {
+    session->read(flow_file, &read_callback);
+  } catch (const utils::SFTPError&) {
     session->transfer(flow_file, Failure);
     return true;
   }
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index d04fbf8..3d2a115 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -26,6 +26,7 @@
 #include <mutex>
 #include <thread>
 
+#include "SFTPProcessorBase.h"
 #include "utils/ByteArrayCallback.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -44,7 +45,7 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class PutSFTP : public core::Processor {
+ class PutSFTP : public SFTPProcessorBase {
  public:
 
   static constexpr char const *CONFLICT_RESOLUTION_REPLACE = "REPLACE";
@@ -54,10 +55,6 @@ class PutSFTP : public core::Processor {
   static constexpr char const *CONFLICT_RESOLUTION_FAIL = "FAIL";
   static constexpr char const *CONFLICT_RESOLUTION_NONE = "NONE";
 
-  static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
-  static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
-  static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
-
   static constexpr char const* ProcessorName = "PutSFTP";
 
 
@@ -68,35 +65,19 @@ class PutSFTP : public core::Processor {
   virtual ~PutSFTP();
 
   // Supported Properties
-  static core::Property Hostname;
-  static core::Property Port;
-  static core::Property Username;
-  static core::Property Password;
-  static core::Property PrivateKeyPath;
-  static core::Property PrivateKeyPassphrase;
   static core::Property RemotePath;
   static core::Property CreateDirectory;
   static core::Property DisableDirectoryListing;
   static core::Property BatchSize;
-  static core::Property ConnectionTimeout;
-  static core::Property DataTimeout;
   static core::Property ConflictResolution;
   static core::Property RejectZeroByte;
   static core::Property DotRename;
   static core::Property TempFilename;
-  static core::Property HostKeyFile;
   static core::Property LastModifiedTime;
   static core::Property Permissions;
   static core::Property RemoteOwner;
   static core::Property RemoteGroup;
-  static core::Property StrictHostKeyChecking;
-  static core::Property UseKeepaliveOnTimeout;
   static core::Property UseCompression;
-  static core::Property ProxyType;
-  static core::Property ProxyHost;
-  static core::Property ProxyPort;
-  static core::Property HttpProxyUsername;
-  static core::Property HttpProxyPassword;
 
   // Supported Relationships
   static core::Relationship Success;
@@ -110,7 +91,6 @@ class PutSFTP : public core::Processor {
   virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   virtual void initialize() override;
   virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  virtual void notifyStop() override;
 
   class ReadCallback : public InputStreamCallback {
    public:
@@ -119,7 +99,6 @@ class PutSFTP : public core::Processor {
         const std::string& conflict_resolution);
     ~ReadCallback();
     virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
-    bool commit();
 
    private:
     std::shared_ptr<logging::Logger> logger_;
@@ -131,43 +110,11 @@ class PutSFTP : public core::Processor {
 
  private:
 
-  std::shared_ptr<logging::Logger> logger_;
-
   bool create_directory_;
   uint64_t batch_size_;
-  int64_t connection_timeout_;
-  int64_t data_timeout_;
   std::string conflict_resolution_;
   bool reject_zero_byte_;
   bool dot_rename_;
-  std::string host_key_file_;
-  bool strict_host_checking_;
-  bool use_keepalive_on_timeout_;
-  bool use_compression_;
-  std::string proxy_type_;
-
-  static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;
-  struct ConnectionCacheKey {
-    std::string hostname;
-    uint16_t port;
-    std::string username;
-    std::string proxy_type;
-    std::string proxy_host;
-    uint16_t proxy_port;
-
-    bool operator<(const ConnectionCacheKey& other) const;
-    bool operator==(const ConnectionCacheKey& other) const;
-  };
-  std::mutex connections_mutex_;
-  std::map<ConnectionCacheKey, std::unique_ptr<utils::SFTPClient>> connections_;
-  std::list<ConnectionCacheKey> lru_;
-  std::unique_ptr<utils::SFTPClient> getConnectionFromCache(const ConnectionCacheKey& key);
-  void addConnectionToCache(const ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection);
-
-  std::thread keepalive_thread_;
-  bool running_;
-  std::condition_variable keepalive_cv_;
-  void keepaliveThreadFunc();
 
   bool processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
 };
diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp
new file mode 100644
index 0000000..63c335a
--- /dev/null
+++ b/extensions/sftp/processors/SFTPProcessorBase.cpp
@@ -0,0 +1,475 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SFTPProcessorBase.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property SFTPProcessorBase::Hostname(
+    core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Port(
+    core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Username(
+    core::PropertyBuilder::createProperty("Username")->withDescription("Username")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Password(
+    core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPath(
+    core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPassphrase(
+    core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::StrictHostKeyChecking(
+    core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property SFTPProcessorBase::HostKeyFile(
+    core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
+        ->isRequired(false)->build());
+core::Property SFTPProcessorBase::ConnectionTimeout(
+    core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::DataTimeout(
+    core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::SendKeepaliveOnTimeout(
+    core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property SFTPProcessorBase::ProxyType(
+    core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
+                                                                         "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
+        ->isRequired(false)
+        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
+                                            PROXY_TYPE_HTTP,
+                                            PROXY_TYPE_SOCKS})
+        ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
+core::Property SFTPProcessorBase::ProxyHost(
+    core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::ProxyPort(
+    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyUsername(
+    core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyPassword(
+    core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+
+constexpr size_t SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE;
+
+SFTPProcessorBase::SFTPProcessorBase(std::string name, utils::Identifier uuid)
+    : Processor(name, uuid),
+      connection_timeout_(0),
+      data_timeout_(0),
+      strict_host_checking_(false),
+      use_keepalive_on_timeout_(false),
+      use_compression_(false),
+      running_(true) {
+}
+
+SFTPProcessorBase::~SFTPProcessorBase() {
+  if (keepalive_thread_.joinable()) {
+    {
+      std::lock_guard<std::mutex> lock(connections_mutex_);
+      running_ = false;
+      keepalive_cv_.notify_one();
+    }
+    keepalive_thread_.join();
+  }
+}
+
+void SFTPProcessorBase::notifyStop() {
+  logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
+  cleanupConnectionCache();
+}
+
+void SFTPProcessorBase::addSupportedCommonProperties(std::set<core::Property>& supported_properties) {
+  supported_properties.insert(Hostname);
+  supported_properties.insert(Port);
+  supported_properties.insert(Username);
+  supported_properties.insert(Password);
+  supported_properties.insert(PrivateKeyPath);
+  supported_properties.insert(PrivateKeyPassphrase);
+  supported_properties.insert(StrictHostKeyChecking);
+  supported_properties.insert(HostKeyFile);
+  supported_properties.insert(ConnectionTimeout);
+  supported_properties.insert(DataTimeout);
+  supported_properties.insert(SendKeepaliveOnTimeout);
+  supported_properties.insert(ProxyType);
+  supported_properties.insert(ProxyHost);
+  supported_properties.insert(ProxyPort);
+  supported_properties.insert(HttpProxyUsername);
+  supported_properties.insert(HttpProxyPassword);
+}
+
+void SFTPProcessorBase::parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context) {
+  std::string value;
+  if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
+    logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, strict_host_checking_);
+  }
+  context->getProperty(HostKeyFile.getName(), host_key_file_);
+  if (!context->getProperty(ConnectionTimeout.getName(), value)) {
+    logger_->log_error("Connection Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
+      logger_->log_error("Connection Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(DataTimeout.getName(), value)) {
+    logger_->log_error("Data Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
+      logger_->log_error("Data Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(SendKeepaliveOnTimeout.getName(), value)) {
+    logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
+  }
+  context->getProperty(ProxyType.getName(), proxy_type_);
+}
+
+SFTPProcessorBase::CommonProperties::CommonProperties()
+    : port(0U)
+    , proxy_port(0U)
+{
+}
+
+bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties) {
+  std::string value;
+  if (!context->getProperty(Hostname, common_properties.hostname, flow_file)) {
+    logger_->log_error("Hostname attribute is missing");
+    return false;
+  }
+  if (!context->getProperty(Port, value, flow_file)) {
+    logger_->log_error("Port attribute is missing or invalid");
+    return false;
+  } else {
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  if (!context->getProperty(Username, common_properties.username, flow_file)) {
+    logger_->log_error("Username attribute is missing");
+    return false;
+  }
+  context->getProperty(Password, common_properties.password, flow_file);
+  context->getProperty(PrivateKeyPath, common_properties.private_key_path, flow_file);
+  context->getProperty(PrivateKeyPassphrase, common_properties.private_key_passphrase, flow_file);
+  context->getProperty(Password, common_properties.password, flow_file);
+  context->getProperty(ProxyHost, common_properties.proxy_host, flow_file);
+  if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.proxy_port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  context->getProperty(HttpProxyUsername, common_properties.proxy_username, flow_file);
+  context->getProperty(HttpProxyPassword, common_properties.proxy_password, flow_file);
+
+  return true;
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator<(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port, proxy_username) <
+         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port, other.proxy_username);
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator==(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port, proxy_username) ==
+         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port, other.proxy_username);
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getConnectionFromCache(const SFTPProcessorBase::ConnectionCacheKey& key) {
+  std::lock_guard<std::mutex> lock(connections_mutex_);
+
+  auto it = connections_.find(key);
+  if (it == connections_.end()) {
+    return nullptr;
+  }
+
+  logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
+                     key.username,
+                     key.hostname,
+                     key.port);
+
+  auto lru_it = std::find(lru_.begin(), lru_.end(), key);
+  if (lru_it == lru_.end()) {
+    logger_->log_trace("Assertion error: can't find key in LRU cache");
+  } else {
+    lru_.erase(lru_it);
+  }
+
+  auto connection = std::move(it->second);
+  connections_.erase(it);
+  return connection;
+}
+
+void SFTPProcessorBase::addConnectionToCache(const SFTPProcessorBase::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
+  std::lock_guard<std::mutex> lock(connections_mutex_);
+
+  while (connections_.size() >= SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE) {
+    const auto& lru_key = lru_.back();
+    logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu",
+                       lru_key.username,
+                       lru_key.hostname,
+                       lru_key.port);
+    connections_.erase(lru_key);
+    lru_.pop_back();
+  }
+
+  logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool",
+                     key.username,
+                     key.hostname,
+                     key.port);
+  connections_.emplace(key, std::move(connection));
+  lru_.push_front(key);
+  keepalive_cv_.notify_one();
+}
+
+void SFTPProcessorBase::keepaliveThreadFunc() {
+  std::unique_lock<std::mutex> lock(connections_mutex_);
+
+  while (true) {
+    if (connections_.empty()) {
+      keepalive_cv_.wait(lock, [this] {
+        return !running_ || !connections_.empty();
+      });
+    }
+    if (!running_) {
+      logger_->log_trace("Stopping keepalive thread");
+      lock.unlock();
+      return;
+    }
+
+    int min_wait = 10;
+    for (auto &connection : connections_) {
+      int seconds_to_next = 0;
+      if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {
+        logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
+                           connection.first.username,
+                           connection.first.hostname,
+                           connection.first.port,
+                           seconds_to_next);
+        if (seconds_to_next < min_wait) {
+          min_wait = seconds_to_next;
+        }
+      } else {
+        logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
+                           connection.first.username,
+                           connection.first.hostname,
+                           connection.first.port);
+      }
+    }
+
+    /* Avoid busy loops */
+    if (min_wait < 1) {
+      min_wait = 1;
+    }
+
+    logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
+    keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {
+      return !running_;
+    });
+    if (!running_) {
+      lock.unlock();
+      return;
+    }
+  }
+}
+
+void SFTPProcessorBase::startKeepaliveThreadIfNeeded() {
+  if (use_keepalive_on_timeout_ && !keepalive_thread_.joinable()) {
+    running_ = true;
+    keepalive_thread_ = std::thread(&SFTPProcessorBase::keepaliveThreadFunc, this);
+  }
+}
+
+void SFTPProcessorBase::cleanupConnectionCache() {
+  if (keepalive_thread_.joinable()) {
+    {
+      std::lock_guard<std::mutex> lock(connections_mutex_);
+      running_ = false;
+      keepalive_cv_.notify_one();
+    }
+    keepalive_thread_.join();
+  }
+  /* The thread is no longer running, we don't have to lock */
+  connections_.clear();
+  lru_.clear();
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getOrCreateConnection(
+    const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+    const std::string& password,
+    const std::string& private_key_path,
+    const std::string& private_key_passphrase,
+    const std::string& proxy_password) {
+  auto client = getConnectionFromCache(connection_cache_key);
+  if (client == nullptr) {
+    client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(connection_cache_key.hostname,
+                                                                      connection_cache_key.port,
+                                                                      connection_cache_key.username));
+    if (!IsNullOrEmpty(host_key_file_)) {
+      if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
+        logger_->log_error("Cannot set host key file");
+        return nullptr;
+      }
+    }
+    if (!IsNullOrEmpty(password)) {
+      client->setPasswordAuthenticationCredentials(password);
+    }
+    if (!IsNullOrEmpty(private_key_path)) {
+      client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
+    }
+    if (connection_cache_key.proxy_type != PROXY_TYPE_DIRECT) {
+      utils::HTTPProxy proxy;
+      proxy.host = connection_cache_key.proxy_host;
+      proxy.port = connection_cache_key.proxy_port;
+      proxy.username = connection_cache_key.proxy_username;
+      proxy.password = proxy_password;
+      if (!client->setProxy(
+          connection_cache_key.proxy_type == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,
+          proxy)) {
+        logger_->log_error("Cannot set proxy");
+        return nullptr;
+      }
+    }
+    if (!client->setConnectionTimeout(connection_timeout_)) {
+      logger_->log_error("Cannot set connection timeout");
+      return nullptr;
+    }
+    client->setDataTimeout(data_timeout_);
+    client->setSendKeepAlive(use_keepalive_on_timeout_);
+    if (!client->setUseCompression(use_compression_)) {
+      logger_->log_error("Cannot set compression");
+      return nullptr;
+    }
+
+    /* Connect to SFTP server */
+    if (!client->connect()) {
+      logger_->log_error("Cannot connect to SFTP server");
+      return nullptr;
+    }
+  }
+
+  return client;
+}
+
+SFTPProcessorBase::CreateDirectoryHierarchyError SFTPProcessorBase::createDirectoryHierarchy(
+    utils::SFTPClient& client,
+    const std::string& remote_path,
+    bool disable_directory_listing) {
+  bool should_create_directory = disable_directory_listing;
+  if (!disable_directory_listing) {
+    LIBSSH2_SFTP_ATTRIBUTES attrs;
+    if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+      if (client.getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
+        logger_->log_error("Failed to stat %s", remote_path.c_str());
+      }
+      should_create_directory = true;
+    } else {
+      if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+        logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+        return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+      }
+      logger_->log_debug("Found remote directory %s", remote_path.c_str());
+    }
+  }
+  if (should_create_directory) {
+    (void) client.createDirectoryHierarchy(remote_path);
+    if (!disable_directory_listing) {
+      LIBSSH2_SFTP_ATTRIBUTES attrs;
+      if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+        auto last_error = client.getLastError();
+        if (last_error == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
+          logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND;
+        } else if (last_error == utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED) {
+          logger_->log_error("Permission denied when reading remote directory %s after creating it", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED;
+        } else {
+          logger_->log_error("Failed to stat %s", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED;
+        }
+      } else {
+        if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+          logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+        }
+      }
+    }
+  }
+
+  return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/SFTPProcessorBase.h
similarity index 56%
copy from extensions/sftp/processors/PutSFTP.h
copy to extensions/sftp/processors/SFTPProcessorBase.h
index d04fbf8..4d83fff 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/SFTPProcessorBase.h
@@ -16,8 +16,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __PUT_SFTP_H__
-#define __PUT_SFTP_H__
+#ifndef __SFTP_PROCESSOR_BASE_H__
+#define __SFTP_PROCESSOR_BASE_H__
 
 #include <memory>
 #include <string>
@@ -44,28 +44,10 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class PutSFTP : public core::Processor {
+class SFTPProcessorBase : public core::Processor {
  public:
-
-  static constexpr char const *CONFLICT_RESOLUTION_REPLACE = "REPLACE";
-  static constexpr char const *CONFLICT_RESOLUTION_IGNORE = "IGNORE";
-  static constexpr char const *CONFLICT_RESOLUTION_RENAME = "RENAME";
-  static constexpr char const *CONFLICT_RESOLUTION_REJECT = "REJECT";
-  static constexpr char const *CONFLICT_RESOLUTION_FAIL = "FAIL";
-  static constexpr char const *CONFLICT_RESOLUTION_NONE = "NONE";
-
-  static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
-  static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
-  static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
-
-  static constexpr char const* ProcessorName = "PutSFTP";
-
-
-  /*!
-   * Create a new processor
-   */
-  PutSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
-  virtual ~PutSFTP();
+  SFTPProcessorBase(std::string name, utils::Identifier uuid);
+  virtual ~SFTPProcessorBase();
 
   // Supported Properties
   static core::Property Hostname;
@@ -74,78 +56,54 @@ class PutSFTP : public core::Processor {
   static core::Property Password;
   static core::Property PrivateKeyPath;
   static core::Property PrivateKeyPassphrase;
-  static core::Property RemotePath;
-  static core::Property CreateDirectory;
-  static core::Property DisableDirectoryListing;
-  static core::Property BatchSize;
+  static core::Property StrictHostKeyChecking;
+  static core::Property HostKeyFile;
   static core::Property ConnectionTimeout;
   static core::Property DataTimeout;
-  static core::Property ConflictResolution;
-  static core::Property RejectZeroByte;
-  static core::Property DotRename;
-  static core::Property TempFilename;
-  static core::Property HostKeyFile;
-  static core::Property LastModifiedTime;
-  static core::Property Permissions;
-  static core::Property RemoteOwner;
-  static core::Property RemoteGroup;
-  static core::Property StrictHostKeyChecking;
-  static core::Property UseKeepaliveOnTimeout;
-  static core::Property UseCompression;
+  static core::Property SendKeepaliveOnTimeout;
+  static core::Property TargetSystemTimestampPrecision;
   static core::Property ProxyType;
   static core::Property ProxyHost;
   static core::Property ProxyPort;
   static core::Property HttpProxyUsername;
   static core::Property HttpProxyPassword;
 
-  // Supported Relationships
-  static core::Relationship Success;
-  static core::Relationship Reject;
-  static core::Relationship Failure;
-
-  virtual bool supportsDynamicProperties() override {
-    return true;
-  }
+  static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
+  static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
+  static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
 
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
-  virtual void initialize() override;
-  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   virtual void notifyStop() override;
 
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(const std::string& target_path,
-        utils::SFTPClient& client,
-        const std::string& conflict_resolution);
-    ~ReadCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
-    bool commit();
-
-   private:
-    std::shared_ptr<logging::Logger> logger_;
-    bool write_succeeded_;
-    const std::string target_path_;
-    utils::SFTPClient& client_;
-    const std::string conflict_resolution_;
-  };
-
- private:
-
+ protected:
   std::shared_ptr<logging::Logger> logger_;
 
-  bool create_directory_;
-  uint64_t batch_size_;
   int64_t connection_timeout_;
   int64_t data_timeout_;
-  std::string conflict_resolution_;
-  bool reject_zero_byte_;
-  bool dot_rename_;
   std::string host_key_file_;
   bool strict_host_checking_;
   bool use_keepalive_on_timeout_;
   bool use_compression_;
   std::string proxy_type_;
 
+  void addSupportedCommonProperties(std::set<core::Property>& supported_properties);
+  void parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context);
+  struct CommonProperties {
+    std::string hostname;
+    uint16_t port;
+    std::string username;
+    std::string password;
+    std::string private_key_path;
+    std::string private_key_passphrase;
+    std::string remote_path;
+    std::string proxy_host;
+    uint16_t proxy_port;
+    std::string proxy_username;
+    std::string proxy_password;
+
+    CommonProperties();
+  };
+  bool parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties);
+
   static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;
   struct ConnectionCacheKey {
     std::string hostname;
@@ -154,6 +112,7 @@ class PutSFTP : public core::Processor {
     std::string proxy_type;
     std::string proxy_host;
     uint16_t proxy_port;
+    std::string proxy_username;
 
     bool operator<(const ConnectionCacheKey& other) const;
     bool operator==(const ConnectionCacheKey& other) const;
@@ -169,11 +128,25 @@ class PutSFTP : public core::Processor {
   std::condition_variable keepalive_cv_;
   void keepaliveThreadFunc();
 
-  bool processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void startKeepaliveThreadIfNeeded();
+  void cleanupConnectionCache();
+  std::unique_ptr<utils::SFTPClient> getOrCreateConnection(
+      const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+      const std::string& password,
+      const std::string& private_key_path,
+      const std::string& private_key_passphrase,
+      const std::string& proxy_password);
+
+  enum class CreateDirectoryHierarchyError : uint8_t {
+    CREATE_DIRECTORY_HIERARCHY_ERROR_OK = 0,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED,
+  };
+  CreateDirectoryHierarchyError createDirectoryHierarchy(utils::SFTPClient& client, const std::string& remote_path, bool disable_directory_listing);
 };
 
-REGISTER_RESOURCE(PutSFTP, "Sends FlowFiles to an SFTP Server")
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/sftp/tests/CMakeLists.txt b/extensions/sftp/tests/CMakeLists.txt
index 998d1ed..f8663f7 100644
--- a/extensions/sftp/tests/CMakeLists.txt
+++ b/extensions/sftp/tests/CMakeLists.txt
@@ -61,4 +61,4 @@ if (NOT SKIP_TESTS AND Java_FOUND AND Maven_FOUND)
 	add_subdirectory(tools)
 else()
 	message("Could find Java and Maven to build SFTPTestServer, disabling SFTP tests")
-endif()
\ No newline at end of file
+endif()
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
new file mode 100644
index 0000000..d7e95c7
--- /dev/null
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -0,0 +1,425 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class FetchSFTPTestsFixture {
+ public:
+  FetchSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+    LogTestController::getInstance().setTrace<processors::PutFile>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+    testController.createTempDirectory(dst_dir);
+    REQUIRE(dst_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    plan = testController.createPlan();
+    generate_flow_file = plan->addProcessor(
+        "GenerateFlowFile",
+        "GenerateFlowFile");
+    update_attribute = plan->addProcessor("UpdateAttribute",
+         "UpdateAttribute",
+         core::Relationship("success", "d"),
+         true);
+    fetch_sftp = plan->addProcessor(
+        "FetchSFTP",
+        "FetchSFTP",
+        core::Relationship("success", "d"),
+        true);
+    plan->addProcessor("LogAttribute",
+        "LogAttribute",
+        { core::Relationship("success", "d"),
+          core::Relationship("comms.failure", "d"),
+          core::Relationship("not.found", "d"),
+          core::Relationship("permission.denied", "d") },
+          true);
+    put_file = plan->addProcessor("PutFile",
+         "PutFile",
+         core::Relationship("success", "d"),
+         true);
+
+    // Configure GenerateFlowFile processor
+    plan->setProperty(generate_flow_file, "File Size", "1B");
+
+    // Configure FetchSFTP processor
+    plan->setProperty(fetch_sftp, "Hostname", "localhost");
+    plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(fetch_sftp, "Username", "nifiuser");
+    plan->setProperty(fetch_sftp, "Password", "nifipassword");
+    plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+    plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(fetch_sftp, "Use Compression", "false");
+
+    // Configure PutFile processor
+    plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
+    plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+    plan->setProperty(put_file, "Create Missing Directories", "true");
+  }
+
+  virtual ~FetchSFTPTestsFixture() {
+    free(src_dir);
+    free(dst_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    utils::file::FileUtils::create_dir(utils::file::FileUtils::get_parent_path(ss.str())); // TODO
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+  }
+
+  enum TestWhere {
+    IN_DESTINATION,
+    IN_SOURCE
+  };
+
+  void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(true == file.good());
+    std::stringstream content;
+    std::vector<char> buffer(1024U);
+    while (file) {
+      file.read(buffer.data(), buffer.size());
+      content << std::string(buffer.data(), file.gcount());
+    }
+    REQUIRE(expected_content == content.str());
+  }
+
+  void testFileNotExists(TestWhere where, const std::string& relative_path) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(false == file.is_open());
+    REQUIRE(false == file.good());
+  }
+
+ protected:
+  char *src_dir;
+  char *dst_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> generate_flow_file;
+  std::shared_ptr<core::Processor> update_attribute;
+  std::shared_ptr<core::Processor> fetch_sftp;
+  std::shared_ptr<core::Processor> put_file;
+};
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch one file", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP public key authentication", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "privatekeypassword");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-existing file", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship not.found"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-readable file", "[FetchSFTP][basic]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test/tstFile.ext").c_str(), 0000));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship permission.denied"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch connection error", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  /* Run it once normally to open the connection */
+  testController.runSession(plan, true);
+  plan->reset();
+
+  /* Stop the server to create a connection error */
+  sftp_server.reset();
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\" due to an underlying SSH error"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship comms.failure"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File success", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File fail", "[FetchSFTP][completion-strategy]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  /* By making the parent directory non-writable we make it impossible do delete the source file */
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0500));
+
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to remove remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Delete File, but failed to delete remote file \"nifi_test/tstFile.ext\""));
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File success", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File fail", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+
+  /* The completion strategy should fail because the target directory does not exist and we don't create it */
+  plan->setProperty(fetch_sftp, "Create Directory", "false");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFileNotExists(IN_SOURCE, "nifi_done/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to rename remote file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Move File, but failed to move file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\""));
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP expression language test", "[FetchSFTP]") {
+  plan->setProperty(update_attribute, "attr_Hostname", "localhost", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Port", std::to_string(sftp_server->getPort()), true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Username", "nifiuser", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Password", "nifipassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Path",
+  utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"), true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Passphrase", "privatekeypassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Remote File", "nifi_test/tstFile.ext", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Move Destination Directory", "nifi_done/", true /*dynamic*/);
+
+  plan->setProperty(fetch_sftp, "Hostname", "${'attr_Hostname'}");
+  plan->setProperty(fetch_sftp, "Port", "${'attr_Port'}");
+  plan->setProperty(fetch_sftp, "Username", "${'attr_Username'}");
+  plan->setProperty(fetch_sftp, "Password", "${'attr_Password'}");
+  plan->setProperty(fetch_sftp, "Private Key Path", "${'attr_Private Key Path'}");
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "${'attr_Private Key Passphrase'}");
+  plan->setProperty(fetch_sftp, "Remote File", "${'attr_Remote File'}");
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "${'attr_Move Destination Directory'}");
+
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
new file mode 100644
index 0000000..f82b010
--- /dev/null
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -0,0 +1,907 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "tools/SFTPTestServer.h"
+
+class ListSFTPTestsFixture {
+ public:
+  ListSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::ListSFTP>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    createPlan();
+  }
+
+  virtual ~ListSFTPTestsFixture() {
+    free(src_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
+    plan = testController.createPlan();
+    if (list_sftp_uuid == nullptr) {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          "ListSFTP");
+    } else {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          *list_sftp_uuid,
+          "ListSFTP",
+          {core::Relationship("success", "d")});
+    }
+    log_attribute = plan->addProcessor("LogAttribute",
+                                       "LogAttribute",
+                                       core::Relationship("success", "d"),
+                                       true);
+
+    // Configure ListSFTP processor
+    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+    plan->setProperty(list_sftp, "Hostname", "localhost");
+    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(list_sftp, "Username", "nifiuser");
+    plan->setProperty(list_sftp, "Password", "nifipassword");
+    plan->setProperty(list_sftp, "Search Recursively", "false");
+    plan->setProperty(list_sftp, "Follow symlink", "false");
+    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
+    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
+    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+    // Configure LogAttribute processor
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    auto full_path = ss.str();
+    std::deque<std::string> parent_dirs;
+    std::string parent_dir = full_path;
+    while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+      parent_dirs.push_front(parent_dir);
+    }
+    for (const auto& dir : parent_dirs) {
+      utils::file::FileUtils::create_dir(dir);
+    }
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+    if (modification_timestamp != 0U) {
+      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+    }
+  }
+
+  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+    time_t now = time(nullptr);
+    return createFile(relative_path, content, now + modification_timediff);
+  }
+
+ protected:
+  char *src_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> list_sftp;
+  std::shared_ptr<core::Processor> log_attribute;
+};
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP public key authentication", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(list_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
+  plan->setProperty(list_sftp, "Private Key Passphrase", "privatekeypassword");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-existing dir", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Remote Path", "nifi_test2/");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test2\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-readable dir", "[ListSFTP][basic]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0000));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+  std::string mtime_str;
+  REQUIRE(true == getDateTimeStr(mtime, mtime_str));
+  uint64_t uid, gid;
+  REQUIRE(true == utils::file::FileUtils::get_uid_gid(file, uid, gid));
+  uint32_t permissions;
+  REQUIRE(true == utils::file::FileUtils::get_permissions(file, permissions));
+  std::stringstream permissions_ss;
+  permissions_ss << std::setfill('0') << std::setw(4) << std::oct << permissions;
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.listing.user value:nifiuser"));
+  REQUIRE(LogTestController::getInstance().contains("key:file.owner value:" + std::to_string(uid)));
+  REQUIRE(LogTestController::getInstance().contains("key:file.group value:" + std::to_string(gid)));
+  REQUIRE(LogTestController::getInstance().contains("key:file.permissions value:" + permissions_ss.str()));
+  REQUIRE(LogTestController::getInstance().contains("key:file.size value:14"));
+  REQUIRE(LogTestController::getInstance().contains("key:file.lastModifiedTime value:" + mtime_str));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir no recursion", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir with recursion", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Age too young", "[ListSFTP][file-age]") {
+  plan->setProperty(list_sftp, "Minimum File Age", "2 hours");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is younger than the Minimum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Age too old", "[ListSFTP][file-age]") {
+  plan->setProperty(list_sftp, "Maximum File Age", "1 min");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is older than the Maximum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Size too small", "[ListSFTP][file-size]") {
+  plan->setProperty(list_sftp, "Minimum File Size", "1 MB");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is smaller than the Minimum File Size: 14 B < 1048576 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Size too large", "[ListSFTP][file-size]") {
+  plan->setProperty(list_sftp, "Maximum File Size", "4 B");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is larger than the Maximum File Size: 14 B > 4 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP File Filter Regex", "[ListSFTP][file-filter-regex]") {
+  plan->setProperty(list_sftp, "File Filter Regex", "^.*2.*$");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/file1.ext\" because it did not match the File Filter Regex \"^.*2.*$\""));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Path Filter Regex", "[ListSFTP][path-filter-regex]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  plan->setProperty(list_sftp, "Path Filter Regex", "^.*foobar.*$");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/foobar/file2.ext", "Test content 2");
+  createFileWithModificationTimeDiff("nifi_test/notbar/file3.ext", "Test with longer content 3");
+
+  testController.runSession(plan, true);
+
+  /* file1.ext is in the root */
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  /* file2.ext is in a matching subdirectory */
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+  /* file3.ext is in a non-matching subdirectory */
+  REQUIRE(LogTestController::getInstance().contains("Not recursing into \"nifi_test/notbar\" because it did not match the Path Filter Regex \"^.*foobar.*$\""));
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file3.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false file symlink", "[ListSFTP][follow-symlink]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/file2.ext\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true file symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Follow symlink", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false directory symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+  auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
+  auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
+  REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/dir2\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true directory symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  plan->setProperty(list_sftp, "Follow symlink", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+  auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
+  auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
+  REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir1"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir2"));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one new file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one older file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\", because it is not new."));
+  REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file another file with the same timestamp", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  /* We must sleep to avoid triggering the listing lag. */
+  std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+  createFile("nifi_test/file2.ext", "Test content 2", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file timestamp updated", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  /* We must sleep to avoid triggering the listing lag. */
+  std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("and all files for that timestamp has been processed. Yielding."));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Timestamps state file"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+                                                    "Hostname: \"localhost\" vs. \"localhost\", "
+                                                    "Username: \"nifiuser\" vs. \"nifiuser\", "
+                                                    "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed configuration", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/tstFile.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one new file", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file in tracking window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file outside tracking window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 3600 /* 6 hours ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\" because it has an older timestamp than the minimum timestamp to list"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file another file with the same timestamp", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFile("nifi_test/file2.ext", "Test content 2", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file timestamp updated", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with newer timestamp"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size changes", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFile("nifi_test/file1.ext", "Longer test content 1", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with different size: 14 -> 21"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Entities state file"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed configuration", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity outside window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+  plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1", -20*60 /* 20 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file1.ext\" because it has an older timestamp than the minimum timestamp to list"));
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity inside window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+  plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+}
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
new file mode 100644
index 0000000..0282cb9
--- /dev/null
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class ListThenFetchSFTPTestsFixture {
+ public:
+  ListThenFetchSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::ListSFTP>();
+    LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+    LogTestController::getInstance().setTrace<processors::PutFile>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+    testController.createTempDirectory(dst_dir);
+    REQUIRE(dst_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    plan = testController.createPlan();
+    list_sftp = plan->addProcessor(
+        "ListSFTP",
+        "ListSFTP");
+    fetch_sftp = plan->addProcessor(
+        "FetchSFTP",
+        "FetchSFTP",
+        core::Relationship("success", "d"),
+        true);
+    log_attribute = plan->addProcessor("LogAttribute",
+        "LogAttribute",
+        { core::Relationship("success", "d"),
+          core::Relationship("comms.failure", "d"),
+          core::Relationship("not.found", "d"),
+          core::Relationship("permission.denied", "d") },
+          true);
+    put_file = plan->addProcessor("PutFile",
+         "PutFile",
+         core::Relationship("success", "d"),
+         true);
+
+    // Configure ListSFTP processor
+    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+    plan->setProperty(list_sftp, "Hostname", "localhost");
+    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(list_sftp, "Username", "nifiuser");
+    plan->setProperty(list_sftp, "Password", "nifipassword");
+    plan->setProperty(list_sftp, "Search Recursively", "false");
+    plan->setProperty(list_sftp, "Follow symlink", "false");
+    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
+    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
+    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+    // Configure FetchSFTP processor
+    plan->setProperty(fetch_sftp, "Hostname", "localhost");
+    plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(fetch_sftp, "Username", "nifiuser");
+    plan->setProperty(fetch_sftp, "Password", "nifipassword");
+    plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+    plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(fetch_sftp, "Use Compression", "false");
+    plan->setProperty(fetch_sftp, "Remote File", "${path}/${filename}");
+
+    // Configure LogAttribute processor
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+
+    // Configure PutFile processor
+    plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
+    plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+    plan->setProperty(put_file, "Create Missing Directories", "true");
+  }
+
+  virtual ~ListThenFetchSFTPTestsFixture() {
+    free(src_dir);
+    free(dst_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    auto full_path = ss.str();
+    std::deque<std::string> parent_dirs;
+    std::string parent_dir = full_path;
+    while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+      parent_dirs.push_front(parent_dir);
+    }
+    for (const auto& dir : parent_dirs) {
+      utils::file::FileUtils::create_dir(dir);
+    }
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+    if (modification_timestamp != 0U) {
+      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+    }
+  }
+
+  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+    time_t now = time(nullptr);
+    return createFile(relative_path, content, now + modification_timediff);
+  }
+
+  enum TestWhere {
+    IN_DESTINATION,
+    IN_SOURCE
+  };
+
+  void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(true == file.good());
+    std::stringstream content;
+    std::vector<char> buffer(1024U);
+    while (file) {
+      file.read(buffer.data(), buffer.size());
+      content << std::string(buffer.data(), file.gcount());
+    }
+    REQUIRE(expected_content == content.str());
+  }
+
+  void testFileNotExists(TestWhere where, const std::string& relative_path) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(false == file.is_open());
+    REQUIRE(false == file.good());
+  }
+
+ protected:
+  char *src_dir;
+  char *dst_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> list_sftp;
+  std::shared_ptr<core::Processor> fetch_sftp;
+  std::shared_ptr<core::Processor> log_attribute;
+  std::shared_ptr<core::Processor> put_file;
+};
+
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP one file", "[ListThenFetchSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+}
+
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP two files", "[ListThenFetchSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2");
+
+  /* ListSFTP */
+  plan->runNextProcessor();
+
+  /* FetchSFTP */
+  plan->runNextProcessor();
+  plan->runCurrentProcessor();
+
+  /* LogAttribute */
+  plan->runNextProcessor();
+
+  /* PutFile */
+  plan->runNextProcessor();
+  plan->runCurrentProcessor();
+
+  testFile(IN_SOURCE, "nifi_test/file1.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/file1.ext", "Test content 1");
+  testFile(IN_SOURCE, "nifi_test/file2.ext", "Test content 2");
+  testFile(IN_DESTINATION, "nifi_test/file2.ext", "Test content 2");
+}
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index c59a7e6..b9189df 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -34,6 +34,9 @@
 #include <functional>
 #include <iterator>
 #include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
 
 #include "TestBase.h"
 #include "utils/StringUtils.h"
@@ -523,7 +526,10 @@ TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set permissions", "[PutSFTP]") {
 
 #ifndef WIN32
 TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set uid and gid", "[PutSFTP]") {
-  std::cerr << "!!!! This test ONLY works as root, because it needs to chown !!!!" << std::endl;
+  if (getuid() != 0) {
+    std::cerr << "!!!! This test ONLY works as root, because it needs to chown. Exiting. !!!!" << std::endl;
+    return;
+  }
   plan->setProperty(put, "Remote Owner", "1234");
   plan->setProperty(put, "Remote Group", "4567");
 
diff --git a/extensions/sftp/tests/tools/SFTPTestServer.cpp b/extensions/sftp/tests/tools/SFTPTestServer.cpp
index 90efe2f..681b88f 100644
--- a/extensions/sftp/tests/tools/SFTPTestServer.cpp
+++ b/extensions/sftp/tests/tools/SFTPTestServer.cpp
@@ -78,16 +78,11 @@ bool SFTPTestServer::start() {
   /* fork */
   pid_t pid = fork();
   if (pid == 0) {
-    /* Set the child process's pgid to its pid, so we will be able to kill the entire process group */
-    if (setpgid(0, 0) != 0) {
-      std::cerr << "Failed to set PGID, errno: " << strerror(errno) << std::endl;
-      exit(-1);
-    }
     /* execv */
     std::vector<char*> args(4U);
     args[0] = strdup("/bin/sh");
     args[1] = strdup("-c");
-    args[2] = strdup(("java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " &> " + server_log_file_path).c_str());
+    args[2] = strdup(("exec java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " >" + server_log_file_path + " 2>&1").c_str());
     args[3] = nullptr;
     execv("/bin/sh", args.data());
     std::cerr << "Failed to start server, errno: " << strerror(errno) << std::endl;
@@ -127,7 +122,7 @@ bool SFTPTestServer::stop() {
   throw std::runtime_error("Not implemented");
 #else
   if (server_pid_ != -1) {
-    if (::kill(-server_pid_, SIGTERM) != 0) {
+    if (::kill(server_pid_, SIGTERM) != 0) {
       logger_->log_error("Failed to kill child process, error: %s", strerror(errno));
       return false;
     }
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 4f80689..2553c09 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -359,7 +359,7 @@ class Property {
   }
 
   static bool StringToDateTime(const std::string& input, int64_t& output) {
-    int64_t temp = pareDateTimeStr(input);
+    int64_t temp = parseDateTimeStr(input);
     if (temp == -1) {
       return false;
     }
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index a20059a..437bd71 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -23,6 +23,8 @@
 #include <iomanip>
 #include <sstream>
 #include <chrono>
+#include <array>
+#include <limits>
 
 #define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
 
@@ -67,7 +69,7 @@ inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) {
  * @param str the datetime string
  * @returns Unix timestamp
  */
-inline int64_t pareDateTimeStr(const std::string &str) {
+inline int64_t parseDateTimeStr(const std::string &str) {
   /**
    * There is no strptime on Windows. As long as we have to parse a single date format this is not so bad,
    * but if multiple formats will have to be supported in the future, it might be worth it to include
@@ -123,4 +125,22 @@ inline int64_t pareDateTimeStr(const std::string &str) {
   return time + timezone_offset;
 }
 
+inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
+  if (unix_timestamp > (std::numeric_limits<time_t>::max)() || unix_timestamp < (std::numeric_limits<time_t>::lowest)()) {
+    return false;
+  }
+  time_t time = static_cast<time_t>(unix_timestamp);
+  struct tm* gmt = gmtime(&time);
+  if (gmt == nullptr) {
+    return false;
+  }
+  std::array<char, 64U> buf;
+  if (strftime(buf.data(), buf.size(), "%Y-%m-%dT%H:%M:%SZ", gmt) == 0U) {
+    return false;
+  }
+
+  date_time_str = buf.data();
+  return true;
+}
+
 #endif
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 5a57f91..0b000ed 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -33,6 +33,7 @@
 #else
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <utime.h>
 #include <dirent.h>
 #include <errno.h>
 #endif
@@ -50,6 +51,8 @@
 #include <tchar.h> // _tcscpy,_tcscat,_tcscmp
 #include <string> // string
 #include <algorithm> // replace
+#include <sys/types.h>
+#include <sys/utime.h> // _utime64
 #endif
 #ifdef __APPLE__
 #include <mach-o/dyld.h>
@@ -79,16 +82,19 @@ class FileUtils {
 
   FileUtils() = delete;
 
-  /**
-   * Deletes a directory, deleting recursively if delete_files_recursively is true
-   * @param path current path to delete
-   * @param delete_files_recursively deletes recursively
+  /*
+   * Get the platform-specific path separator.
+   * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+   * @return the path separator character
    */
-
-  static char get_separator()
+  static char get_separator(bool force_posix = false)
   {
 #ifdef WIN32
-    return '\\';
+    if (force_posix) {
+      return '/';
+    } else {
+      return '\\';
+    }
 #else
     return '/';
 #endif
@@ -200,6 +206,20 @@ class FileUtils {
     return 0;
   }
 
+  static bool set_last_write_time(const std::string &path, uint64_t write_time) {
+#ifdef WIN32
+    struct __utimbuf64 utim;
+    utim.actime = write_time;
+    utim.modtime = write_time;
+    return _utime64(path.c_str(), &utim) == 0U;
+#else
+    struct utimbuf utim;
+    utim.actime = write_time;
+    utim.modtime = write_time;
+    return utime(path.c_str(), &utim) == 0U;
+#endif
+  }
+
 #ifndef WIN32
   static bool get_permissions(const std::string &path, uint32_t &permissions) {
     struct stat result;
@@ -416,76 +436,99 @@ class FileUtils {
 #endif
   }
 
-  static std::string concat_path(const std::string& root, const std::string& child) {
+  static std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
     if (root.empty()) {
       return child;
     }
     std::stringstream new_path;
-    if (root.back() == get_separator()) {
+    if (root.back() == get_separator(force_posix)) {
       new_path << root << child;
     } else {
-      new_path << root << get_separator() << child;
+      new_path << root << get_separator(force_posix) << child;
     }
     return new_path.str();
   }
 
-  static std::string get_parent_path(const std::string& path) {
+  static std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
     if (path.empty()) {
-      /* Empty path has no parent */
-      return "";
+      /* Empty path has no parent and no child*/
+      return std::make_tuple("", "");
     }
     bool absolute = false;
     size_t root_pos = 0U;
 #ifdef WIN32
-    if (path[0] == '\\') {
-      absolute = true;
-      if (path.size() < 2U) {
-        return "";
-      }
-      if (path[1] == '\\') {
-        if (path.size() >= 4U &&
-           (path[2] == '?' || path[2] == '.') &&
-            path[3] == '\\') {
-          /* Probably an UNC path */
-          root_pos = 4U;
-        } else {
-          /* Probably a \\server\-type path */
-          root_pos = 2U;
+    if (!force_posix) {
+      if (path[0] == '\\') {
+        absolute = true;
+        if (path.size() < 2U) {
+          return std::make_tuple("", "");
         }
-        root_pos = path.find_first_of("\\", root_pos);
-        if (root_pos == std::string::npos) {
-          return "";
+        if (path[1] == '\\') {
+          if (path.size() >= 4U &&
+             (path[2] == '?' || path[2] == '.') &&
+              path[3] == '\\') {
+            /* Probably an UNC path */
+            root_pos = 4U;
+          } else {
+            /* Probably a \\server\-type path */
+            root_pos = 2U;
+          }
+          root_pos = path.find_first_of("\\", root_pos);
+          if (root_pos == std::string::npos) {
+            return std::make_tuple("", "");
+          }
         }
+      } else if (path.size() >= 3U &&
+                 toupper(path[0]) >= 'A' &&
+                 toupper(path[0]) <= 'Z' &&
+                 path[1] == ':' &&
+                 path[2] == '\\') {
+        absolute = true;
+        root_pos = 2U;
       }
-    } else if (path.size() >= 3U &&
-               toupper(path[0]) >= 'A' &&
-               toupper(path[0]) <= 'Z' &&
-               path[1] == ':' &&
-               path[2] == '\\') {
-      absolute = true;
-      root_pos = 2U;
-    }
+    } else {
 #else
-    if (path[0] == '/') {
-      absolute = true;
-      root_pos = 0U;
-    }
+    if (true) {
 #endif
+      if (path[0] == '/') {
+        absolute = true;
+        root_pos = 0U;
+      }
+    }
+    /* Maybe we are just a single relative child */
+    if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
+      return std::make_tuple("", path);
+    }
     /* Ignore trailing separators */
     size_t last_pos = path.size() - 1;
-    while (last_pos > root_pos && path[last_pos] == get_separator()) {
+    while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
       last_pos--;
     }
     if (absolute && last_pos == root_pos) {
       /* This means we are only a root */
-      return "";
+      return std::make_tuple("", "");
     }
-    /* Find parent */
-    size_t last_separator = path.find_last_of(get_separator(), last_pos);
+    /* Find parent-child separator */
+    size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
     if (last_separator == std::string::npos || last_separator < root_pos) {
-      return "";
+      return std::make_tuple("", "");
     }
-    return path.substr(0, last_separator + 1);
+    std::string parent = path.substr(0, last_separator + 1);
+    std::string child = path.substr(last_separator + 1);
+
+    return std::make_tuple(std::move(parent), std::move(child));
+  }
+
+  static std::string get_parent_path(const std::string& path, bool force_posix = false) {
+    std::string parent_path;
+    std::tie(parent_path, std::ignore) = split_path(path, force_posix);
+    return parent_path;
+  }
+
+  static std::string get_child_path(const std::string& path, bool force_posix = false) {
+    std::string child_path;
+    std::tie(std::ignore, child_path) = split_path(path, force_posix);
+    return child_path;
   }
 
   /*
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 73dc77a..2ef6f0b 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -70,7 +70,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -102,7 +102,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.addValue(value);
     properties_[new_property.getName()] = new_property;
@@ -125,7 +125,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -153,7 +153,7 @@ bool ConfigurableComponent::setProperty(Property &prop, PropertyValue &value) {
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -230,7 +230,7 @@ bool ConfigurableComponent::setDynamicProperty(const std::string name, std::stri
   auto &&it = dynamic_properties_.find(name);
 
   if (it != dynamic_properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     new_property.setSupportsExpressionLanguage(true);
@@ -248,7 +248,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const
   auto &&it = dynamic_properties_.find(name);
 
   if (it != dynamic_properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.addValue(value);
     new_property.setSupportsExpressionLanguage(true);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 43eead4..f496b3f 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -96,18 +96,12 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
   return processor;
 }
 
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
   std::lock_guard<std::recursive_mutex> guard(mutex);
 
-  utils::Identifier uuid;
-
-  utils::IdGenerator::getIdGenerator()->generate(uuid);
-
-  std::cout << "generated " << uuid.to_string() << std::endl;
-
   auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
   if (nullptr == ptr) {
     throw std::exception();
@@ -119,6 +113,21 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
   return addProcessor(processor, name, relationships, linkToPrevious);
 }
 
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  utils::Identifier uuid;
+
+  utils::IdGenerator::getIdGenerator()->generate(uuid);
+
+  std::cout << "generated " << uuid.to_string() << std::endl;
+
+  return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
+}
+
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int32_t i = 0;
@@ -169,6 +178,31 @@ bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::P
   }
   std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
   process_sessions_.push_back(current_session);
+  current_flowfile_ = nullptr;
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify != nullptr) {
+    verify(context, current_session);
+  } else {
+    logger_->log_info("Running %s", processor->getName());
+    processor->onTrigger(context, current_session);
+  }
+  current_session->commit();
+  return location + 1 < processor_queue_.size();
+}
+
+bool TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  if (!finalized) {
+    finalize();
+  }
+  logger_->log_info("Rerunning current processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+  std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+  process_sessions_.push_back(current_session);
+  current_flowfile_ = nullptr;
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
   if (verify != nullptr) {
@@ -178,7 +212,6 @@ bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::P
     processor->onTrigger(context, current_session);
   }
   current_session->commit();
-  current_flowfile_ = current_session->get();
   return location + 1 < processor_queue_.size();
 }
 
@@ -187,6 +220,9 @@ std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> TestPlan::getProven
 }
 
 std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
+  if (current_flowfile_ == nullptr) {
+    current_flowfile_ = process_sessions_.at(location)->get();
+  }
   return current_flowfile_;
 }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 33bf40e..21a2f43 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -187,12 +187,17 @@ class TestPlan {
   std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
                                                 bool linkToPrevious = false);
 
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
+                                                bool linkToPrevious = false);
+
   bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
 
   void reset();
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
 
+  bool runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
   std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> getProvenanceRecords();
 
   std::shared_ptr<core::FlowFile> getCurrentFlowFile();
@@ -232,7 +237,6 @@ class TestPlan {
 
   int location;
 
-  std::shared_ptr<core::ProcessSession> current_session_;
   std::shared_ptr<core::FlowFile> current_flowfile_;
 
   std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 8b3d3ae..85c783b 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -59,6 +59,28 @@ TEST_CASE("TestFileUtils::get_parent_path", "[TestGetParentPath]") {
 #endif
 }
 
+TEST_CASE("TestFileUtils::get_child_path", "[TestGetChildPath]") {
+#ifdef WIN32
+  REQUIRE("bar" == FileUtils::get_child_path("foo\\bar"));
+  REQUIRE("bar\\" == FileUtils::get_child_path("foo\\bar\\"));
+  REQUIRE("bar" == FileUtils::get_child_path("C:\\foo\\bar"));
+  REQUIRE("bar\\" == FileUtils::get_child_path("C:\\foo\\bar\\"));
+  REQUIRE("foo" == FileUtils::get_child_path("C:\\foo"));
+  REQUIRE("foo\\" == FileUtils::get_child_path("C:\\foo\\"));
+  REQUIRE("" == FileUtils::get_child_path("C:\\"));
+  REQUIRE("" == FileUtils::get_child_path("C:\\\\"));
+#else
+  REQUIRE("bar" == FileUtils::get_child_path("foo/bar"));
+  REQUIRE("bar/" == FileUtils::get_child_path("foo/bar/"));
+  REQUIRE("bar" == FileUtils::get_child_path("/foo/bar"));
+  REQUIRE("bar/" == FileUtils::get_child_path("/foo/bar/"));
+  REQUIRE("foo" == FileUtils::get_child_path("/foo"));
+  REQUIRE("foo/" == FileUtils::get_child_path("/foo/"));
+  REQUIRE("" == FileUtils::get_child_path("/"));
+  REQUIRE("" == FileUtils::get_child_path("//"));
+#endif
+}
+
 TEST_CASE("TestFileUtils::get_executable_path", "[TestGetExecutablePath]") {
   std::string executable_path = FileUtils::get_executable_path();
   std::cerr << "Executable path: " << executable_path << std::endl;


[nifi-minifi-cpp] 02/03: MINIFICPP-917 Update TensorFlow extension to the latest minifi-cpp CMake best practices

Posted by ph...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 848408dee4cb32ced5402a5a57ec319c7f29c9f1
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Wed Jun 12 15:02:53 2019 -0400

    MINIFICPP-917 Update TensorFlow extension to the latest minifi-cpp CMake best practices
    
    This closes #591.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 .gitignore                                     |  2 ++
 cmake/FindTensorFlow.cmake                     |  1 +
 extensions/tensorflow/CMakeLists.txt           | 27 ++------------------------
 libminifi/test/tensorflow-tests/CMakeLists.txt |  3 ++-
 4 files changed, 7 insertions(+), 26 deletions(-)

diff --git a/.gitignore b/.gitignore
index 32230e7..b135a55 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,5 @@ docker/fedora/minificppsource
 /cmake-build-*
 .vs/VSWorkspaceState.json
 .vs/slnx.sqlite
+/.ccls-cache
+/.vscode
\ No newline at end of file
diff --git a/cmake/FindTensorFlow.cmake b/cmake/FindTensorFlow.cmake
index be1023f..732e3ae 100644
--- a/cmake/FindTensorFlow.cmake
+++ b/cmake/FindTensorFlow.cmake
@@ -29,6 +29,7 @@ find_path(TENSORFLOW_INCLUDE_DIR
           third_party
           HINTS
           ${TENSORFLOW_PATH}
+          /usr/include/tensorflow
           /usr/local/include/google/tensorflow
           /usr/local/include/tensorflow
           /usr/include/google/tensorflow)
diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt
index 970f6cc..a641a18 100644
--- a/extensions/tensorflow/CMakeLists.txt
+++ b/extensions/tensorflow/CMakeLists.txt
@@ -17,6 +17,8 @@
 # under the License.
 #
 
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
 set(CMAKE_CXX_STANDARD 14)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 
@@ -24,8 +26,6 @@ find_package(TensorFlow REQUIRED)
 
 message("-- Found TensorFlow: ${TENSORFLOW_INCLUDE_DIRS}")
 
-include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/) 
-
 include_directories(${TENSORFLOW_INCLUDE_DIRS})
 
 file(GLOB SOURCES  "*.cpp")
@@ -39,30 +39,7 @@ if(CMAKE_THREAD_LIBS_INIT)
   target_link_libraries(minifi-tensorflow-extensions "${CMAKE_THREAD_LIBS_INIT}")
 endif()
 
-find_package(UUID REQUIRED)
-target_link_libraries(minifi-tensorflow-extensions ${LIBMINIFI} ${UUID_LIBRARIES})
-find_package(OpenSSL REQUIRED)
-include_directories(${OPENSSL_INCLUDE_DIR})
-target_link_libraries(minifi-tensorflow-extensions ${CMAKE_DL_LIBS})
 target_link_libraries(minifi-tensorflow-extensions ${TENSORFLOW_LIBRARIES})
-find_package(ZLIB REQUIRED)
-include_directories(${ZLIB_INCLUDE_DIRS})
-target_link_libraries (minifi-tensorflow-extensions ${ZLIB_LIBRARIES})
-
-if (WIN32)
-    set_target_properties(minifi-tensorflow-extensions PROPERTIES
-        LINK_FLAGS "/WHOLEARCHIVE"
-    )
-elseif (APPLE)
-    set_target_properties(minifi-tensorflow-extensions PROPERTIES
-        LINK_FLAGS "-Wl,-all_load"
-    )
-else ()
-    set_target_properties(minifi-tensorflow-extensions PROPERTIES
-        LINK_FLAGS "-Wl,--whole-archive"
-    )
-endif ()
-
 
 SET (TENSORFLOW-EXTENSIONS minifi-tensorflow-extensions PARENT_SCOPE)
 
diff --git a/libminifi/test/tensorflow-tests/CMakeLists.txt b/libminifi/test/tensorflow-tests/CMakeLists.txt
index e0b3ffa..906b325 100644
--- a/libminifi/test/tensorflow-tests/CMakeLists.txt
+++ b/libminifi/test/tensorflow-tests/CMakeLists.txt
@@ -26,12 +26,13 @@ FOREACH(testfile ${TENSORFLOW_INTEGRATION_TESTS})
 	get_filename_component(testfilename "${testfile}" NAME_WE)
 	add_executable("${testfilename}" "${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp")
 	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/tensorflow")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
 	target_include_directories(${testfilename} PRIVATE BEFORE ${TENSORFLOW_INCLUDE_DIRS})
 	createTests("${testfilename}")
 	if (APPLE)
 		target_link_libraries (${testfilename} -Wl,-all_load minifi-tensorflow-extensions)
 	else ()
-	    target_link_libraries (${testfilename} -Wl,--whole-archive minifi-tensorflow-extensions -Wl,--no-whole-archive)
+	    target_link_libraries (${testfilename} -Wl,--whole-archive minifi-tensorflow-extensions minifi-standard-processors -Wl,--no-whole-archive)
 	endif ()
 	MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1")
 	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})


[nifi-minifi-cpp] 03/03: MINIFICPP-918 Update TensorFlow processors to use property builder

Posted by ph...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit dedeeffc3f78fe794b52d2870acf7ba69f3c2a30
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Wed Jun 12 16:13:09 2019 -0400

    MINIFICPP-918 Update TensorFlow processors to use property builder
    
    This closes #592.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 .gitignore                                       |   1 +
 extensions/tensorflow/TFApplyGraph.cpp           |  24 ++++--
 extensions/tensorflow/TFConvertImageToTensor.cpp | 100 +++++++++++++++--------
 3 files changed, 85 insertions(+), 40 deletions(-)

diff --git a/.gitignore b/.gitignore
index b135a55..d014fd5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,6 +56,7 @@ libminifi/test/**/*.a
 libminifi/include/agent/agent_version.h
 docs/generated
 thirdparty/apache-rat/apache-rat*
+/compile_commands.json
 
 # Ignore source files that have been placed in the docker directory during build
 docker/minificppsource
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index ce16d97..8d0092d 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -16,8 +16,9 @@
  */
 
 #include "TFApplyGraph.h"
-
-#include "tensorflow/cc/ops/standard_ops.h"
+#include <core/ProcessContext.h>
+#include <core/ProcessSession.h>
+#include <tensorflow/cc/ops/standard_ops.h>
 
 namespace org {
 namespace apache {
@@ -25,12 +26,19 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TFApplyGraph::InputNode(  // NOLINT
-    "Input Node",
-    "The node of the TensorFlow graph to feed tensor inputs to", "");
-core::Property TFApplyGraph::OutputNode(  // NOLINT
-    "Output Node",
-    "The node of the TensorFlow graph to read tensor outputs from", "");
+core::Property TFApplyGraph::InputNode(
+    core::PropertyBuilder::createProperty("Input Node")
+        ->withDescription(
+            "The node of the TensorFlow graph to feed tensor inputs to")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFApplyGraph::OutputNode(
+    core::PropertyBuilder::createProperty("Output Node")
+        ->withDescription(
+            "The node of the TensorFlow graph to read tensor outputs from")
+        ->withDefaultValue("")
+        ->build());
 
 core::Relationship TFApplyGraph::Success(  // NOLINT
     "success",
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index b6a1183..5b2ace0 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -16,8 +16,9 @@
  */
 
 #include "TFConvertImageToTensor.h"
-
-#include "tensorflow/cc/ops/standard_ops.h"
+#include <core/ProcessContext.h>
+#include <core/ProcessSession.h>
+#include <tensorflow/cc/ops/standard_ops.h>
 
 namespace org {
 namespace apache {
@@ -25,36 +26,71 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TFConvertImageToTensor::ImageFormat(  // NOLINT
-    "Input Format",
-    "The format of the input image (PNG or RAW). RAW is RGB24.", "");
-core::Property TFConvertImageToTensor::InputWidth(  // NOLINT
-    "Input Width",
-    "The width, in pixels, of the input image.", "");
-core::Property TFConvertImageToTensor::InputHeight(  // NOLINT
-    "Input Height",
-    "The height, in pixels, of the input image.", "");
-core::Property TFConvertImageToTensor::OutputWidth(  // NOLINT
-    "Output Width",
-    "The width, in pixels, of the output image.", "");
-core::Property TFConvertImageToTensor::OutputHeight(  // NOLINT
-    "Output Height",
-    "The height, in pixels, of the output image.", "");
-core::Property TFConvertImageToTensor::NumChannels(  // NOLINT
-    "Channels",
-    "The number of channels (e.g. 3 for RGB, 4 for RGBA) in the input image", "3");
-core::Property TFConvertImageToTensor::CropOffsetX(  // NOLINT
-    "Crop Offset X",
-    "The X (horizontal) offset, in pixels, to crop the input image (relative to top-left corner).", "");
-core::Property TFConvertImageToTensor::CropOffsetY(  // NOLINT
-    "Crop Offset Y",
-    "The Y (vertical) offset, in pixels, to crop the input image (relative to top-left corner).", "");
-core::Property TFConvertImageToTensor::CropSizeX(  // NOLINT
-    "Crop Size X",
-    "The X (horizontal) size, in pixels, to crop the input image.", "");
-core::Property TFConvertImageToTensor::CropSizeY(  // NOLINT
-    "Crop Size Y",
-    "The Y (vertical) size, in pixels, to crop the input image.", "");
+core::Property TFConvertImageToTensor::ImageFormat(
+    core::PropertyBuilder::createProperty("Input Format")
+        ->withDescription(
+            "The format of the input image (PNG or RAW). RAW is RGB24.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::InputWidth(
+    core::PropertyBuilder::createProperty("Input Width")
+        ->withDescription("The width, in pixels, of the input image.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::InputHeight(
+    core::PropertyBuilder::createProperty("Input Height")
+        ->withDescription("The height, in pixels, of the input image.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::OutputWidth(
+    core::PropertyBuilder::createProperty("Output Width")
+        ->withDescription("The width, in pixels, of the output image.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::OutputHeight(
+    core::PropertyBuilder::createProperty("Output Height")
+        ->withDescription("The height, in pixels, of the output image.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::NumChannels(
+    core::PropertyBuilder::createProperty("Channels")
+        ->withDescription("The number of channels (e.g. 3 for RGB, 4 for RGBA) "
+                          "in the input image")
+        ->withDefaultValue("3")
+        ->build());
+
+core::Property TFConvertImageToTensor::CropOffsetX(
+    core::PropertyBuilder::createProperty("Crop Offset X")
+        ->withDescription("The X (horizontal) offset, in pixels, to crop the "
+                          "input image (relative to top-left corner).")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::CropOffsetY(
+    core::PropertyBuilder::createProperty("Crop Offset Y")
+        ->withDescription("The Y (vertical) offset, in pixels, to crop the "
+                          "input image (relative to top-left corner).")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::CropSizeX(
+    core::PropertyBuilder::createProperty("Crop Size X")
+        ->withDescription(
+            "The X (horizontal) size, in pixels, to crop the input image.")
+        ->withDefaultValue("")
+        ->build());
+
+core::Property TFConvertImageToTensor::CropSizeY(
+    core::PropertyBuilder::createProperty("Crop Size Y")
+        ->withDescription(
+            "The Y (vertical) size, in pixels, to crop the input image.")
+        ->withDefaultValue("")
+        ->build());
 
 core::Relationship TFConvertImageToTensor::Success(  // NOLINT
     "success",