You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/04/24 00:14:15 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-797 - Maximum
File Count property of PutFile processor is not working properly
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new eb56f1b MINIFICPP-797 - Maximum File Count property of PutFile processor is not working properly
eb56f1b is described below
commit eb56f1b62d1aaf2f37342f9148b0eb05c92d24d5
Author: Arpad Boda <ab...@hortonworks.com>
AuthorDate: Tue Apr 2 22:34:17 2019 +0200
MINIFICPP-797 - Maximum File Count property of PutFile processor is not working properly
MINIFICPP-797 - Added test to verify MaxDestFiles attribute
This closes #531.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
libminifi/src/processors/PutFile.cpp | 52 +++++++----------------------
libminifi/test/TestBase.cpp | 15 +++++----
libminifi/test/TestBase.h | 16 ++++++++-
libminifi/test/unit/PutFileTests.cpp | 64 ++++++++++++++++++++++++++++++++++++
4 files changed, 100 insertions(+), 47 deletions(-)
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index b279010..72ec850 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -138,49 +138,21 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
if (S_ISDIR(statResult.st_mode)) {
// it's a directory, count the files
int64_t ct = 0;
-#ifndef WIN32
- DIR *myDir = opendir(directory.c_str());
- if (!myDir) {
- logger_->log_warn("Could not open %s", directory);
+
+ // Callback, called for each file entry in the listed directory
+ // Return value is used to break (false) or continue (true) listing
+ auto lambda = [&ct, this](const std::string&, const std::string&) -> bool {
+ return ++ct < max_dest_files_;
+ };
+
+ utils::file::FileUtils::list_dir(directory, lambda, logger_, false);
+
+ if (ct >= max_dest_files_) {
+ logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
+ "configured max number of files", directory, max_dest_files_);
session->transfer(flowFile, Failure);
return;
}
- struct dirent* entry = nullptr;
-
- while ((entry = readdir(myDir)) != nullptr) {
- if ((strcmp(entry->d_name, ".") != 0) && (strcmp(entry->d_name, "..") != 0)) {
- ct++;
- if (ct >= max_dest_files_) {
- logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
- "configured max number of files",
- directory, max_dest_files_);
- session->transfer(flowFile, Failure);
- closedir(myDir);
- return;
- }
- }
- }
- closedir(myDir);
-#else
- HANDLE hFind;
- WIN32_FIND_DATA FindFileData;
-
- if ((hFind = FindFirstFile(directory.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
- do {
- if ((strcmp(FindFileData.cFileName, ".") != 0) && (strcmp(FindFileData.cFileName, "..") != 0)) {
- ct++;
- if (ct >= max_dest_files_) {
- logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
- "configured max number of files", directory, max_dest_files_);
- session->transfer(flowFile, Failure);
- FindClose(hFind);
- return;
- }
- }
- }while (FindNextFile(hFind, &FindFileData));
- FindClose(hFind);
- }
-#endif
}
}
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 0d2b92b..43eead4 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -32,7 +32,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
}
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
if (finalized) {
return nullptr;
}
@@ -50,20 +50,23 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
processor_mapping_[processor->getUUIDStr()] = processor;
if (!linkToPrevious) {
- termination_ = relationship;
+ termination_ = *(relationships.begin());
} else {
std::shared_ptr<core::Processor> last = processor_queue_.back();
if (last == nullptr) {
last = processor;
- termination_ = relationship;
+ termination_ = *(relationships.begin());
}
std::stringstream connection_name;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
- connection->addRelationship(relationship);
+
+ for(const auto& relationship: relationships) {
+ connection->addRelationship(relationship);
+ }
// link the connections so that we can test results at the end for this
connection->setSource(last);
@@ -93,7 +96,7 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
return processor;
}
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
if (finalized) {
return nullptr;
}
@@ -113,7 +116,7 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
processor->setName(name);
- return addProcessor(processor, name, relationship, linkToPrevious);
+ return addProcessor(processor, name, relationships, linkToPrevious);
}
bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 01a3222..04aa3b4 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -172,9 +172,19 @@ class TestPlan {
const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration);
std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
- core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false);
+ core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false) {
+ return addProcessor(processor, name, {relationship}, linkToPrevious);
+ }
std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+ bool linkToPrevious = false) {
+ return addProcessor(processor_name, name, {relationship}, linkToPrevious);
+ }
+
+ std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+ const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious = false);
+
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
bool linkToPrevious = false);
bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
@@ -273,6 +283,10 @@ class TestController {
}
}
+ const std::shared_ptr<logging::Logger>& getLogger() const {
+ return log.logger_;
+ }
+
~TestController() {
for (auto dir : directories) {
DIR *created_dir;
diff --git a/libminifi/test/unit/PutFileTests.cpp b/libminifi/test/unit/PutFileTests.cpp
index 13ef509..467ce14 100644
--- a/libminifi/test/unit/PutFileTests.cpp
+++ b/libminifi/test/unit/PutFileTests.cpp
@@ -317,3 +317,67 @@ TEST_CASE("Test generation of temporary write path", "[putfileTmpWritePath]") {
REQUIRE(processor->tmpWritePath("a/b/c", "").substr(1, strlen("a/b/.c")) == "a/b/.c");
}
+TEST_CASE("PutFileMaxFileCountTest", "[getfileputpfilemaxcount]") {
+ TestController testController;
+
+ LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+ LogTestController::getInstance().setDebug<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate");
+
+ std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+ plan->addProcessor("LogAttribute", "logattribute", { core::Relationship("success", "d"), core::Relationship("failure", "d") }, true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ const char *dir = testController.createTempDirectory(format);
+ char format2[] = "/tmp/ft.XXXXXX";
+ const char *putfiledir = testController.createTempDirectory(format2);
+ plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+ plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
+ plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
+ plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::MaxDestFiles.getName(), "1");
+
+
+
+ for (int i = 0; i < 2; ++i) {
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile" << i << ".ext";
+ std::fstream file;
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ }
+
+ plan->reset();
+
+ testController.runSession(plan);
+
+ plan->reset();
+
+ testController.runSession(plan);
+
+
+ REQUIRE(LogTestController::getInstance().contains("key:absolute.path value:" + std::string(dir) + "/tstFile0.ext"));
+ REQUIRE(LogTestController::getInstance().contains("Size:8 Offset:0"));
+ REQUIRE(LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
+
+ // Only 1 of the 2 files should make it to the target dir
+ // Non-deterministic, so let's just count them
+ int files_in_dir = 0;
+ auto lambda = [&files_in_dir](const std::string&, const std::string&) -> bool {
+ return ++files_in_dir < 2;
+ };
+
+ utils::file::FileUtils::list_dir(putfiledir, lambda, testController.getLogger(), false);
+
+ REQUIRE(files_in_dir == 1);
+
+ REQUIRE(LogTestController::getInstance().contains("which exceeds the configured max number of files"));
+
+ LogTestController::getInstance().reset();
+}