You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/03/04 10:14:30 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1262: MINIFICPP-1740 Add FetchFile processor

lordgamez commented on a change in pull request #1262:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1262#discussion_r819439727



##########
File path: extensions/standard-processors/processors/FetchFile.cpp
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FetchFile.h"
+
+#include <errno.h>
+#include <filesystem>
+#include <utility>
+
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/FileReaderCallback.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Optional explicit path of the file to fetch, evaluated per flow file in getFileToFetch();
+// when empty, getFileToFetch() falls back to ${absolute.path}/${filename}.
+const core::Property FetchFile::FileToFetch(
+    core::PropertyBuilder::createProperty("File to Fetch")
+      ->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+// What happens to the source file after it has been fetched; parsed in onSchedule() and
+// applied by executeCompletionStrategy().
+const core::Property FetchFile::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")
+      ->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
+      ->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
+      ->withAllowableValues<std::string>(CompletionStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Target directory for the "Move File" completion strategy; onSchedule() rejects an empty value
+// when that strategy is selected.
+const core::Property FetchFile::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")
+      ->withDescription("The directory to move the original file to once it has been fetched from the file system. "
+                        "This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+// How a name clash at the move destination is resolved (see executeMoveCompletionStrategy()).
+const core::Property FetchFile::MoveConflictStrategy(
+    core::PropertyBuilder::createProperty("Move Conflict Strategy")
+      ->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
+                        "this property specifies how that naming conflict should be resolved")
+      ->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
+      ->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Log level passed to logWithLevel() when the path to fetch is not a regular file.
+const core::Property FetchFile::LogLevelWhenFileNotFound(
+    core::PropertyBuilder::createProperty("Log level when file not found")
+      ->withDescription("Log level to use in case the file does not exist when the processor is triggered")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Log level for the case where the file cannot be read because of insufficient permissions.
+const core::Property FetchFile::LogLevelWhenPermissionDenied(
+    core::PropertyBuilder::createProperty("Log level when permission denied")
+      ->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
+      ->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
+      ->withAllowableValues<std::string>(LogLevelOption::values())
+      ->isRequired(true)
+      ->build());
+
+// Outgoing relationships. onTrigger() routes a flow file whose path is not a regular file to NotFound.
+const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
+const core::Relationship FetchFile::NotFound(
+  "not.found",
+  "Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
+const core::Relationship FetchFile::PermissionDenied(
+  "permission.denied",
+  "Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
+const core::Relationship FetchFile::Failure(
+  "failure",
+  "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
+
+/**
+ * Registers every property and relationship defined above with the processor.
+ */
+void FetchFile::initialize() {
+  setSupportedProperties({
+    FileToFetch,
+    CompletionStrategy,
+    MoveDestinationDirectory,
+    MoveConflictStrategy,
+    LogLevelWhenFileNotFound,
+    LogLevelWhenPermissionDenied
+  });
+
+  setSupportedRelationships({
+    Success,
+    NotFound,
+    PermissionDenied,
+    Failure
+  });
+}
+
+/**
+ * Reads the scheduling-time configuration into member fields: the completion strategy, the move
+ * destination directory, the move conflict strategy and the two "log level when ..." options.
+ *
+ * @param context process context to read the properties from; must not be null
+ * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if Completion Strategy is "Move File" but
+ *         Move Destination Directory is empty
+ */
+void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+  gsl_Expects(context);
+  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(*context, CompletionStrategy);
+  // NOTE(review): Move Destination Directory is declared with supportsExpressionLanguage(true), but it is
+  // read here once, by name and without a flow file. Confirm whether this getProperty overload evaluates
+  // expression language at all, or whether the value should be evaluated per flow file instead.
+  context->getProperty(MoveDestinationDirectory.getName(), move_destination_directory_);
+  if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE && move_destination_directory_.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
+  }
+  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(*context, MoveConflictStrategy);
+  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenFileNotFound);
+  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(*context, LogLevelWhenPermissionDenied);
+}
+
+/**
+ * Determines the path of the file to fetch for the given flow file.
+ *
+ * A non-empty "File to Fetch" property (evaluated against the flow file) wins. Otherwise the path
+ * is built from the flow file's "absolute.path" and "filename" attributes, joined with the platform
+ * separator. A missing attribute contributes an empty string.
+ */
+std::string FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  std::string configured_path;
+  context.getProperty(FileToFetch, configured_path, flow_file);
+  if (!configured_path.empty()) {
+    return configured_path;
+  }
+
+  std::string directory;
+  std::string name;
+  flow_file->getAttribute("absolute.path", directory);
+  flow_file->getAttribute("filename", name);
+  return directory + utils::file::FileUtils::get_separator() + name;
+}
+
+/**
+ * Logs message at the given configurable level. This is how the "Log level when file not found" and
+ * "Log level when permission denied" options are applied. LOGGING_OFF, and any value without a
+ * matching case, suppresses the message.
+ *
+ * The message embeds the path of the file to fetch, which comes from configuration or flow file
+ * attributes. It is therefore passed as an argument to a constant "%s" format instead of being used
+ * as the printf-style format string itself. Otherwise a '%' in a file name would be parsed as a
+ * conversion specifier, which is undefined behavior.
+ */
+void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const {
+  switch (log_level.value()) {
+    case LogLevelOption::LOGGING_TRACE:
+      logger_->log_trace("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_DEBUG:
+      logger_->log_debug("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_INFO:
+      logger_->log_info("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_WARN:
+      logger_->log_warn("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_ERROR:
+      logger_->log_error("%s", message.c_str());
+      break;
+    case LogLevelOption::LOGGING_OFF:
+    default:
+      break;
+  }
+}
+
+/**
+ * Returns the path that file_name would have inside the configured move destination directory.
+ */
+std::string FetchFile::getMoveAbsolutePath(const std::string& file_name) const {
+  std::string absolute_path = move_destination_directory_;
+  absolute_path += utils::file::FileUtils::get_separator();
+  absolute_path += file_name;
+  return absolute_path;
+}
+
+/**
+ * Tells whether something named file_name already exists in the move destination directory.
+ */
+bool FetchFile::moveDestinationConflicts(const std::string& file_name) const {
+  const auto candidate_path = getMoveAbsolutePath(file_name);
+  return utils::file::FileUtils::exists(candidate_path);
+}
+
+/**
+ * Returns true only when the "Move File" completion strategy is combined with the "Fail" conflict
+ * strategy and file_name already exists at the move destination. The filesystem is queried only
+ * when that strategy combination is configured.
+ */
+bool FetchFile::moveFailsWithDestinationconflict(const std::string& file_name) const {
+  const bool fails_on_conflict = completion_strategy_ == CompletionStrategyOption::MOVE_FILE
+      && move_confict_strategy_ == MoveConflictStrategyOption::FAIL;
+  return fails_on_conflict && moveDestinationConflicts(file_name);
+}
+
+/**
+ * Handles a name conflict at the move destination according to the Move Conflict Strategy:
+ *  - Replace File: the fetched file is renamed onto the existing destination path,
+ *  - Rename: the fetched file is moved under a freshly generated id as its name,
+ *  - Keep Existing: the destination is left untouched and the fetched file is deleted.
+ * Any other strategy value does nothing here. std::filesystem exceptions propagate to the caller.
+ */
+void FetchFile::executeMoveCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  switch (move_confict_strategy_.value()) {
+    case MoveConflictStrategyOption::REPLACE_FILE: {
+      const auto target_path = getMoveAbsolutePath(file_name);
+      logger_->log_info("Due to conflict replacing file '%s' by the Move Completion Strategy", target_path.c_str());
+      std::filesystem::rename(file_to_fetch_path, target_path);
+      break;
+    }
+    case MoveConflictStrategyOption::RENAME: {
+      auto unique_name = utils::IdGenerator::getIdGenerator()->generate().to_string();
+      logger_->log_info("Due to conflict file is moved with generated name '%s' by the Move Completion Strategy", unique_name.c_str());
+      std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(unique_name));
+      break;
+    }
+    case MoveConflictStrategyOption::KEEP_EXISTING:
+      logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path.c_str());
+      std::filesystem::remove(file_to_fetch_path);
+      break;
+    default:
+      // NOTE(review): presumably the Fail strategy is handled before this point via
+      // moveFailsWithDestinationconflict(); confirm in onTrigger.
+      break;
+  }
+}
+
+/**
+ * Applies the "Move File" completion strategy to the fetched file.
+ *
+ * On a name conflict the Move Conflict Strategy decides what happens. Otherwise the destination
+ * directory is created when it does not exist, and the file is moved there under its own name.
+ * std::filesystem exceptions propagate to the caller.
+ */
+void FetchFile::processMoveCompletion(const std::string& file_to_fetch_path, const std::string& file_name) {
+  if (moveDestinationConflicts(file_name)) {
+    executeMoveCompletionStrategy(file_to_fetch_path, file_name);
+    return;
+  }
+
+  if (!utils::file::FileUtils::exists(move_destination_directory_)) {
+    utils::file::FileUtils::create_dir(move_destination_directory_);
+  }
+  const auto destination_path = getMoveAbsolutePath(file_name);
+  logger_->log_info("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path.c_str(), destination_path.c_str());
+  std::filesystem::rename(file_to_fetch_path, destination_path);
+}
+
+/**
+ * Runs the configured completion strategy on the fetched file: delete it, move it, or (for any
+ * other value) leave it in place. A std::filesystem::filesystem_error is logged as a warning
+ * instead of being propagated.
+ */
+void FetchFile::executeCompletionStrategy(const std::string& file_to_fetch_path, const std::string& file_name) {
+  try {
+    if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
+      logger_->log_info("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path.c_str());
+      std::filesystem::remove(file_to_fetch_path);
+      return;
+    }
+    if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
+      processMoveCompletion(file_to_fetch_path, file_name);
+    }
+  } catch (const std::filesystem::filesystem_error& fs_error) {
+    logger_->log_warn("Executing completion strategy failed due to filesystem error: %s", fs_error.what());
+  }
+}
+
+void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("FetchFile onTrigger");
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  const auto file_to_fetch_path = getFileToFetch(*context, flow_file);
+  if (!std::filesystem::is_regular_file(file_to_fetch_path)) {
+    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '" + file_to_fetch_path + "'!");
+    session->transfer(flow_file, NotFound);
+    return;
+  }
+
+  std::string path;
+  std::string file_name;
+  utils::file::getFileNameAndPath(file_to_fetch_path, path, file_name);
+

Review comment:
       Good catch, I missed that, fixed in 5cc8e67a59aa66ed854bdb3349b241c549357c4a




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org