You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/11/06 16:09:42 UTC

nifi-minifi-cpp git commit: MINIFICPP-289 Support PutFile property: Create Missing Directories

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master e4ec7337d -> a95e17873


MINIFICPP-289 Support PutFile property: Create Missing Directories

This closes #176.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a95e1787
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a95e1787
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a95e1787

Branch: refs/heads/master
Commit: a95e1787366f353b8efaed3bcb9ab09ab3e904f4
Parents: e4ec733
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Fri Nov 3 20:51:17 2017 -0400
Committer: Marc Parisi <ph...@apache.org>
Committed: Mon Nov 6 10:45:49 2017 -0500

----------------------------------------------------------------------
 libminifi/include/processors/PutFile.h |  31 +++++---
 libminifi/src/processors/PutFile.cpp   | 117 +++++++++++++++++++++-------
 2 files changed, 108 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a95e1787/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index c6b0a18..9872eb2 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -38,11 +38,11 @@ namespace processors {
 class PutFile : public core::Processor {
  public:
 
-  static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace";
-  static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore";
-  static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail";
+  static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace";
+  static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore";
+  static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail";
 
-  static constexpr char const* ProcessorName = "PutFile";
+  static constexpr char const *ProcessorName = "PutFile";
 
   // Constructor
   /*!
@@ -59,6 +59,7 @@ class PutFile : public core::Processor {
   // Supported Properties
   static core::Property Directory;
   static core::Property ConflictResolution;
+  static core::Property CreateDirs;
   // Supported Relationships
   static core::Relationship Success;
   static core::Relationship Failure;
@@ -78,15 +79,19 @@ class PutFile : public core::Processor {
 
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(const std::string &tmpFile, const std::string &destFile);
+    ReadCallback(const std::string &tmp_file,
+                 const std::string &dest_file,
+                 bool try_mkdirs);
     ~ReadCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);bool commit();
+    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+    bool commit();
 
    private:
     std::shared_ptr<logging::Logger> logger_;
-    std::ofstream _tmpFileOs;bool _writeSucceeded = false;
-    std::string _tmpFile;
-    std::string _destFile;
+    bool write_succeeded_ = false;
+    std::string tmp_file_;
+    std::string dest_file_;
+    bool try_mkdirs_;
   };
 
   /**
@@ -101,12 +106,14 @@ class PutFile : public core::Processor {
 
  private:
 
-  // directory
   std::string directory_;
-  // conflict resolution type.
   std::string conflict_resolution_;
+  bool try_mkdirs_ = true;
 
-  bool putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile);
+  bool putFile(core::ProcessSession *session,
+               std::shared_ptr<FlowFileRecord> flowFile,
+               const std::string &tmpFile,
+               const std::string &destFile);
   std::shared_ptr<logging::Logger> logger_;
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a95e1787/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index 6c8ad46..c2789d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -28,6 +28,7 @@
 #include <iostream>
 #include <memory>
 #include <set>
+#include <algorithm>
 #include <string>
 
 #include "core/logging/Logger.h"
@@ -54,6 +55,11 @@ core::Property PutFile::ConflictResolution(
     "Conflict Resolution Strategy",
     "Indicates what should happen when a file with the same name already exists in the output directory",
     CONFLICT_RESOLUTION_STRATEGY_FAIL);
+core::Property PutFile::CreateDirs(
+    "Create Missing Directories",
+    "If true, then missing destination directories will be created. "
+        "If false, flowfiles are penalized and sent to failure.",
+    "true");
 
 core::Relationship PutFile::Success(
     "success",
@@ -67,6 +73,7 @@ void PutFile::initialize() {
   std::set<core::Property> properties;
   properties.insert(Directory);
   properties.insert(ConflictResolution);
+  properties.insert(CreateDirs);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -83,6 +90,10 @@ void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
   if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) {
     logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
   }
+
+  std::string try_mkdirs_conf;
+  context->getProperty(CreateDirs.getName(), try_mkdirs_conf);
+  utils::StringUtils::StringToBool(try_mkdirs_conf, try_mkdirs_);
 }
 
 void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
@@ -158,7 +169,7 @@ bool PutFile::putFile(core::ProcessSession *session,
                       std::shared_ptr<FlowFileRecord> flowFile,
                       const std::string &tmpFile,
                       const std::string &destFile) {
-  ReadCallback cb(tmpFile, destFile);
+  ReadCallback cb(tmpFile, destFile, try_mkdirs_);
   session->read(flowFile, &cb);
 
   logger_->log_info("Committing %s", destFile);
@@ -171,31 +182,86 @@ bool PutFile::putFile(core::ProcessSession *session,
   return false;
 }
 
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
-    : _tmpFile(tmpFile),
-      _tmpFileOs(tmpFile),
-      _destFile(destFile),
+PutFile::ReadCallback::ReadCallback(const std::string &tmp_file,
+                                    const std::string &dest_file,
+                                    bool try_mkdirs)
+    : tmp_file_(tmp_file),
+      dest_file_(dest_file),
+      try_mkdirs_(try_mkdirs),
       logger_(logging::LoggerFactory<PutFile::ReadCallback>::getLogger()) {
 }
 
 // Copy the entire file contents to the temporary file
 int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
   // Copy file contents into tmp file
-  _writeSucceeded = false;
+  write_succeeded_ = false;
+  bool try_mkdirs = false;
   size_t size = 0;
   uint8_t buffer[1024];
-  do {
-    int read = stream->read(buffer, 1024);
-    if (read < 0) {
-      return -1;
+
+  // Attempt writing file. After one failure, try to create parent directories if they don't already exist.
+  // This is done so that a stat syscall of the directory is not required on multiple file writes to a good dir,
+  // which is assumed to be a very common case.
+  while (!write_succeeded_) {
+    std::ofstream tmp_file_os(tmp_file_);
+
+    // Attempt to create directories in file's path
+    std::stringstream dir_path_stream;
+
+    if (try_mkdirs) {
+      size_t i = 0;
+      auto pos = tmp_file_.find('/');
+      while (pos != std::string::npos) {
+        auto dir_path_component = tmp_file_.substr(i, pos - i);
+        dir_path_stream << dir_path_component;
+        auto dir_path = dir_path_stream.str();
+
+        if (!dir_path_component.empty()) {
+          logger_->log_info("Attempting to create directory if it does not already exist: %s", dir_path);
+          mkdir(dir_path.c_str(), 0700);
+          dir_path_stream << '/';
+        }
+
+        i = pos + 1;
+        pos = tmp_file_.find('/', pos + 1);
+      }
     }
-    if (read == 0) {
-      break;
+
+    do {
+      int read = stream->read(buffer, 1024);
+
+      if (read < 0) {
+        return -1;
+      }
+
+      if (read == 0) {
+        break;
+      }
+
+      tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+      size += read;
+    } while (size < stream->getSize());
+
+    tmp_file_os.close();
+
+    if (tmp_file_os) {
+      write_succeeded_ = true;
+    } else {
+      if (try_mkdirs) {
+        // We already tried to create dirs, so give up
+        break;
+      } else {
+        if (try_mkdirs_) {
+          // This write failed; try creating the dir on another attempt
+          try_mkdirs = true;
+        } else {
+          // We've been instructed to not attempt to create dirs, so give up
+          break;
+        }
+      }
     }
-    _tmpFileOs.write(reinterpret_cast<char *>(buffer), read);
-    size += read;
-  } while (size < stream->getSize());
-  _writeSucceeded = true;
+  }
+
   return size;
 }
 
@@ -204,20 +270,18 @@ int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
 bool PutFile::ReadCallback::commit() {
   bool success = false;
 
-  logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
-
-  if (_writeSucceeded) {
-    _tmpFileOs.close();
+  logger_->log_info("PutFile committing put file operation to %s", dest_file_.c_str());
 
-    if (rename(_tmpFile.c_str(), _destFile.c_str())) {
+  if (write_succeeded_) {
+    if (rename(tmp_file_.c_str(), dest_file_.c_str())) {
       logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed",
-                        _destFile.c_str());
+                        dest_file_.c_str());
     } else {
       success = true;
-      logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
+      logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_.c_str());
     }
   } else {
-    logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
+    logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_.c_str());
   }
 
   return success;
@@ -225,11 +289,8 @@ bool PutFile::ReadCallback::commit() {
 
 // Clean up resources
 PutFile::ReadCallback::~ReadCallback() {
-  // Close tmp file
-  _tmpFileOs.close();
-
   // Clean up tmp file, if necessary
-  unlink(_tmpFile.c_str());
+  unlink(tmp_file_.c_str());
 }
 
 } /* namespace processors */