You are viewing a plain text version of this content. The canonical link for it was given as a hyperlink ("here") on the original page and is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/04/24 00:14:15 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-797 - Maximum File Count property of PutFile processor is not working properly

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new eb56f1b  MINIFICPP-797 - Maximum File Count property of PutFile processor is not working properly
eb56f1b is described below

commit eb56f1b62d1aaf2f37342f9148b0eb05c92d24d5
Author: Arpad Boda <ab...@hortonworks.com>
AuthorDate: Tue Apr 2 22:34:17 2019 +0200

    MINIFICPP-797 - Maximum File Count property of PutFile processor is not working properly
    
    MINIFICPP-797 - Added test to verify MaxDestFiles attribute
    
    This closes #531.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 libminifi/src/processors/PutFile.cpp | 52 +++++++----------------------
 libminifi/test/TestBase.cpp          | 15 +++++----
 libminifi/test/TestBase.h            | 16 ++++++++-
 libminifi/test/unit/PutFileTests.cpp | 64 ++++++++++++++++++++++++++++++++++++
 4 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index b279010..72ec850 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -138,49 +138,21 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
     if (S_ISDIR(statResult.st_mode)) {
       // it's a directory, count the files
       int64_t ct = 0;
-#ifndef WIN32
-      DIR *myDir = opendir(directory.c_str());
-      if (!myDir) {
-        logger_->log_warn("Could not open %s", directory);
+
+      // Callback, called for each file entry in the listed directory
+      // Return value is used to break (false) or continue (true) listing
+      auto lambda = [&ct, this](const std::string&, const std::string&) -> bool {
+        return ++ct < max_dest_files_;
+      };
+
+      utils::file::FileUtils::list_dir(directory, lambda, logger_, false);
+
+      if (ct >= max_dest_files_) {
+        logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
+                          "configured max number of files", directory, max_dest_files_);
         session->transfer(flowFile, Failure);
         return;
       }
-      struct dirent* entry = nullptr;
-
-      while ((entry = readdir(myDir)) != nullptr) {
-        if ((strcmp(entry->d_name, ".") != 0) && (strcmp(entry->d_name, "..") != 0)) {
-          ct++;
-          if (ct >= max_dest_files_) {
-            logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
-                              "configured max number of files",
-                              directory, max_dest_files_);
-            session->transfer(flowFile, Failure);
-            closedir(myDir);
-            return;
-          }
-        }
-      }
-      closedir(myDir);
-#else
-      HANDLE hFind;
-      WIN32_FIND_DATA FindFileData;
-
-      if ((hFind = FindFirstFile(directory.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) {
-        do {
-          if ((strcmp(FindFileData.cFileName, ".") != 0) && (strcmp(FindFileData.cFileName, "..") != 0)) {
-            ct++;
-            if (ct >= max_dest_files_) {
-              logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
-                  "configured max number of files", directory, max_dest_files_);
-              session->transfer(flowFile, Failure);
-              FindClose(hFind);
-              return;
-            }
-          }
-        }while (FindNextFile(hFind, &FindFileData));
-        FindClose(hFind);
-      }
-#endif
     }
   }
 
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 0d2b92b..43eead4 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -32,7 +32,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
   stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
 }
 
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -50,20 +50,23 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
   processor_mapping_[processor->getUUIDStr()] = processor;
 
   if (!linkToPrevious) {
-    termination_ = relationship;
+    termination_ = *(relationships.begin());
   } else {
     std::shared_ptr<core::Processor> last = processor_queue_.back();
 
     if (last == nullptr) {
       last = processor;
-      termination_ = relationship;
+      termination_ = *(relationships.begin());
     }
 
     std::stringstream connection_name;
     connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
     logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
     std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
-    connection->addRelationship(relationship);
+
+    for(const auto& relationship: relationships) {
+      connection->addRelationship(relationship);
+    }
 
     // link the connections so that we can test results at the end for this
     connection->setSource(last);
@@ -93,7 +96,7 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
   return processor;
 }
 
-std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
   }
@@ -113,7 +116,7 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
 
   processor->setName(name);
 
-  return addProcessor(processor, name, relationship, linkToPrevious);
+  return addProcessor(processor, name, relationships, linkToPrevious);
 }
 
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 01a3222..04aa3b4 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -172,9 +172,19 @@ class TestPlan {
                     const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration);
 
   std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
-                                                core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false);
+                                                core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false) {
+    return addProcessor(processor, name, {relationship}, linkToPrevious);
+  }
 
   std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
+                                                bool linkToPrevious = false) {
+    return addProcessor(processor_name, name, {relationship}, linkToPrevious);
+  }
+
+  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
+                                                const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious = false);
+
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
                                                 bool linkToPrevious = false);
 
   bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
@@ -273,6 +283,10 @@ class TestController {
     }
   }
 
+  const std::shared_ptr<logging::Logger>& getLogger() const {  // returns the logger held by this controller's 'log' member
+    return log.logger_;
+  }
+
   ~TestController() {
     for (auto dir : directories) {
       DIR *created_dir;
diff --git a/libminifi/test/unit/PutFileTests.cpp b/libminifi/test/unit/PutFileTests.cpp
index 13ef509..467ce14 100644
--- a/libminifi/test/unit/PutFileTests.cpp
+++ b/libminifi/test/unit/PutFileTests.cpp
@@ -317,3 +317,67 @@ TEST_CASE("Test generation of temporary write path", "[putfileTmpWritePath]") {
   REQUIRE(processor->tmpWritePath("a/b/c", "").substr(1, strlen("a/b/.c")) == "a/b/.c");
 }
 
+TEST_CASE("PutFileMaxFileCountTest", "[getfileputpfilemaxcount]") {
+  TestController testController;
+  // Debug logging is enabled so the log-content assertions at the end can see processor output
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  // Flow: GetFile -> PutFile -> LogAttribute; LogAttribute receives both PutFile's success and failure output
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
+
+  plan->addProcessor("LogAttribute", "logattribute", { core::Relationship("success", "d"), core::Relationship("failure", "d") }, true);
+  // GetFile reads from 'dir'; PutFile writes to 'putfiledir' and is limited to 1 destination file via MaxDestFiles
+  char format[] = "/tmp/gt.XXXXXX";
+  const char *dir = testController.createTempDirectory(format);
+  char format2[] = "/tmp/ft.XXXXXX";
+  const char *putfiledir = testController.createTempDirectory(format2);
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
+  plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
+  plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::MaxDestFiles.getName(), "1");
+
+
+  // Create two input files: one more than PutFile's MaxDestFiles allows in the destination
+  for (int i = 0; i < 2; ++i) {
+    std::stringstream ss;
+    ss << dir << "/" << "tstFile" << i << ".ext";
+    std::fstream file;
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+  // Run the flow twice (GetFile BatchSize is 1), resetting the plan before each run
+  plan->reset();
+
+  testController.runSession(plan);
+
+  plan->reset();
+
+  testController.runSession(plan);
+
+  // LogAttribute is linked to both success and failure, so file attributes are expected in the log
+  REQUIRE(LogTestController::getInstance().contains("key:absolute.path value:" + std::string(dir) + "/tstFile0.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Size:8 Offset:0"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
+
+  // With MaxDestFiles=1, only 1 of the 2 files should make it to the target dir.
+  // Which file wins is non-deterministic, so just count them (the callback stops listing at 2).
+  int files_in_dir = 0;
+  auto lambda = [&files_in_dir](const std::string&, const std::string&) -> bool {
+    return ++files_in_dir < 2;
+  };
+
+  utils::file::FileUtils::list_dir(putfiledir, lambda, testController.getLogger(), false);
+
+  REQUIRE(files_in_dir == 1);
+  // PutFile emits this warning when it routes a flow file to failure because of the file-count limit
+  REQUIRE(LogTestController::getInstance().contains("which exceeds the configured max number of files"));
+
+  LogTestController::getInstance().reset();
+}