You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:03 UTC

[nifi-minifi-cpp] 04/04: MINIFICPP-1823 Fix absolute.path output attribute in ListFile

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4a98b0f620d4d08adfcfc4b59f1122e205ffc7cc
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu May 5 14:55:33 2022 +0200

    MINIFICPP-1823 Fix absolute.path output attribute in ListFile
    
    Closes #1326
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../features/file_system_operations.feature        | 15 +++++++-
 .../integration/minifi/processors/FetchFile.py     | 25 ++++++++++++
 .../test/integration/minifi/processors/ListFile.py | 25 ++++++++++++
 .../standard-processors/processors/ListFile.cpp    | 45 +++++++++++-----------
 .../standard-processors/processors/ListFile.h      |  1 +
 .../tests/unit/ListFileTests.cpp                   | 15 ++++++--
 6 files changed, 98 insertions(+), 28 deletions(-)

diff --git a/docker/test/integration/features/file_system_operations.feature b/docker/test/integration/features/file_system_operations.feature
index f01922b40..5b157284a 100644
--- a/docker/test/integration/features/file_system_operations.feature
+++ b/docker/test/integration/features/file_system_operations.feature
@@ -1,7 +1,7 @@
-Feature: File system operations are handled by the GetFile and PutFile processors
+Feature: File system operations are handled by the GetFile, PutFile, ListFile and FetchFile processors
   In order to store and access data on the local file system
   As a user of MiNiFi
-  I need to have GetFile and PutFile processors
+  I need to have GetFile, PutFile, ListFile and FetchFile processors
 
   Background:
     Given the content of "/tmp/output" is monitored
@@ -38,3 +38,14 @@ Feature: File system operations are handled by the GetFile and PutFile processor
     And a file with the content "test" is present in "/tmp/input"
     When the MiNiFi instance starts up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
+
+  Scenario: List and fetch files from a directory in a simple flow
+    Given a file with filename "test_file.log" and content "Test message" is present in "/tmp/input"
+    And a file with filename "test_file2.log" and content "Another test message" is present in "/tmp/input"
+    And a ListFile processor with the "Input Directory" property set to "/tmp/input"
+    And a FetchFile processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListFile processor is connected to the FetchFile
+    And the "success" relationship of the FetchFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then two flowfiles with the contents "Test message" and "Another test message" are placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/processors/FetchFile.py b/docker/test/integration/minifi/processors/FetchFile.py
new file mode 100644
index 000000000..24abf6db3
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class FetchFile(Processor):
+    # Integration-test wrapper that configures a 'FetchFile' processor on top of the shared Processor base.
+    def __init__(self):
+        event_driven = {"scheduling strategy": "EVENT_DRIVEN"}  # scheduling settings handed to the base class
+        terminated = ['success']  # relationships passed as auto_terminate
+        super(FetchFile, self).__init__('FetchFile', schedule=event_driven, auto_terminate=terminated)
diff --git a/docker/test/integration/minifi/processors/ListFile.py b/docker/test/integration/minifi/processors/ListFile.py
new file mode 100644
index 000000000..5e0d9e6c6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class ListFile(Processor):
+    # Integration-test wrapper for a 'ListFile' processor; schedule defaults to a 2 sec scheduling period.
+    def __init__(self, schedule=None):
+        # Build the default per call: a dict literal in the signature is a single object shared by all instances.
+        schedule = {'scheduling period': '2 sec'} if schedule is None else schedule  # explicit None -> default too
+        super(ListFile, self).__init__('ListFile', schedule=schedule, auto_terminate=['success'])
diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp
index 6387976d3..555e79822 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -142,39 +142,39 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
 }
 
 bool ListFile::fileMatchesFilters(const ListedFile& listed_file) {
-  if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.absolute_path)) {
-    logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.absolute_path);
+  if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.full_file_path)) {
+    logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (file_filter_ && !std::regex_match(listed_file.filename, *file_filter_)) {
-    logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (path_filter_ && listed_file.relative_path != "." && !std::regex_match(listed_file.relative_path, *path_filter_)) {
-    logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.absolute_path);
+    logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.full_file_path);
     return false;
   }
 
   auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified());
   if (minimum_file_age_ && file_age < *minimum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (maximum_file_age_ && file_age > *maximum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) {
-    logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) {
-    logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
@@ -188,32 +188,32 @@ std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& s
   session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, listed_file.relative_path == "." ?
     std::string(".") + utils::file::FileUtils::get_separator() : listed_file.relative_path + utils::file::FileUtils::get_separator());
   session.putAttribute(flow_file, "file.size", std::to_string(listed_file.file_size));
-  if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.absolute_path, "%Y-%m-%dT%H:%M:%SZ")) {
+  if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.full_file_path, "%Y-%m-%dT%H:%M:%SZ")) {
     session.putAttribute(flow_file, "file.lastModifiedTime", *last_modified_str);
   } else {
     session.putAttribute(flow_file, "file.lastModifiedTime", "");
-    logger_->log_warn("Could not get last modification time of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path);
   }
 
-  if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.absolute_path)) {
+  if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.permissions", *permission_string);
   } else {
-    logger_->log_warn("Failed to get permissions of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get permissions of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.permissions", "");
   }
 
-  if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.absolute_path)) {
+  if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.owner", *owner);
   } else {
-    logger_->log_warn("Failed to get owner of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get owner of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.owner", "");
   }
 
 #ifndef WIN32
-  if (auto group = utils::file::FileUtils::get_file_group(listed_file.absolute_path)) {
+  if (auto group = utils::file::FileUtils::get_file_group(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.group", *group);
   } else {
-    logger_->log_warn("Failed to get group of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get group of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.group", "");
   }
 #else
@@ -234,18 +234,19 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
   auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, logger_, recurse_subdirectories_);
   for (const auto& [path, filename] : file_list) {
     ListedFile listed_file;
-    listed_file.absolute_path = (std::filesystem::path(path) / filename).string();
+    listed_file.full_file_path = (std::filesystem::path(path) / filename).string();
+    listed_file.absolute_path = path + utils::file::FileUtils::get_separator();
     if (auto relative_path = utils::file::FileUtils::get_relative_path(path, input_directory_)) {
       listed_file.relative_path = *relative_path;
     } else {
-      logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.absolute_path, input_directory_);
+      logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.full_file_path, input_directory_);
     }
-    listed_file.file_size = utils::file::FileUtils::file_size(listed_file.absolute_path);
+    listed_file.file_size = utils::file::FileUtils::file_size(listed_file.full_file_path);
     listed_file.filename = filename;
-    if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.absolute_path)) {
+    if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.full_file_path)) {
       listed_file.last_modified_time = *last_modified_time;
     } else {
-      logger_->log_error("Could not get last modification time of file '%s'", listed_file.absolute_path);
+      logger_->log_error("Could not get last modification time of file '%s'", listed_file.full_file_path);
       continue;
     }
 
@@ -254,7 +255,7 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
     }
 
     if (stored_listing_state.wasObjectListedAlready(listed_file)) {
-      logger_->log_debug("File '%s' was already listed.", listed_file.absolute_path);
+      logger_->log_debug("File '%s' was already listed.", listed_file.full_file_path);
       continue;
     }
 
diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h
index 83b3d8606..4223126c6 100644
--- a/extensions/standard-processors/processors/ListFile.h
+++ b/extensions/standard-processors/processors/ListFile.h
@@ -71,6 +71,7 @@ class ListFile : public core::Processor {
     std::string absolute_path;
     std::filesystem::file_time_type last_modified_time;
     std::string relative_path;
+    std::string full_file_path;
     uint64_t file_size = 0;
   };
 
diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp
index 06314f53b..cfef64bbf 100644
--- a/extensions/standard-processors/tests/unit/ListFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp
@@ -26,6 +26,7 @@
 #include "processors/ListFile.h"
 #include "utils/TestUtils.h"
 #include "utils/IntegrationTestUtils.h"
+#include "utils/file/PathUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -100,10 +101,16 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files only once with default
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt"));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + empty_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + standard_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + first_sub_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + second_sub_file_abs_path_));
+  std::string file_path;
+  std::string file_name;
+  utils::file::getFileNameAndPath(empty_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(standard_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(first_sub_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(second_sub_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
   REQUIRE(LogTestController::getInstance().countOccurrences(std::string("key:path value:.") + utils::file::FileUtils::get_separator() + "\n") == 2);
   REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:first_subdir") + utils::file::FileUtils::get_separator()));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:second_subdir") + utils::file::FileUtils::get_separator()));