You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2017/11/06 16:09:42 UTC
nifi-minifi-cpp git commit: MINIFICPP-289 Support PutFile property: Create Missing Directories
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master e4ec7337d -> a95e17873
MINIFICPP-289 Support PutFile property: Create Missing Directories
This closes #176.
Signed-off-by: Marc Parisi <ph...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a95e1787
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a95e1787
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a95e1787
Branch: refs/heads/master
Commit: a95e1787366f353b8efaed3bcb9ab09ab3e904f4
Parents: e4ec733
Author: Andrew I. Christianson <an...@andyic.org>
Authored: Fri Nov 3 20:51:17 2017 -0400
Committer: Marc Parisi <ph...@apache.org>
Committed: Mon Nov 6 10:45:49 2017 -0500
----------------------------------------------------------------------
libminifi/include/processors/PutFile.h | 31 +++++---
libminifi/src/processors/PutFile.cpp | 117 +++++++++++++++++++++-------
2 files changed, 108 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a95e1787/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index c6b0a18..9872eb2 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -38,11 +38,11 @@ namespace processors {
class PutFile : public core::Processor {
public:
- static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace";
- static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore";
- static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail";
+ static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace";
+ static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore";
+ static constexpr char const *CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail";
- static constexpr char const* ProcessorName = "PutFile";
+ static constexpr char const *ProcessorName = "PutFile";
// Constructor
/*!
@@ -59,6 +59,7 @@ class PutFile : public core::Processor {
// Supported Properties
static core::Property Directory;
static core::Property ConflictResolution;
+ static core::Property CreateDirs;
// Supported Relationships
static core::Relationship Success;
static core::Relationship Failure;
@@ -78,15 +79,19 @@ class PutFile : public core::Processor {
class ReadCallback : public InputStreamCallback {
public:
- ReadCallback(const std::string &tmpFile, const std::string &destFile);
+ ReadCallback(const std::string &tmp_file,
+ const std::string &dest_file,
+ bool try_mkdirs);
~ReadCallback();
- virtual int64_t process(std::shared_ptr<io::BaseStream> stream);bool commit();
+ virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+ bool commit();
private:
std::shared_ptr<logging::Logger> logger_;
- std::ofstream _tmpFileOs;bool _writeSucceeded = false;
- std::string _tmpFile;
- std::string _destFile;
+ bool write_succeeded_ = false;
+ std::string tmp_file_;
+ std::string dest_file_;
+ bool try_mkdirs_;
};
/**
@@ -101,12 +106,14 @@ class PutFile : public core::Processor {
private:
- // directory
std::string directory_;
- // conflict resolution type.
std::string conflict_resolution_;
+ bool try_mkdirs_ = true;
- bool putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile);
+ bool putFile(core::ProcessSession *session,
+ std::shared_ptr<FlowFileRecord> flowFile,
+ const std::string &tmpFile,
+ const std::string &destFile);
std::shared_ptr<logging::Logger> logger_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a95e1787/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index 6c8ad46..c2789d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -28,6 +28,7 @@
#include <iostream>
#include <memory>
#include <set>
+#include <algorithm>
#include <string>
#include "core/logging/Logger.h"
@@ -54,6 +55,11 @@ core::Property PutFile::ConflictResolution(
"Conflict Resolution Strategy",
"Indicates what should happen when a file with the same name already exists in the output directory",
CONFLICT_RESOLUTION_STRATEGY_FAIL);
+core::Property PutFile::CreateDirs(
+ "Create Missing Directories",
+ "If true, then missing destination directories will be created. "
+ "If false, flowfiles are penalized and sent to failure.",
+ "true");
core::Relationship PutFile::Success(
"success",
@@ -67,6 +73,7 @@ void PutFile::initialize() {
std::set<core::Property> properties;
properties.insert(Directory);
properties.insert(ConflictResolution);
+ properties.insert(CreateDirs);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -83,6 +90,10 @@ void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) {
logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
}
+
+ std::string try_mkdirs_conf;
+ context->getProperty(CreateDirs.getName(), try_mkdirs_conf);
+ utils::StringUtils::StringToBool(try_mkdirs_conf, try_mkdirs_);
}
void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
@@ -158,7 +169,7 @@ bool PutFile::putFile(core::ProcessSession *session,
std::shared_ptr<FlowFileRecord> flowFile,
const std::string &tmpFile,
const std::string &destFile) {
- ReadCallback cb(tmpFile, destFile);
+ ReadCallback cb(tmpFile, destFile, try_mkdirs_);
session->read(flowFile, &cb);
logger_->log_info("Committing %s", destFile);
@@ -171,31 +182,86 @@ bool PutFile::putFile(core::ProcessSession *session,
return false;
}
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
- : _tmpFile(tmpFile),
- _tmpFileOs(tmpFile),
- _destFile(destFile),
+PutFile::ReadCallback::ReadCallback(const std::string &tmp_file,
+ const std::string &dest_file,
+ bool try_mkdirs)
+ : tmp_file_(tmp_file),
+ dest_file_(dest_file),
+ try_mkdirs_(try_mkdirs),
logger_(logging::LoggerFactory<PutFile::ReadCallback>::getLogger()) {
}
// Copy the entire file contents to the temporary file
int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
// Copy file contents into tmp file
- _writeSucceeded = false;
+ write_succeeded_ = false;
+ bool try_mkdirs = false;
size_t size = 0;
uint8_t buffer[1024];
- do {
- int read = stream->read(buffer, 1024);
- if (read < 0) {
- return -1;
+
+ // Attempt writing file. After one failure, try to create parent directories if they don't already exist.
+ // This is done so that a stat syscall of the directory is not required on multiple file writes to a good dir,
+ // which is assumed to be a very common case.
+ while (!write_succeeded_) {
+ std::ofstream tmp_file_os(tmp_file_);
+
+ // Attempt to create directories in file's path
+ std::stringstream dir_path_stream;
+
+ if (try_mkdirs) {
+ size_t i = 0;
+ auto pos = tmp_file_.find('/');
+ while (pos != std::string::npos) {
+ auto dir_path_component = tmp_file_.substr(i, pos - i);
+ dir_path_stream << dir_path_component;
+ auto dir_path = dir_path_stream.str();
+
+ if (!dir_path_component.empty()) {
+ logger_->log_info("Attempting to create directory if it does not already exist: %s", dir_path);
+ mkdir(dir_path.c_str(), 0700);
+ dir_path_stream << '/';
+ }
+
+ i = pos + 1;
+ pos = tmp_file_.find('/', pos + 1);
+ }
}
- if (read == 0) {
- break;
+
+ do {
+ int read = stream->read(buffer, 1024);
+
+ if (read < 0) {
+ return -1;
+ }
+
+ if (read == 0) {
+ break;
+ }
+
+ tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+ size += read;
+ } while (size < stream->getSize());
+
+ tmp_file_os.close();
+
+ if (tmp_file_os) {
+ write_succeeded_ = true;
+ } else {
+ if (try_mkdirs) {
+ // We already tried to create dirs, so give up
+ break;
+ } else {
+ if (try_mkdirs_) {
+ // This write failed; try creating the dir on another attempt
+ try_mkdirs = true;
+ } else {
+ // We've been instructed to not attempt to create dirs, so give up
+ break;
+ }
+ }
}
- _tmpFileOs.write(reinterpret_cast<char *>(buffer), read);
- size += read;
- } while (size < stream->getSize());
- _writeSucceeded = true;
+ }
+
return size;
}
@@ -204,20 +270,18 @@ int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
bool PutFile::ReadCallback::commit() {
bool success = false;
- logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
-
- if (_writeSucceeded) {
- _tmpFileOs.close();
+ logger_->log_info("PutFile committing put file operation to %s", dest_file_.c_str());
- if (rename(_tmpFile.c_str(), _destFile.c_str())) {
+ if (write_succeeded_) {
+ if (rename(tmp_file_.c_str(), dest_file_.c_str())) {
logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed",
- _destFile.c_str());
+ dest_file_.c_str());
} else {
success = true;
- logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
+ logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_.c_str());
}
} else {
- logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
+ logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_.c_str());
}
return success;
@@ -225,11 +289,8 @@ bool PutFile::ReadCallback::commit() {
// Clean up resources
PutFile::ReadCallback::~ReadCallback() {
- // Close tmp file
- _tmpFileOs.close();
-
// Clean up tmp file, if necessary
- unlink(_tmpFile.c_str());
+ unlink(tmp_file_.c_str());
}
} /* namespace processors */