You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/03/03 11:37:31 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1262: MINIFICPP-1740 Add FetchFile processor

fgerlits commented on a change in pull request #1262:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1262#discussion_r818507187



##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() + file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {

Review comment:
       I would call this `moveWouldFailWithDestinationconflict()`, since it doesn't actually move anything; it only checks whether the move would fail.

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() + file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
+    return false;
+  }
+
+  return moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path.c_str());
+    std::filesystem::rename(file_to_fetch_path, moved_path);
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+    auto generated_filename = utils::IdGenerator::getIdGenerator()->generate().to_string();
+    logger_->log_info("Due to conflict file is moved with generated name '%s' by the Move Completion Strategy", generated_filename.c_str());
+    std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(generated_filename));
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
+    logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path.c_str());
+    std::filesystem::remove(file_to_fetch_path);
+  }
+}
+
+void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (!moveDestinationConflicts(file_name)) {
+    if (!utils::file::FileUtils::exists(move_destination_directory_)) {
+      utils::file::FileUtils::create_dir(move_destination_directory_);

Review comment:
       We should either check the return value of `create_dir()` to see whether the operation was successful, or call `std::filesystem::create_directories(move_destination_directory_)` directly, so that an exception is thrown in case of an error. Of course, `rename()` will fail anyway if the directory could not be created, but we would get a better error message this way.

##########
File path: extensions/standard-processors/tests/unit/FetchFileTests.cpp
##########
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+
+#include "TestBase.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+class FetchFileTestFixture {
+ public:
+  FetchFileTestFixture();
+  ~FetchFileTestFixture();
+  std::vector<std::string> getSuccessfulFlowFileContents() const;
+  std::vector<std::string> getFailedFlowFileContents() const;
+  std::vector<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+  std::vector<std::string> getPermissionDeniedFlowFileContents() const;
+#endif
+
+ protected:
+  std::vector<std::string> getDirContents(const std::string& dir_path) const;
+
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  const std::string input_dir_;
+  const std::string success_output_dir_;
+  const std::string failure_output_dir_;
+  const std::string not_found_output_dir_;
+  const std::string permission_denied_output_dir_;
+  const std::string permission_denied_file_name_;
+  const std::string input_file_name_;
+  const std::string file_content_;
+  std::shared_ptr<core::Processor> fetch_file_processor_;
+  std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+FetchFileTestFixture::FetchFileTestFixture()
+  : plan_(test_controller_.createPlan()),
+    input_dir_(test_controller_.createTempDirectory()),
+    success_output_dir_(test_controller_.createTempDirectory()),
+    failure_output_dir_(test_controller_.createTempDirectory()),
+    not_found_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_file_name_("permission_denied.txt"),
+    input_file_name_("test.txt"),
+    file_content_("The quick brown fox jumps over the lazy dog\n")  {
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+  LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+  REQUIRE(!input_dir_.empty());
+  REQUIRE(!success_output_dir_.empty());
+  REQUIRE(!failure_output_dir_.empty());
+  REQUIRE(!not_found_output_dir_.empty());
+  REQUIRE(!permission_denied_output_dir_.empty());
+
+  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile");
+  plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B");
+  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true);
+  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true);
+  plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true);
+
+  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true);
+
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
+
+  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile);
+  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
+
+  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"not.found", "d"}});
+  plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_);
+
+  auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"permission.denied", "d"}});
+  plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_);
+
+  utils::putFileToDir(input_dir_, input_file_name_, file_content_);
+  utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
+#endif
+}
+
+FetchFileTestFixture::~FetchFileTestFixture() {
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
+#endif
+}
+
+std::vector<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {

Review comment:
       If this returned a `multiset` (or `unordered_multiset`), then we could write nicer, order-independent assertions like
   ```c++
   std::multiset<std::string> expected{"old_content", file_content_};
   CHECK(getSuccessfulFlowFileContents() == expected);
   ```

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() + file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
+    return false;
+  }
+
+  return moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path.c_str());

Review comment:
       Minor, but the `.c_str()` calls in the log statements are not necessary: spdlog can do that conversion internally.

##########
File path: extensions/standard-processors/tests/unit/FetchFileTests.cpp
##########
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+
+#include "TestBase.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+// Test fixture used by the TEST_CASE_METHOD cases in this file. It owns the test plan, the temporary
+// input/output directories, and the processors the individual cases reconfigure.
+class FetchFileTestFixture {
+ public:
+  FetchFileTestFixture();
+  ~FetchFileTestFixture();
+  // Each getter returns the contents of every file found in the corresponding output directory.
+  std::vector<std::string> getSuccessfulFlowFileContents() const;
+  std::vector<std::string> getFailedFlowFileContents() const;
+  std::vector<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+  std::vector<std::string> getPermissionDeniedFlowFileContents() const;
+#endif
+
+ protected:
+  std::vector<std::string> getDirContents(const std::string& dir_path) const;
+
+  // Declaration order matters: the constructor's initializer list creates plan_ and the
+  // temporary directories from test_controller_, so test_controller_ must come first.
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  const std::string input_dir_;
+  const std::string success_output_dir_;
+  const std::string failure_output_dir_;
+  const std::string not_found_output_dir_;
+  const std::string permission_denied_output_dir_;
+  const std::string permission_denied_file_name_;
+  const std::string input_file_name_;
+  const std::string file_content_;
+  std::shared_ptr<core::Processor> fetch_file_processor_;
+  std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+// Creates the temporary directories, adds the GenerateFlowFile, UpdateAttribute and FetchFile processors
+// to the plan (in that order), connects one PutFile processor to each FetchFile relationship
+// (success, failure, not.found, permission.denied), and writes the input files.
+FetchFileTestFixture::FetchFileTestFixture()
+  : plan_(test_controller_.createPlan()),
+    input_dir_(test_controller_.createTempDirectory()),
+    success_output_dir_(test_controller_.createTempDirectory()),
+    failure_output_dir_(test_controller_.createTempDirectory()),
+    not_found_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_file_name_("permission_denied.txt"),
+    input_file_name_("test.txt"),
+    file_content_("The quick brown fox jumps over the lazy dog\n")  {
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+  LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+  REQUIRE(!input_dir_.empty());
+  REQUIRE(!success_output_dir_.empty());
+  REQUIRE(!failure_output_dir_.empty());
+  REQUIRE(!not_found_output_dir_.empty());
+  REQUIRE(!permission_denied_output_dir_.empty());
+
+  // The generated flow file is empty ("0B"); UpdateAttribute supplies the attributes that form
+  // FetchFile's default ${absolute.path}/${filename} path.
+  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile");
+  plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B");
+  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true);
+  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true);
+  plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true);
+
+  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true);
+
+  // One PutFile per FetchFile relationship. Every PutFile auto-terminates its own
+  // "success" and "failure" relationships, matching the first two PutFile processors.
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
+
+  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile);
+  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
+
+  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_);
+
+  // Fixed: this previously reconfigured not_found_putfile, leaving permission_denied_putfile
+  // without any auto-terminated relationships.
+  auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile);
+  permission_denied_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_);
+
+  utils::putFileToDir(input_dir_, input_file_name_, file_content_);
+  utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
+#ifndef WIN32
+  // Permission bits are set to 0 for the "Permission denied to read file" test case.
+  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
+#endif
+}
+
+// On non-Windows builds, sets the permissions of the permission-denied input file to 0644
+// (presumably so the temporary directory can be cleaned up afterwards).
+FetchFileTestFixture::~FetchFileTestFixture() {
+#ifndef WIN32
+  const auto restricted_file = input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_;
+  utils::file::FileUtils::set_permissions(restricted_file, 0644);
+#endif
+}
+
+// Reads every file that list_dir reports for `dir_path` (opened in binary mode) and returns
+// the raw contents, one vector entry per file.
+std::vector<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
+  std::vector<std::string> contents_of_files;
+
+  auto read_file = [&contents_of_files](const std::string& path, const std::string& filename) -> bool {
+    std::ifstream input_stream(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+    contents_of_files.emplace_back(std::istreambuf_iterator<char>(input_stream), std::istreambuf_iterator<char>());
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(dir_path, read_file, plan_->getLogger(), false);
+  return contents_of_files;
+}
+
+// Contents of the files in success_output_dir_, the directory of the PutFile connected to "success".
+std::vector<std::string> FetchFileTestFixture::getSuccessfulFlowFileContents() const {
+  return getDirContents(success_output_dir_);
+}
+
+// Contents of the files in failure_output_dir_, the directory of the PutFile connected to "failure".
+std::vector<std::string> FetchFileTestFixture::getFailedFlowFileContents() const {
+  return getDirContents(failure_output_dir_);
+}
+
+// Contents of the files in not_found_output_dir_, the directory of the PutFile connected to "not.found".
+std::vector<std::string> FetchFileTestFixture::getNotFoundFlowFileContents() const {
+  return getDirContents(not_found_output_dir_);
+}
+
+#ifndef WIN32
+// Contents of the files in permission_denied_output_dir_, the directory of the PutFile connected to "permission.denied".
+std::vector<std::string> FetchFileTestFixture::getPermissionDeniedFlowFileContents() const {
+  return getDirContents(permission_denied_output_dir_);
+}
+#endif
+
+// With the "filename" attribute pointing at a file that does not exist and no FileToFetch property set,
+// exactly one empty flow file must land in the not.found output directory and the miss must be logged at error level.
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
+  plan_->setProperty(update_attribute_processor_, "filename", "non_existent.file", true);
+  test_controller_.runSession(plan_);
+  auto file_contents = getNotFoundFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0].empty());
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not found"));
+}
+
+// With FileToFetch set explicitly to a missing path and the not-found log level set to INFO,
+// one empty flow file must land in the not.found output directory and the miss must be logged at info level.
+TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), "/tmp/non_existent.file");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound.getName(), "INFO");
+  test_controller_.runSession(plan_);
+  auto file_contents = getNotFoundFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0].empty());
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not found"));
+}
+
+#ifndef WIN32
+// Fetching the input file whose permissions the fixture set to 0, with the permission-denied log level set to WARN:
+// one empty flow file must land in the permission.denied output directory and a "[warning]" line must be logged.
+TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
+    input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied.getName(), "WARN");
+  test_controller_.runSession(plan_);
+  auto file_contents = getPermissionDeniedFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0].empty());
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission denied for file"));
+}
+#endif
+
+// With the fixture's default configuration the fetched content must reach the success output directory
+// and the original input file must still exist afterwards.
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0] == file_content_);
+  REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+// With FileToFetch pointing at a copy of the input file in a "sub" directory, the fetched content must reach
+// the success output directory and the file at the custom path must still exist afterwards.
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path", "[testFetchFile]") {
+  REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "sub"));
+  utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
+  auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), file_path);
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0] == file_content_);
+  REQUIRE(utils::file::FileUtils::exists(file_path));
+}
+
+// Selecting the "Move File" completion strategy without a Move Destination Directory must make
+// running the session throw minifi::Exception.
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_), minifi::Exception);
+}
+
+// With "Move File" / "Fail" configured and a same-named file already present in the move directory,
+// one empty flow file must land in the failure output directory and the pre-existing destination
+// file must keep its "old content".
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
+  test_controller_.runSession(plan_);
+  auto file_contents = getFailedFlowFileContents();
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(file_contents[0].empty());
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
+}

Review comment:
       we could also check that `input_dir_/input_file_name_` is still there

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Property definitions. Only "File to Fetch" and "Move Destination Directory" enable expression language;
+// the remaining properties are required, have a default, and are restricted to the values of their option type.
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Relationship definitions; each description states which flow files are transferred to it.
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+// Registers the six properties and four relationships this processor supports.
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+// Reads the processor configuration from `context` into the member fields.
+// Throws a PROCESS_SCHEDULE_EXCEPTION when the completion strategy is MOVE_FILE but
+// move_destination_directory_ is empty after reading the property.
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+// Returns the "File to Fetch" property read with `flow_file` when it is non-empty; otherwise joins the
+// flow file's "absolute.path" and "filename" attributes with the separator from FileUtils::get_separator().
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string configured_path;
+  context.getProperty(FileToFetch, configured_path, flow_file);
+  if (configured_path.empty()) {
+    // Fall back to the default ${absolute.path}/${filename} location.
+    std::string directory;
+    flow_file->getAttribute("absolute.path", directory);
+    std::string filename;
+    flow_file->getAttribute("filename", filename);
+    return directory + utils::file::FileUtils::get_separator() + filename;
+  }
+  return configured_path;
+}
+
+// Emits `message` through logger_ at the severity selected by `log_level`.
+// LOGGING_OFF, as well as any value not listed below, produces no output.
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+// Builds the path that `file_name` would have inside move_destination_directory_,
+// joined with the separator returned by FileUtils::get_separator().
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  std::string destination_path = move_destination_directory_;
+  destination_path += utils::file::FileUtils::get_separator();
+  destination_path += file_name;
+  return destination_path;
+}
+
+// Reports whether something already exists at the move destination path computed for `file_name`.
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  const auto destination_path = getMoveAbsolutePath(file_name);
+  return utils::file::FileUtils::exists(destination_path);
+}
+
+// True only when the completion strategy is MOVE_FILE, the conflict strategy is FAIL,
+// and the move destination for `file_name` already exists. The existence check is
+// performed only if both strategy conditions hold.
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  const bool move_requested = completion_strategy_ == CompletionStrategyOption::MOVE_FILE;
+  const bool fail_on_conflict = move_confict_strategy_ == MoveConflictStrategyOption::FAIL;
+  return move_requested && fail_on_conflict && moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path.c_str());
+    std::filesystem::rename(file_to_fetch_path, moved_path);
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+    auto generated_filename = utils::IdGenerator::getIdGenerator()->generate().to_string();
+    logger_->log_info("Due to conflict file is moved with generated name '%s' by the Move Completion Strategy", generated_filename.c_str());

Review comment:
       it could be useful to log the old (conflicting) name, too

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Property definitions. Only "File to Fetch" and "Move Destination Directory" enable expression language;
+// the remaining properties are required, have a default, and are restricted to the values of their option type.
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Relationship definitions; each description states which flow files are transferred to it.
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+// Registers the six properties and four relationships this processor supports.
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+// Reads the processor configuration from `context` into the member fields.
+// Throws a PROCESS_SCHEDULE_EXCEPTION when the completion strategy is MOVE_FILE but
+// move_destination_directory_ is empty after reading the property.
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+// Returns the "File to Fetch" property read with `flow_file` when it is non-empty; otherwise joins the
+// flow file's "absolute.path" and "filename" attributes with the separator from FileUtils::get_separator().
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string configured_path;
+  context.getProperty(FileToFetch, configured_path, flow_file);
+  if (configured_path.empty()) {
+    // Fall back to the default ${absolute.path}/${filename} location.
+    std::string directory;
+    flow_file->getAttribute("absolute.path", directory);
+    std::string filename;
+    flow_file->getAttribute("filename", filename);
+    return directory + utils::file::FileUtils::get_separator() + filename;
+  }
+  return configured_path;
+}
+
+// Emits `message` through logger_ at the severity selected by `log_level`.
+// LOGGING_OFF, as well as any value not listed below, produces no output.
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+// Builds the path that `file_name` would have inside move_destination_directory_,
+// joined with the separator returned by FileUtils::get_separator().
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  std::string destination_path = move_destination_directory_;
+  destination_path += utils::file::FileUtils::get_separator();
+  destination_path += file_name;
+  return destination_path;
+}
+
+// Reports whether something already exists at the move destination path computed for `file_name`.
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  const auto destination_path = getMoveAbsolutePath(file_name);
+  return utils::file::FileUtils::exists(destination_path);
+}
+
+// True only when the completion strategy is MOVE_FILE, the conflict strategy is FAIL,
+// and the move destination for `file_name` already exists. The existence check is
+// performed only if both strategy conditions hold.
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  const bool move_requested = completion_strategy_ == CompletionStrategyOption::MOVE_FILE;
+  const bool fail_on_conflict = move_confict_strategy_ == MoveConflictStrategyOption::FAIL;
+  return move_requested && fail_on_conflict && moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {

Review comment:
       I would call this `executeMoveConflictStrategy()`

##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Optional path of the file to fetch. Supports expression language; has no default value.
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+// Required. One of CompletionStrategyOption::values(); defaults to CompletionStrategyOption::NONE.
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Target directory of the move completion strategy. Supports expression language; has no default value.
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+// Required. One of MoveConflictStrategyOption::values(); defaults to MoveConflictStrategyOption::RENAME.
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Required. One of LogLevelOption::values(); defaults to LogLevelOption::LOGGING_ERROR.
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Required. One of LogLevelOption::values(); defaults to LogLevelOption::LOGGING_ERROR.
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Relationships of the processor; each is constructed from its name and its description.
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+// Relationship named "not.found".
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+// Relationship named "permission.denied".
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+// Relationship named "failure", described as covering every other fetch error.
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+/**
+ * Registers the properties and the relationships supported by this processor.
+ */
+void FetchFile::initialize() {
+  // Every property defined for FetchFile, in declaration order.
+  setSupportedProperties({FileToFetch, CompletionStrategy, MoveDestinationDirectory, MoveConflictStrategy, LogLevelWhenFileNotFound, LogLevelWhenPermissionDenied});
+
+  // Every relationship a flow file can be routed to.
+  setSupportedRelationships({Success, NotFound, PermissionDenied, Failure});
+}
+
+/**
+ * Reads the processor configuration into member variables.
+ *
+ * @param context process context the properties are read from; must not be null
+ * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the completion strategy is MOVE_FILE
+ *         but no move destination directory is configured
+ */
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+
+  // NOTE(review): the directory is read here without a flow file, so an expression in
+  // "Move Destination Directory" cannot be evaluated against flow file attributes.
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  const bool move_requested = completion_strategy_ == CompletionStrategyOption::MOVE_FILE;
+  if (move_requested && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+/**
+ * Determines the path of the file to fetch for the given flow file.
+ *
+ * @param context process context the "File to Fetch" property is read from
+ * @param flow_file flow file used for evaluating the property and for the fallback attributes
+ * @return the value of the "File to Fetch" property if it is not empty; otherwise the
+ *         "absolute.path" attribute, the separator returned by FileUtils::get_separator()
+ *         and the "filename" attribute of the flow file, concatenated in that order
+ */
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string configured_path;
+  context.getProperty(FileToFetch, configured_path, flow_file);
+  if (!configured_path.empty()) {
+    return configured_path;
+  }
+
+  // Nothing usable was configured: fall back to the attributes of the flow file.
+  std::string directory;
+  flow_file->getAttribute("absolute.path", directory);
+  std::string name;
+  flow_file->getAttribute("filename", name);
+  return directory + utils::file::FileUtils::get_separator() + name;
+}
+
+/**
+ * Logs a message through logger_ at the severity selected by log_level.
+ *
+ * The message is always handed to the logger as the argument of a constant "%s" format
+ * string, never as the format string itself, so that '%' characters inside the message
+ * are logged verbatim instead of being interpreted as conversion specifiers.
+ *
+ * @param log_level severity to log with; LOGGING_OFF and any unrecognized value log nothing
+ * @param message text to be logged verbatim
+ */
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      // Logging is switched off for this event: intentionally do nothing.
+      break;
+  }
+}
+
+/**
+ * Builds the path a file named file_name would have inside the move destination directory:
+ * move_destination_directory_, followed by the separator returned by
+ * FileUtils::get_separator(), followed by file_name.
+ */
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  std::string destination_path = move_destination_directory_;
+  destination_path += utils::file::FileUtils::get_separator();
+  destination_path += file_name;
+  return destination_path;
+}
+
+/**
+ * Tells whether something already exists at the path file_name would be moved to.
+ *
+ * @return the result of FileUtils::exists() for the move destination path of file_name
+ */
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  const std::string destination_path = getMoveAbsolutePath(file_name);
+  return utils::file::FileUtils::exists(destination_path);
+}
+
+/**
+ * Tells whether moving file_name has to be treated as a failure because of a naming conflict.
+ *
+ * @return true only if the completion strategy is MOVE_FILE, the move conflict strategy is FAIL
+ *         and the move destination of file_name already exists; false otherwise
+ */
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  const bool fails_on_conflict =
+      completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_confict_strategy_ == MoveConflictStrategyOption::FAIL;
+  // The destination is only checked on the file system when a conflict would actually be fatal.
+  return fails_on_conflict && moveDestinationConflicts(file_name);
+}
+
+/**
+ * Applies the configured move conflict strategy to the fetched file. processMoveCompletion()
+ * calls this when the move destination of file_name already exists.
+ *
+ *  - REPLACE_FILE: the fetched file is renamed onto the existing destination path
+ *  - RENAME: the fetched file is moved into the destination directory under a generated name
+ *  - KEEP_EXISTING: the fetched file is removed
+ *  - any other strategy: nothing is done
+ *
+ * The std::filesystem calls used here report failures by throwing std::filesystem::filesystem_error,
+ * which is not caught in this function.
+ *
+ * @param file_to_fetch_path path of the file that has been fetched
+ * @param file_name name of the fetched file, without its directory
+ */
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    const auto destination_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", destination_path.c_str());
+    std::filesystem::rename(file_to_fetch_path, destination_path);
+    return;
+  }
+
+  if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+    const auto unique_name = utils::IdGenerator::getIdGenerator()->generate().to_string();
+    logger_->log_info("Due to conflict file is moved with generated name '%s' by the Move Completion Strategy", unique_name.c_str());
+    const auto destination_path = getMoveAbsolutePath(unique_name);
+    std::filesystem::rename(file_to_fetch_path, destination_path);
+    return;
+  }
+
+  if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
+    logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path.c_str());
+    std::filesystem::remove(file_to_fetch_path);
+  }
+}
+
+/**
+ * Moves the fetched file into the move destination directory.
+ *
+ * If the destination path already exists, the decision is delegated to
+ * executeMoveCompletionStrategy(). Otherwise the destination directory is created
+ * when it does not exist yet and the file is renamed to its destination path.
+ *
+ * @param file_to_fetch_path path of the file that has been fetched
+ * @param file_name name of the fetched file, without its directory
+ */
+void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (moveDestinationConflicts(file_name)) {
+    executeMoveCompletionStrategy(file_to_fetch_path, file_name);
+    return;
+  }
+
+  const bool destination_directory_exists = utils::file::FileUtils::exists(move_destination_directory_);
+  if (!destination_directory_exists) {
+    utils::file::FileUtils::create_dir(move_destination_directory_);
+  }
+
+  const auto destination_path = getMoveAbsolutePath(file_name);
+  logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path.c_str(), destination_path.c_str());
+  std::filesystem::rename(file_to_fetch_path, destination_path);
+}
+
+/**
+ * Applies the configured completion strategy to the original file.
+ *
+ * DELETE_FILE removes the file, MOVE_FILE hands over to processMoveCompletion(), and any
+ * other strategy leaves the file untouched. A std::filesystem::filesystem_error thrown while
+ * doing so is logged as a warning and is not propagated to the caller.
+ *
+ * @param file_to_fetch_path path of the file that has been fetched
+ * @param file_name name of the fetched file, without its directory
+ */
+void FetchFile::executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  try {
+    if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
+      logger_->log_info("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path.c_str());
+      std::filesystem::remove(file_to_fetch_path);
+    } else if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
+      processMoveCompletion(file_to_fetch_path, file_name);
+    }
+  } catch (const std::filesystem::filesystem_error& error) {
+    logger_->log_warn("Executing completion strategy failed due to filesystem error: %s", error.what());
+  }
+}
+
+void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("FetchFile onTrigger");
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+  if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
+    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '" + file_to_fetch_path + "'!");
+    session->transfer(flow_file, NotFound);
+    return;
+  }
+
+  std::string path;
+  std::string file_name;
+  utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+

Review comment:
       It would be better to also read `MoveDestinationDirectory` into `move_destination_directory_` in `onTrigger()`; otherwise, we don't really support expression language (at least not with flow file attributes).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org