You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/03/29 10:56:24 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1786 Improve FetchFile processor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 901ec1c MINIFICPP-1786 Improve FetchFile processor
901ec1c is described below
commit 901ec1cf0cc34c1010bc7a395b22922b82691e14
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Mar 29 11:45:32 2022 +0200
MINIFICPP-1786 Improve FetchFile processor
Closes #1288
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../standard-processors/processors/FetchFile.cpp | 47 ++--
.../standard-processors/processors/FetchFile.h | 8 +-
.../tests/unit/FetchFileTests.cpp | 285 ++++++++-------------
3 files changed, 137 insertions(+), 203 deletions(-)
diff --git a/extensions/standard-processors/processors/FetchFile.cpp b/extensions/standard-processors/processors/FetchFile.cpp
index 924b540..96f682a 100644
--- a/extensions/standard-processors/processors/FetchFile.cpp
+++ b/extensions/standard-processors/processors/FetchFile.cpp
@@ -115,7 +115,7 @@ void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
}
-std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+std::filesystem::path FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
std::string file_to_fetch_path;
context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
if (!file_to_fetch_path.empty()) {
@@ -125,26 +125,26 @@ std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::
flow_file->getAttribute("absolute.path", file_to_fetch_path);
std::string filename;
flow_file->getAttribute("filename", filename);
- file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
- return file_to_fetch_path;
+ return std::filesystem::path(file_to_fetch_path) / filename;
}
-void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+template<typename... Args>
+void FetchFile::logWithLevel(LogLevelOption log_level, Args&&... args) const {
switch (log_level.value()) {
case LogLevelOption::LOGGING_TRACE:
- logger_->log_trace(message.c_str());
+ logger_->log_trace(std::forward<Args>(args)...);
break;
case LogLevelOption::LOGGING_DEBUG:
- logger_->log_debug(message.c_str());
+ logger_->log_debug(std::forward<Args>(args)...);
break;
case LogLevelOption::LOGGING_INFO:
- logger_->log_info(message.c_str());
+ logger_->log_info(std::forward<Args>(args)...);
break;
case LogLevelOption::LOGGING_WARN:
- logger_->log_warn(message.c_str());
+ logger_->log_warn(std::forward<Args>(args)...);
break;
case LogLevelOption::LOGGING_ERROR:
- logger_->log_error(message.c_str());
+ logger_->log_error(std::forward<Args>(args)...);
break;
case LogLevelOption::LOGGING_OFF:
default:
@@ -160,7 +160,7 @@ bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
}
-bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_name) const {
+bool FetchFile::moveWouldFailWithDestinationConflict(const std::string& file_name) const {
if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
return false;
}
@@ -171,14 +171,14 @@ bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_nam
void FetchFile::executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
auto moved_path = getMoveAbsolutePath(file_name);
- logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path);
+ logger_->log_debug("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path);
std::filesystem::rename(file_to_fetch_path, moved_path);
} else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
auto generated_filename = utils::IdGenerator::getIdGenerator()->generate().to_string();
- logger_->log_info("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
+ logger_->log_debug("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(generated_filename));
} else if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
- logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path);
+ logger_->log_debug("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path);
std::filesystem::remove(file_to_fetch_path);
}
}
@@ -189,7 +189,7 @@ void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, con
std::filesystem::create_directories(move_destination_directory_);
}
auto moved_path = getMoveAbsolutePath(file_name);
- logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path, moved_path);
+ logger_->log_debug("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path, moved_path);
std::filesystem::rename(file_to_fetch_path, moved_path);
return;
}
@@ -202,7 +202,7 @@ void FetchFile::executeCompletionStrategy(const std::string& file_to_fetch_path,
if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
processMoveCompletion(file_to_fetch_path, file_name);
} else if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
- logger_->log_info("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path);
+ logger_->log_debug("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path);
std::filesystem::remove(file_to_fetch_path);
}
} catch(const std::filesystem::filesystem_error& ex) {
@@ -220,40 +220,41 @@ void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
}
const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+ auto file_fetch_path_str = file_to_fetch_path.string();
if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
- logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '" + file_to_fetch_path + "'!");
+ logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '%s'!", file_fetch_path_str);
session->transfer(flow_file, NotFound);
return;
}
std::string path;
std::string file_name;
- utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+ utils::file::getFileNameAndPath(file_fetch_path_str, path, file_name);
context->getProperty(MoveDestinationDirectory, move_destination_directory_, flow_file);
- if (moveWouldFailWithDestinationconflict(file_name)) {
+ if (moveWouldFailWithDestinationConflict(file_name)) {
logger_->log_error("Move destination (%s) conflicts with an already existing file!", move_destination_directory_);
session->transfer(flow_file, Failure);
return;
}
try {
- utils::FileReaderCallback callback(file_to_fetch_path);
+ utils::FileReaderCallback callback(file_fetch_path_str);
session->write(flow_file, std::move(callback));
- logger_->log_debug("Fetching file '%s' successful!", file_to_fetch_path);
+ logger_->log_debug("Fetching file '%s' successful!", file_fetch_path_str);
session->transfer(flow_file, Success);
} catch (const utils::FileReaderCallbackIOError& io_error) {
if (io_error.error_code == EACCES) {
- logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '" + file_to_fetch_path + "' to be fetched!");
+ logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '%s' to be fetched!", file_fetch_path_str);
session->transfer(flow_file, PermissionDenied);
} else {
- logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path, io_error.what());
+ logger_->log_error("Fetching file '%s' failed! %s", file_fetch_path_str, io_error.what());
session->transfer(flow_file, Failure);
}
return;
}
- executeCompletionStrategy(file_to_fetch_path, file_name);
+ executeCompletionStrategy(file_fetch_path_str, file_name);
}
REGISTER_RESOURCE(FetchFile, "Reads the contents of a file from disk and streams it into the contents of an incoming FlowFile. "
diff --git a/extensions/standard-processors/processors/FetchFile.h b/extensions/standard-processors/processors/FetchFile.h
index 3b52805..39c7e13 100644
--- a/extensions/standard-processors/processors/FetchFile.h
+++ b/extensions/standard-processors/processors/FetchFile.h
@@ -76,11 +76,13 @@ class FetchFile : public core::Processor {
}
private:
- std::string getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
- void logWithLevel(LogLevelOption log_level, const std::string& message) const;
+ template<typename... Args>
+ void logWithLevel(LogLevelOption log_level, Args&&... args) const;
+
+ std::filesystem::path getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
std::string getMoveAbsolutePath(const std::string& file_name) const;
bool moveDestinationConflicts(const std::string& file_name) const;
- bool moveWouldFailWithDestinationconflict(const std::string& file_name) const;
+ bool moveWouldFailWithDestinationConflict(const std::string& file_name) const;
void executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
void processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name);
void executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
index dbc1762..853ecf3 100644
--- a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -18,16 +18,17 @@
#include <memory>
#include <string>
#include <unordered_set>
+#include <unordered_map>
+#include <filesystem>
#include "TestBase.h"
#include "Catch.h"
#include "core/Property.h"
#include "core/Processor.h"
-#include "processors/GenerateFlowFile.h"
#include "processors/FetchFile.h"
-#include "processors/PutFile.h"
#include "utils/TestUtils.h"
#include "utils/IntegrationTestUtils.h"
+#include "SingleInputTestController.h"
using namespace std::literals::chrono_literals;
@@ -37,89 +38,38 @@ class FetchFileTestFixture {
public:
FetchFileTestFixture();
~FetchFileTestFixture();
- std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
- std::unordered_multiset<std::string> getFailedFlowFileContents() const;
- std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
-#ifndef WIN32
- std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents() const;
-#endif
protected:
std::unordered_multiset<std::string> getDirContents(const std::string& dir_path) const;
- TestController test_controller_;
- std::shared_ptr<TestPlan> plan_;
+ std::shared_ptr<minifi::processors::FetchFile> fetch_file_processor_;
+ std::shared_ptr<minifi::test::SingleInputTestController> test_controller_;
const std::string input_dir_;
- const std::string success_output_dir_;
- const std::string failure_output_dir_;
- const std::string not_found_output_dir_;
- const std::string permission_denied_output_dir_;
const std::string permission_denied_file_name_;
const std::string input_file_name_;
const std::string file_content_;
- std::shared_ptr<core::Processor> fetch_file_processor_;
- std::shared_ptr<core::Processor> update_attribute_processor_;
+ std::unordered_map<std::string, std::string> attributes_;
};
FetchFileTestFixture::FetchFileTestFixture()
- : plan_(test_controller_.createPlan()),
- input_dir_(test_controller_.createTempDirectory()),
- success_output_dir_(test_controller_.createTempDirectory()),
- failure_output_dir_(test_controller_.createTempDirectory()),
- not_found_output_dir_(test_controller_.createTempDirectory()),
- permission_denied_output_dir_(test_controller_.createTempDirectory()),
+ : fetch_file_processor_(std::make_shared<minifi::processors::FetchFile>("FetchFile")),
+ test_controller_(std::make_shared<minifi::test::SingleInputTestController>(fetch_file_processor_)),
+ input_dir_(test_controller_->createTempDirectory()),
permission_denied_file_name_("permission_denied.txt"),
input_file_name_("test.txt"),
file_content_("The quick brown fox jumps over the lazy dog\n") {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
- LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
-
- REQUIRE(!input_dir_.empty());
- REQUIRE(!success_output_dir_.empty());
- REQUIRE(!failure_output_dir_.empty());
- REQUIRE(!not_found_output_dir_.empty());
- REQUIRE(!permission_denied_output_dir_.empty());
-
- auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile");
- plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B");
- update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true);
- plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true);
- plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true);
-
- fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true);
-
- auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
- plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile);
- success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
- plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
-
- auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
- plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile);
- failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
- plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
-
- auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false);
- plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile);
- not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"not.found", "d"}});
- plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_);
-
- auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false);
- plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile);
- not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"permission.denied", "d"}});
- plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_);
+
+ attributes_ = {{"absolute.path", input_dir_}, {"filename", input_file_name_}};
utils::putFileToDir(input_dir_, input_file_name_, file_content_);
utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
-#ifndef WIN32
- utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
-#endif
+ std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0));
}
FetchFileTestFixture::~FetchFileTestFixture() {
-#ifndef WIN32
- utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
-#endif
+ std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0644));
}
std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
@@ -131,68 +81,50 @@ std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const
return true;
};
- utils::file::FileUtils::list_dir(dir_path, lambda, plan_->getLogger(), false);
+ utils::file::FileUtils::list_dir(dir_path, lambda, test_controller_->plan->getLogger(), false);
return file_contents;
}
-std::unordered_multiset<std::string> FetchFileTestFixture::getSuccessfulFlowFileContents() const {
- return getDirContents(success_output_dir_);
-}
-
-std::unordered_multiset<std::string> FetchFileTestFixture::getFailedFlowFileContents() const {
- return getDirContents(failure_output_dir_);
-}
-
-std::unordered_multiset<std::string> FetchFileTestFixture::getNotFoundFlowFileContents() const {
- return getDirContents(not_found_output_dir_);
-}
-
-#ifndef WIN32
-std::unordered_multiset<std::string> FetchFileTestFixture::getPermissionDeniedFlowFileContents() const {
- return getDirContents(permission_denied_output_dir_);
-}
-#endif
-
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
- plan_->setProperty(update_attribute_processor_, "filename", "non_existent.file", true);
- test_controller_.runSession(plan_);
- auto file_contents = getNotFoundFlowFileContents();
- std::unordered_multiset<std::string> expected{""};
- REQUIRE(file_contents == expected);
+ attributes_["filename"] = "non_existent.file";
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not found"));
}
TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), "/tmp/non_existent.file");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound.getName(), "INFO");
- test_controller_.runSession(plan_);
- auto file_contents = getNotFoundFlowFileContents();
- std::unordered_multiset<std::string> expected{""};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not found"));
}
#ifndef WIN32
TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testFetchFile]") {
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied.getName(), "WARN");
- test_controller_.runSession(plan_);
- auto file_contents = getPermissionDeniedFlowFileContents();
- std::unordered_multiset<std::string> expected{""};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission denied for file"));
}
#endif
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
}
@@ -200,29 +132,29 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path",
REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "sub"));
utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), file_path);
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(file_path));
}
TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- REQUIRE_THROWS_AS(test_controller_.runSession(plan_), minifi::Exception);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
- test_controller_.runSession(plan_);
- auto file_contents = getFailedFlowFileContents();
- std::unordered_multiset<std::string> expected{""};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Failure);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
@@ -230,26 +162,26 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[test
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored when completion strategy is not move file", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with replace file", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Replace File");
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -257,33 +189,32 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with renaming file to a new random filename", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Rename");
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
-
auto move_dir_contents = getDirContents(move_dir);
- expected = {"old content", file_content_};
+ std::unordered_multiset<std::string> expected = {"old content", file_content_};
REQUIRE(move_dir_contents == expected);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with deleting the new file and keeping the old one", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Keep Existing");
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -291,13 +222,13 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved wi
}
TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory after flow completion", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ auto move_dir = test_controller_->createTempDirectory();
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -305,14 +236,14 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory
}
TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file is moved to a non-existent directory which is created by the flow", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
@@ -321,14 +252,14 @@ TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file i
#ifndef WIN32
TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to filesystem error still succeeds flow", "[testFetchFile]") {
- auto move_dir = test_controller_.createTempDirectory();
+ auto move_dir = test_controller_->createTempDirectory();
utils::file::FileUtils::set_permissions(move_dir, 0);
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "completion strategy failed"));
@@ -337,11 +268,11 @@ TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to
#endif
TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
- plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Delete File");
- test_controller_.runSession(plan_);
- auto file_contents = getSuccessfulFlowFileContents();
- std::unordered_multiset<std::string> expected{file_content_};
- REQUIRE(file_contents == expected);
+ fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
+ const auto result = test_controller_->trigger("", attributes_);
+ auto file_contents = result.at(minifi::processors::FetchFile::Success);
+ REQUIRE(file_contents.size() == 1);
+ REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
}