You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/03/29 10:56:24 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1786 Improve FetchFile processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 901ec1c  MINIFICPP-1786 Improve FetchFile processor
901ec1c is described below

commit 901ec1cf0cc34c1010bc7a395b22922b82691e14
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 29 11:45:32 2022 +0200

    MINIFICPP-1786 Improve FetchFile processor
    
    Closes #1288
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../standard-processors/processors/FetchFile.cpp   |  47 ++--
 .../standard-processors/processors/FetchFile.h     |   8 +-
 .../tests/unit/FetchFileTests.cpp                  | 285 ++++++++-------------
 3 files changed, 137 insertions(+), 203 deletions(-)

diff --git a/extensions/standard-processors/processors/FetchFile.cpp b/extensions/standard-processors/processors/FetchFile.cpp
index 924b540..96f682a 100644
--- a/extensions/standard-processors/processors/FetchFile.cpp
+++ b/extensions/standard-processors/processors/FetchFile.cpp
@@ -115,7 +115,7 @@ void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
 }
 
-std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+std::filesystem::path FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
   std::string file_to_fetch_path;
   context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
   if (!file_to_fetch_path.empty()) {
@@ -125,26 +125,26 @@ std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::
   flow_file->getAttribute("absolute.path", file_to_fetch_path);
   std::string filename;
   flow_file->getAttribute("filename", filename);
-  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
-  return file_to_fetch_path;
+  return std::filesystem::path(file_to_fetch_path) / filename;
 }
 
-void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+template<typename... Args>
+void FetchFile::logWithLevel(LogLevelOption log_level, Args&&... args) const {
   switch (log_level.value()) {
     case LogLevelOption::LOGGING_TRACE:
-      logger_->log_trace(message.c_str());
+      logger_->log_trace(std::forward<Args>(args)...);
       break;
     case LogLevelOption::LOGGING_DEBUG:
-      logger_->log_debug(message.c_str());
+      logger_->log_debug(std::forward<Args>(args)...);
       break;
     case LogLevelOption::LOGGING_INFO:
-      logger_->log_info(message.c_str());
+      logger_->log_info(std::forward<Args>(args)...);
       break;
     case LogLevelOption::LOGGING_WARN:
-      logger_->log_warn(message.c_str());
+      logger_->log_warn(std::forward<Args>(args)...);
       break;
     case LogLevelOption::LOGGING_ERROR:
-      logger_->log_error(message.c_str());
+      logger_->log_error(std::forward<Args>(args)...);
       break;
     case LogLevelOption::LOGGING_OFF:
     default:
@@ -160,7 +160,7 @@ bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
   return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
 }
 
-bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_name) const {
+bool FetchFile::moveWouldFailWithDestinationConflict(const std::string& file_name) const {
   if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
     return false;
   }
@@ -171,14 +171,14 @@ bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_nam
 void FetchFile::executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
   if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
     auto moved_path = getMoveAbsolutePath(file_name);
-    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path);
+    logger_->log_debug("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path);
     std::filesystem::rename(file_to_fetch_path, moved_path);
   } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
     auto generated_filename = utils::IdGenerator::getIdGenerator()->generate().to_string();
-    logger_->log_info("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
+    logger_->log_debug("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
     std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(generated_filename));
   } else if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
-    logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path);
+    logger_->log_debug("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path);
     std::filesystem::remove(file_to_fetch_path);
   }
 }
@@ -189,7 +189,7 @@ void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, con
       std::filesystem::create_directories(move_destination_directory_);
     }
     auto moved_path = getMoveAbsolutePath(file_name);
-    logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path, moved_path);
+    logger_->log_debug("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path, moved_path);
     std::filesystem::rename(file_to_fetch_path, moved_path);
     return;
   }
@@ -202,7 +202,7 @@ void FetchFile::executeCompletionStrategy(const std::string& file_to_fetch_path,
     if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
       processMoveCompletion(file_to_fetch_path, file_name);
     } else if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
-      logger_->log_info("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path);
+      logger_->log_debug("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path);
       std::filesystem::remove(file_to_fetch_path);
     }
   } catch(const std::filesystem::filesystem_error& ex) {
@@ -220,40 +220,41 @@ void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   }
 
   const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+  auto file_fetch_path_str = file_to_fetch_path.string();
   if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
-    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '" + file_to_fetch_path + "'!");
+    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '%s'!", file_fetch_path_str);
     session->transfer(flow_file, NotFound);
     return;
   }
 
   std::string path;
   std::string file_name;
-  utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+  utils::file::getFileNameAndPath(file_fetch_path_str, path, file_name);
 
   context->getProperty(MoveDestinationDirectory, move_destination_directory_, flow_file);
-  if (moveWouldFailWithDestinationconflict(file_name)) {
+  if (moveWouldFailWithDestinationConflict(file_name)) {
     logger_->log_error("Move destination (%s) conflicts with an already existing file!", move_destination_directory_);
     session->transfer(flow_file, Failure);
     return;
   }
 
   try {
-    utils::FileReaderCallback callback(file_to_fetch_path);
+    utils::FileReaderCallback callback(file_fetch_path_str);
     session->write(flow_file, std::move(callback));
-    logger_->log_debug("Fetching file '%s' successful!", file_to_fetch_path);
+    logger_->log_debug("Fetching file '%s' successful!", file_fetch_path_str);
     session->transfer(flow_file, Success);
   } catch (const utils::FileReaderCallbackIOError& io_error) {
     if (io_error.error_code == EACCES) {
-      logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '" + file_to_fetch_path + "' to be fetched!");
+      logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '%s' to be fetched!", file_fetch_path_str);
       session->transfer(flow_file, PermissionDenied);
     } else {
-      logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path, io_error.what());
+      logger_->log_error("Fetching file '%s' failed! %s", file_fetch_path_str, io_error.what());
       session->transfer(flow_file, Failure);
     }
     return;
   }
 
-  executeCompletionStrategy(file_to_fetch_path, file_name);
+  executeCompletionStrategy(file_fetch_path_str, file_name);
 }
 
 REGISTER_RESOURCE(FetchFile, "Reads the contents of a file from disk and streams it into the contents of an incoming FlowFile. "
diff --git a/extensions/standard-processors/processors/FetchFile.h b/extensions/standard-processors/processors/FetchFile.h
index 3b52805..39c7e13 100644
--- a/extensions/standard-processors/processors/FetchFile.h
+++ b/extensions/standard-processors/processors/FetchFile.h
@@ -76,11 +76,13 @@ class FetchFile : public core::Processor {
   }
 
  private:
-  std::string getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
-  void logWithLevel(LogLevelOption log_level, const std::string& message) const;
+  template<typename... Args>
+  void logWithLevel(LogLevelOption log_level, Args&&... args) const;
+
+  std::filesystem::path getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
   std::string getMoveAbsolutePath(const std::string& file_name) const;
   bool moveDestinationConflicts(const std::string& file_name) const;
-  bool moveWouldFailWithDestinationconflict(const std::string& file_name) const;
+  bool moveWouldFailWithDestinationConflict(const std::string& file_name) const;
   void executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
   void processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name);
   void executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
index dbc1762..853ecf3 100644
--- a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -18,16 +18,17 @@
 #include <memory>
 #include <string>
 #include <unordered_set>
+#include <unordered_map>
+#include <filesystem>
 
 #include "TestBase.h"
 #include "Catch.h"
 #include "core/Property.h"
 #include "core/Processor.h"
-#include "processors/GenerateFlowFile.h"
 #include "processors/FetchFile.h"
-#include "processors/PutFile.h"
 #include "utils/TestUtils.h"
 #include "utils/IntegrationTestUtils.h"
+#include "SingleInputTestController.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -37,89 +38,38 @@ class FetchFileTestFixture {
  public:
   FetchFileTestFixture();
   ~FetchFileTestFixture();
-  std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
-  std::unordered_multiset<std::string> getFailedFlowFileContents() const;
-  std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
-#ifndef WIN32
-  std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents() const;
-#endif
 
  protected:
   std::unordered_multiset<std::string> getDirContents(const std::string& dir_path) const;
 
-  TestController test_controller_;
-  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<minifi::processors::FetchFile> fetch_file_processor_;
+  std::shared_ptr<minifi::test::SingleInputTestController> test_controller_;
   const std::string input_dir_;
-  const std::string success_output_dir_;
-  const std::string failure_output_dir_;
-  const std::string not_found_output_dir_;
-  const std::string permission_denied_output_dir_;
   const std::string permission_denied_file_name_;
   const std::string input_file_name_;
   const std::string file_content_;
-  std::shared_ptr<core::Processor> fetch_file_processor_;
-  std::shared_ptr<core::Processor> update_attribute_processor_;
+  std::unordered_map<std::string, std::string> attributes_;
 };
 
 FetchFileTestFixture::FetchFileTestFixture()
-  : plan_(test_controller_.createPlan()),
-    input_dir_(test_controller_.createTempDirectory()),
-    success_output_dir_(test_controller_.createTempDirectory()),
-    failure_output_dir_(test_controller_.createTempDirectory()),
-    not_found_output_dir_(test_controller_.createTempDirectory()),
-    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+  : fetch_file_processor_(std::make_shared<minifi::processors::FetchFile>("FetchFile")),
+    test_controller_(std::make_shared<minifi::test::SingleInputTestController>(fetch_file_processor_)),
+    input_dir_(test_controller_->createTempDirectory()),
     permission_denied_file_name_("permission_denied.txt"),
     input_file_name_("test.txt"),
     file_content_("The quick brown fox jumps over the lazy dog\n")  {
   LogTestController::getInstance().setTrace<TestPlan>();
   LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
-  LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
-
-  REQUIRE(!input_dir_.empty());
-  REQUIRE(!success_output_dir_.empty());
-  REQUIRE(!failure_output_dir_.empty());
-  REQUIRE(!not_found_output_dir_.empty());
-  REQUIRE(!permission_denied_output_dir_.empty());
-
-  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile");
-  plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B");
-  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true);
-  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true);
-  plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true);
-
-  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true);
-
-  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
-  plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile);
-  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
-  plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
-
-  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
-  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile);
-  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
-  plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
-
-  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false);
-  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile);
-  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"not.found", "d"}});
-  plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_);
-
-  auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false);
-  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile);
-  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"permission.denied", "d"}});
-  plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_);
+
+  attributes_ = {{"absolute.path", input_dir_}, {"filename", input_file_name_}};
 
   utils::putFileToDir(input_dir_, input_file_name_, file_content_);
   utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
-#ifndef WIN32
-  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
-#endif
+  std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0));
 }
 
 FetchFileTestFixture::~FetchFileTestFixture() {
-#ifndef WIN32
-  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
-#endif
+  std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0644));
 }
 
 std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
@@ -131,68 +81,50 @@ std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const
     return true;
   };
 
-  utils::file::FileUtils::list_dir(dir_path, lambda, plan_->getLogger(), false);
+  utils::file::FileUtils::list_dir(dir_path, lambda, test_controller_->plan->getLogger(), false);
   return file_contents;
 }
 
-std::unordered_multiset<std::string> FetchFileTestFixture::getSuccessfulFlowFileContents() const {
-  return getDirContents(success_output_dir_);
-}
-
-std::unordered_multiset<std::string> FetchFileTestFixture::getFailedFlowFileContents() const {
-  return getDirContents(failure_output_dir_);
-}
-
-std::unordered_multiset<std::string> FetchFileTestFixture::getNotFoundFlowFileContents() const {
-  return getDirContents(not_found_output_dir_);
-}
-
-#ifndef WIN32
-std::unordered_multiset<std::string> FetchFileTestFixture::getPermissionDeniedFlowFileContents() const {
-  return getDirContents(permission_denied_output_dir_);
-}
-#endif
-
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
-  plan_->setProperty(update_attribute_processor_, "filename", "non_existent.file", true);
-  test_controller_.runSession(plan_);
-  auto file_contents = getNotFoundFlowFileContents();
-  std::unordered_multiset<std::string> expected{""};
-  REQUIRE(file_contents == expected);
+  attributes_["filename"] = "non_existent.file";
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not found"));
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), "/tmp/non_existent.file");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound.getName(), "INFO");
-  test_controller_.runSession(plan_);
-  auto file_contents = getNotFoundFlowFileContents();
-  std::unordered_multiset<std::string> expected{""};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not found"));
 }
 
 #ifndef WIN32
 TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testFetchFile]") {
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
     input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied.getName(), "WARN");
-  test_controller_.runSession(plan_);
-  auto file_contents = getPermissionDeniedFlowFileContents();
-  std::unordered_multiset<std::string> expected{""};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission denied for file"));
 }
 #endif
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 }
 
@@ -200,29 +132,29 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path",
   REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "sub"));
   utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
   auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), file_path);
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(utils::file::FileUtils::exists(file_path));
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  REQUIRE_THROWS_AS(test_controller_.runSession(plan_), minifi::Exception);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::putFileToDir(move_dir, input_file_name_, "old content");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
-  test_controller_.runSession(plan_);
-  auto file_contents = getFailedFlowFileContents();
-  std::unordered_multiset<std::string> expected{""};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Failure);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
 
   std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
   REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
@@ -230,26 +162,26 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[test
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored when completion strategy is not move file", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::putFileToDir(move_dir, input_file_name_, "old content");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with replace file", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::putFileToDir(move_dir, input_file_name_, "old content");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Replace File");
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 
   std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -257,33 +189,32 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with renaming file to a new random filename", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::putFileToDir(move_dir, input_file_name_, "old content");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Rename");
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 
-
   auto move_dir_contents = getDirContents(move_dir);
-  expected = {"old content", file_content_};
+  std::unordered_multiset<std::string> expected = {"old content", file_content_};
   REQUIRE(move_dir_contents == expected);
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with deleting the new file and keeping the old one", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::putFileToDir(move_dir, input_file_name_, "old content");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Keep Existing");
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 
   std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -291,13 +222,13 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory after flow completion", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  auto move_dir = test_controller_->createTempDirectory();
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 
   std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -305,14 +236,14 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file is moved to a non-existent directory which is created by the flow", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 
   std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -321,14 +252,14 @@ TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file i
 
 #ifndef WIN32
 TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to filesystem error still succeeds flow", "[testFetchFile]") {
-  auto move_dir = test_controller_.createTempDirectory();
+  auto move_dir = test_controller_->createTempDirectory();
   utils::file::FileUtils::set_permissions(move_dir, 0);
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
   using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   REQUIRE(verifyLogLinePresenceInPollTime(1s, "completion strategy failed"));
@@ -337,11 +268,11 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to
 #endif
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
-  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Delete File");
-  test_controller_.runSession(plan_);
-  auto file_contents = getSuccessfulFlowFileContents();
-  std::unordered_multiset<std::string> expected{file_content_};
-  REQUIRE(file_contents == expected);
+  fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
+  const auto result = test_controller_->trigger("", attributes_);
+  auto file_contents = result.at(minifi::processors::FetchFile::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
   REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
 }