Posted to commits@nifi.apache.org by sz...@apache.org on 2022/03/28 09:33:14 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1740 Add FetchFile processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 866c3f3  MINIFICPP-1740 Add FetchFile processor
866c3f3 is described below

commit 866c3f36fa19fe6ebc7f9783f8e16ce59de8f958
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Mar 28 11:10:34 2022 +0200

    MINIFICPP-1740 Add FetchFile processor
    
    FetchFile is implemented mostly according to NiFi's implementation. One
    difference is that a completion strategy failure transfers the flow file
    to failure only when the processor is explicitly configured to fail, that
    is, when the Move Conflict Strategy is set to Fail and the move
    destination conflicts with an existing file. NiFi is somewhat inconsistent
    here: it also routes to failure if the move target directory cannot be
    created or has permission problems, but routes to success on any other
    completion strategy failure.
    
    Closes #1262
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  29 ++
 README.md                                          |   2 +-
 .../standard-processors/processors/FetchFile.cpp   | 262 ++++++++++++++++
 .../standard-processors/processors/FetchFile.h     |  96 ++++++
 .../tests/unit/FetchFileTests.cpp                  | 348 +++++++++++++++++++++
 libminifi/include/utils/FileReaderCallback.h       |   3 +-
 libminifi/src/utils/FileReaderCallback.cpp         |   4 +-
 7 files changed, 740 insertions(+), 4 deletions(-)
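
The routing rule described in the commit message can be sketched as a small
pure function. This is only an illustration, not the code from the commit:
the enum and function names below (Route, routeAfterFetch, usageExample) are
hypothetical, and read errors (not found, permission denied, other I/O
failures) are deliberately left out. It assumes the only completion-related
reason to route to failure is a Move File completion strategy combined with
the Fail conflict strategy and an already existing destination file.

```cpp
#include <cassert>

// Hypothetical stand-ins for the processor's option enums.
enum class CompletionStrategy { None, MoveFile, DeleteFile };
enum class MoveConflictStrategy { Rename, ReplaceFile, KeepExisting, Fail };
enum class Route { Success, Failure };

// Decide where a flow file goes, looking only at the completion strategy.
// destination_exists is assumed to be computed by the caller, for example
// by checking whether the target path in the move directory already exists.
Route routeAfterFetch(CompletionStrategy completion,
                      MoveConflictStrategy conflict,
                      bool destination_exists) {
  const bool must_fail = completion == CompletionStrategy::MoveFile
      && conflict == MoveConflictStrategy::Fail
      && destination_exists;
  return must_fail ? Route::Failure : Route::Success;
}

// Usage: only the explicit "Fail" configuration with a real conflict fails.
inline void usageExample() {
  assert(routeAfterFetch(CompletionStrategy::MoveFile,
                         MoveConflictStrategy::Fail, true) == Route::Failure);
  assert(routeAfterFetch(CompletionStrategy::MoveFile,
                         MoveConflictStrategy::Rename, true) == Route::Success);
}
```

Under this sketch, any other problem while moving or deleting the original
file would only be logged, and the flow file would still go to success.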

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0a3adc1..54caa87 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -24,6 +24,7 @@
 - [ExtractText](#extracttext)
 - [FetchAzureBlobStorage](#fetchazureblobstorage)
 - [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
+- [FetchFile](#fetchfile)
 - [FetchOPCProcessor](#fetchopcprocessor)
 - [FetchS3Object](#fetchs3object)
 - [FetchSFTP](#fetchsftp)
@@ -598,6 +599,34 @@ In the list below, the names of required properties appear in bold. Any other pr
 |success|Files that have been successfully fetched from Azure storage are transferred to this relationship|
 
 
+## FetchFile
+
+### Description
+
+Reads the contents of a file from disk and streams it into the contents of an incoming FlowFile. Once this is done, the file is optionally moved elsewhere or deleted to help keep the file system organized.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|File to Fetch|||The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.<br/>**Supports Expression Language: true**|
+|**Completion Strategy**|None|None<br/>Move File<br/>Delete File|Specifies what to do with the original file on the file system once it has been pulled into MiNiFi|
+|Move Destination Directory|||The directory to move the original file to once it has been fetched from the file system. This property is ignored unless the Completion Strategy is set to "Move File". If the directory does not exist, it will be created.<br/>**Supports Expression Language: true**|
+|**Move Conflict Strategy**|Rename|Rename<br/>Replace File<br/>Keep Existing<br/>Fail|If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, this property specifies how that naming conflict should be resolved|
+|**Log level when file not found**|ERROR|TRACE<br/>DEBUG<br/>INFO<br/>WARN<br/>ERROR<br/>OFF|Log level to use in case the file does not exist when the processor is triggered|
+|**Log level when permission denied**|ERROR|TRACE<br/>DEBUG<br/>INFO<br/>WARN<br/>ERROR<br/>OFF|Log level to use in case agent does not have sufficient permissions to read the file|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.|
+|not.found|Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.|
+|permission.denied|Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.|
+|failure|Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.|
+
+
 ## FetchOPCProcessor
 
 ### Description
diff --git a/README.md b/README.md
index 242c64c..74de4a0 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@ The following table lists the base set of processors.
 
 | Extension Set        | Processors           |
 | ------------- |:-------------|
-| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
+| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PRO [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/extensions/standard-processors/processors/FetchFile.cpp b/extensions/standard-processors/processors/FetchFile.cpp
new file mode 100644
index 0000000..924b540
--- /dev/null
+++ b/extensions/standard-processors/processors/FetchFile.cpp
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  std::string move_destination_dir;
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_dir);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_dir.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string file_to_fetch_path;
+  context.getProperty(FileToFetch, file_to_fetch_path, flow_file);
+  if (!file_to_fetch_path.empty()) {
+    return file_to_fetch_path;
+  }
+
+  flow_file->getAttribute("absolute.path", file_to_fetch_path);
+  std::string filename;
+  flow_file->getAttribute("filename", filename);
+  file_to_fetch_path += utils::file::FileUtils::get_separator() + filename;
+  return file_to_fetch_path;
+}
+
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error(message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  return move_destination_directory_ + utils::file::FileUtils::get_separator() + file_name;
+}
+
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name));
+}
+
+bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_name) const {
+  if (completion_strategy_ != CompletionStrategyOption::MOVE_FILE || move_confict_strategy_ != MoveConflictStrategyOption::FAIL) {
+    return false;
+  }
+
+  return moveDestinationConflicts(file_name);
+}
+
+void FetchFile::executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", moved_path);
+    std::filesystem::rename(file_to_fetch_path, moved_path);
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
+    auto generated_filename = utils::IdGenerator::getIdGenerator()->generate().to_string();
+    logger_->log_info("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename);
+    std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(generated_filename));
+  } else if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
+    logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path);
+    std::filesystem::remove(file_to_fetch_path);
+  }
+}
+
+void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (!moveDestinationConflicts(file_name)) {
+    if (!utils::file::FileUtils::exists(move_destination_directory_)) {
+      std::filesystem::create_directories(move_destination_directory_);
+    }
+    auto moved_path = getMoveAbsolutePath(file_name);
+    logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path, moved_path);
+    std::filesystem::rename(file_to_fetch_path, moved_path);
+    return;
+  }
+
+  executeMoveConflictStrategy(file_to_fetch_path, file_name);
+}
+
+void FetchFile::executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  try {
+    if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
+      processMoveCompletion(file_to_fetch_path, file_name);
+    } else if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
+      logger_->log_info("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path);
+      std::filesystem::remove(file_to_fetch_path);
+    }
+  } catch(const std::filesystem::filesystem_error& ex) {
+    logger_->log_warn("Executing completion strategy failed due to filesystem error: %s", ex.what());
+  }
+}
+
+void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("FetchFile onTrigger");
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+  if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
+    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '" + file_to_fetch_path + "'!");
+    session->transfer(flow_file, NotFound);
+    return;
+  }
+
+  std::string path;
+  std::string file_name;
+  utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+
+  context->getProperty(MoveDestinationDirectory, move_destination_directory_, flow_file);
+  if (moveWouldFailWithDestinationconflict(file_name)) {
+    logger_->log_error("Move destination (%s) conflicts with an already existing file!", move_destination_directory_);
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  try {
+    utils::FileReaderCallback callback(file_to_fetch_path);
+    session->write(flow_file, std::move(callback));
+    logger_->log_debug("Fetching file '%s' successful!", file_to_fetch_path);
+    session->transfer(flow_file, Success);
+  } catch (const utils::FileReaderCallbackIOError& io_error) {
+    if (io_error.error_code == EACCES) {
+      logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '" + file_to_fetch_path + "' to be fetched!");
+      session->transfer(flow_file, PermissionDenied);
+    } else {
+      logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path, io_error.what());
+      session->transfer(flow_file, Failure);
+    }
+    return;
+  }
+
+  executeCompletionStrategy(file_to_fetch_path, file_name);
+}
+
+REGISTER_RESOURCE(FetchFile, "Reads the contents of a file from disk and streams it into the contents of an incoming FlowFile. "
+  "Once this is done, the file is optionally moved elsewhere or deleted to help keep the file system organized.");
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/FetchFile.h b/extensions/standard-processors/processors/FetchFile.h
new file mode 100644
index 0000000..3b52805
--- /dev/null
+++ b/extensions/standard-processors/processors/FetchFile.h
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class FetchFile : public core::Processor {
+ public:
+  SMART_ENUM(CompletionStrategyOption,
+    (NONE, "None"),
+    (MOVE_FILE, "Move File"),
+    (DELETE_FILE, "Delete File")
+  )
+
+  SMART_ENUM(MoveConflictStrategyOption,
+    (RENAME, "Rename"),
+    (REPLACE_FILE, "Replace File"),
+    (KEEP_EXISTING, "Keep Existing"),
+    (FAIL, "Fail")
+  )
+
+  SMART_ENUM(LogLevelOption,
+    (LOGGING_TRACE, "TRACE"),
+    (LOGGING_DEBUG, "DEBUG"),
+    (LOGGING_INFO, "INFO"),
+    (LOGGING_WARN, "WARN"),
+    (LOGGING_ERROR, "ERROR"),
+    (LOGGING_OFF, "OFF")
+  )
+
+  explicit FetchFile(const std::string& name, const utils::Identifier& uuid = {})
+    : core::Processor(name, uuid) {
+  }
+
+  EXTENSIONAPI static const core::Property FileToFetch;
+  EXTENSIONAPI static const core::Property CompletionStrategy;
+  EXTENSIONAPI static const core::Property MoveDestinationDirectory;
+  EXTENSIONAPI static const core::Property MoveConflictStrategy;
+  EXTENSIONAPI static const core::Property LogLevelWhenFileNotFound;
+  EXTENSIONAPI static const core::Property LogLevelWhenPermissionDenied;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship NotFound;
+  EXTENSIONAPI static const core::Relationship PermissionDenied;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+ private:
+  std::string getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  void logWithLevel(LogLevelOption log_level, const std::string& message) const;
+  std::string getMoveAbsolutePath(const std::string& file_name) const;
+  bool moveDestinationConflicts(const std::string& file_name) const;
+  bool moveWouldFailWithDestinationconflict(const std::string& file_name) const;
+  void executeMoveConflictStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
+  void processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name);
+  void executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name);
+
+  std::string move_destination_directory_;
+  CompletionStrategyOption completion_strategy_;
+  MoveConflictStrategyOption move_confict_strategy_;
+  LogLevelOption log_level_when_file_not_found_;
+  LogLevelOption log_level_when_permission_denied_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchFile>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
new file mode 100644
index 0000000..dbc1762
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -0,0 +1,348 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "core/Property.h"
+#include "core/Processor.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/FetchFile.h"
+#include "processors/PutFile.h"
+#include "utils/TestUtils.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace {
+
+class FetchFileTestFixture {
+ public:
+  FetchFileTestFixture();
+  ~FetchFileTestFixture();
+  std::unordered_multiset<std::string> getSuccessfulFlowFileContents() const;
+  std::unordered_multiset<std::string> getFailedFlowFileContents() const;
+  std::unordered_multiset<std::string> getNotFoundFlowFileContents() const;
+#ifndef WIN32
+  std::unordered_multiset<std::string> getPermissionDeniedFlowFileContents() const;
+#endif
+
+ protected:
+  std::unordered_multiset<std::string> getDirContents(const std::string& dir_path) const;
+
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  const std::string input_dir_;
+  const std::string success_output_dir_;
+  const std::string failure_output_dir_;
+  const std::string not_found_output_dir_;
+  const std::string permission_denied_output_dir_;
+  const std::string permission_denied_file_name_;
+  const std::string input_file_name_;
+  const std::string file_content_;
+  std::shared_ptr<core::Processor> fetch_file_processor_;
+  std::shared_ptr<core::Processor> update_attribute_processor_;
+};
+
+FetchFileTestFixture::FetchFileTestFixture()
+  : plan_(test_controller_.createPlan()),
+    input_dir_(test_controller_.createTempDirectory()),
+    success_output_dir_(test_controller_.createTempDirectory()),
+    failure_output_dir_(test_controller_.createTempDirectory()),
+    not_found_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_output_dir_(test_controller_.createTempDirectory()),
+    permission_denied_file_name_("permission_denied.txt"),
+    input_file_name_("test.txt"),
+    file_content_("The quick brown fox jumps over the lazy dog\n")  {
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
+  LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
+  REQUIRE(!input_dir_.empty());
+  REQUIRE(!success_output_dir_.empty());
+  REQUIRE(!failure_output_dir_.empty());
+  REQUIRE(!not_found_output_dir_.empty());
+  REQUIRE(!permission_denied_output_dir_.empty());
+
+  auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile");
+  plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B");
+  update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true);
+  plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true);
+  plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true);
+
+  fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true);
+
+  auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile);
+  success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_);
+
+  auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile);
+  failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}});
+  plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_);
+
+  auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile);
+  not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"not.found", "d"}});
+  plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_);
+
+  auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false);
+  plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile);
+  permission_denied_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"permission.denied", "d"}});
+  plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_);
+
+  utils::putFileToDir(input_dir_, input_file_name_, file_content_);
+  utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0);
+#endif
+}
+
+FetchFileTestFixture::~FetchFileTestFixture() {
+#ifndef WIN32
+  utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644);
+#endif
+}
+
+std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
+  std::unordered_multiset<std::string> file_contents;
+
+  auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
+    std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
+    file_contents.insert(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(dir_path, lambda, plan_->getLogger(), false);
+  return file_contents;
+}
+
+std::unordered_multiset<std::string> FetchFileTestFixture::getSuccessfulFlowFileContents() const {
+  return getDirContents(success_output_dir_);
+}
+
+std::unordered_multiset<std::string> FetchFileTestFixture::getFailedFlowFileContents() const {
+  return getDirContents(failure_output_dir_);
+}
+
+std::unordered_multiset<std::string> FetchFileTestFixture::getNotFoundFlowFileContents() const {
+  return getDirContents(not_found_output_dir_);
+}
+
+#ifndef WIN32
+std::unordered_multiset<std::string> FetchFileTestFixture::getPermissionDeniedFlowFileContents() const {
+  return getDirContents(permission_denied_output_dir_);
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
+  plan_->setProperty(update_attribute_processor_, "filename", "non_existent.file", true);
+  test_controller_.runSession(plan_);
+  auto file_contents = getNotFoundFlowFileContents();
+  std::unordered_multiset<std::string> expected{""};
+  REQUIRE(file_contents == expected);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not found"));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), "/tmp/non_existent.file");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound.getName(), "INFO");
+  test_controller_.runSession(plan_);
+  auto file_contents = getNotFoundFlowFileContents();
+  std::unordered_multiset<std::string> expected{""};
+  REQUIRE(file_contents == expected);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not found"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(),
+    input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied.getName(), "WARN");
+  test_controller_.runSession(plan_);
+  auto file_contents = getPermissionDeniedFlowFileContents();
+  std::unordered_multiset<std::string> expected{""};
+  REQUIRE(file_contents == expected);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission denied for file"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path", "[testFetchFile]") {
+  REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "sub"));
+  utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
+  auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::FileToFetch.getName(), file_path);
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(utils::file::FileUtils::exists(file_path));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_), minifi::Exception);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
+  test_controller_.runSession(plan_);
+  auto file_contents = getFailedFlowFileContents();
+  std::unordered_multiset<std::string> expected{""};
+  REQUIRE(file_contents == expected);
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
+  REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored when completion strategy is not move file", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Fail");
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with replace file", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Replace File");
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with renaming file to a new random filename", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Rename");
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+
+
+  auto move_dir_contents = getDirContents(move_dir);
+  expected = {"old content", file_content_};
+  REQUIRE(move_dir_contents == expected);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with deleting the new file and keeping the old one", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::putFileToDir(move_dir, input_file_name_, "old content");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy.getName(), "Keep Existing");
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory after flow completion", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
+}
+
+TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file is moved to a non-existent directory which is created by the flow", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+
+  std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
+  REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to filesystem error still succeeds flow", "[testFetchFile]") {
+  auto move_dir = test_controller_.createTempDirectory();
+  utils::file::FileUtils::set_permissions(move_dir, 0);
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Move File");
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory.getName(), move_dir);
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "completion strategy failed"));
+  utils::file::FileUtils::set_permissions(move_dir, 0644);
+}
+#endif
+
+TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
+  plan_->setProperty(fetch_file_processor_, org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy.getName(), "Delete File");
+  test_controller_.runSession(plan_);
+  auto file_contents = getSuccessfulFlowFileContents();
+  std::unordered_multiset<std::string> expected{file_content_};
+  REQUIRE(file_contents == expected);
+  REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
+}
+
+}  // namespace
diff --git a/libminifi/include/utils/FileReaderCallback.h b/libminifi/include/utils/FileReaderCallback.h
index 398bb14..a368222 100644
--- a/libminifi/include/utils/FileReaderCallback.h
+++ b/libminifi/include/utils/FileReaderCallback.h
@@ -45,7 +45,8 @@ class FileReaderCallback : public OutputStreamCallback {
 
 class FileReaderCallbackIOError : public std::runtime_error {
  public:
-  explicit FileReaderCallbackIOError(const std::string& message) : std::runtime_error{message} {}
+  explicit FileReaderCallbackIOError(const std::string& message, int code) : std::runtime_error{message}, error_code(code) {}
+  int error_code;
 };
 
 }  // namespace utils
diff --git a/libminifi/src/utils/FileReaderCallback.cpp b/libminifi/src/utils/FileReaderCallback.cpp
index e656537..ee59586 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/FileReaderCallback.cpp
@@ -40,7 +40,7 @@ FileReaderCallback::FileReaderCallback(const std::string& file_name)
   logger_->log_debug("Opening %s", file_name);
   input_stream_.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
   if (!input_stream_.is_open()) {
-    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)));
+    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)), errno);
   }
 }
 
@@ -51,7 +51,7 @@ int64_t FileReaderCallback::process(const std::shared_ptr<io::BaseStream>& outpu
   while (input_stream_.good()) {
     input_stream_.read(buffer.data(), buffer.size());
     if (input_stream_.bad()) {
-      throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading file: ", std::strerror(errno)));
+      throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading file: ", std::strerror(errno)), errno);
     }
     const auto num_bytes_read = input_stream_.gcount();
     logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
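
Note on the FileReaderCallbackIOError change above: the exception now carries the errno value next to the message, so a caller can tell a missing file from an unreadable one. The tests in this commit check those two outcomes as "not found" and "permission denied". The following is a minimal, self-contained sketch of that idea and is not the code from FetchFile.cpp. IoError, Outcome, open_or_throw, classify and run_example are hypothetical names introduced only for illustration. The sketch also assumes a POSIX-like platform where a failed std::ifstream open leaves a meaningful errno, which the C++ standard does not guarantee. It copies errno into a local variable first, so that building the message cannot overwrite the value that is stored.

```cpp
#include <cassert>
#include <cerrno>
#include <cstring>
#include <fstream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for FileReaderCallbackIOError: a message plus the errno value.
class IoError : public std::runtime_error {
 public:
  IoError(const std::string& message, int code) : std::runtime_error{message}, error_code(code) {}
  int error_code;
};

// Hypothetical outcome categories, loosely mirroring the cases the tests distinguish.
enum class Outcome { Success, NotFound, PermissionDenied, Failure };

// Tries to open the file for reading; on failure throws IoError with the errno
// observed right after the failed open (assumption: the platform sets errno here).
void open_or_throw(const std::string& path) {
  errno = 0;
  std::ifstream is(path, std::ifstream::binary);
  if (!is.is_open()) {
    const int code = errno;  // save it before any later call can change errno
    throw IoError(std::string("Error opening file: ") + std::strerror(code), code);
  }
}

// Maps the stored error code to an outcome category.
Outcome classify(const std::string& path) {
  try {
    open_or_throw(path);
    return Outcome::Success;
  } catch (const IoError& e) {
    if (e.error_code == ENOENT) return Outcome::NotFound;
    if (e.error_code == EACCES) return Outcome::PermissionDenied;
    return Outcome::Failure;
  }
}

// Usage sketch: a path inside a directory that does not exist is classified as NotFound.
void run_example() {
  assert(classify("/no_such_dir_for_this_sketch/non_existent.file") == Outcome::NotFound);
}
```

Saving errno in a local before formatting the message is a deliberate choice in this sketch: the evaluation order of function arguments is unspecified in C++, so reading errno a second time in the same call expression may observe a value changed by the message-building code.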