You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:21:03 UTC
[nifi-minifi-cpp] 04/04: MINIFICPP-1823 Fix absolute.path output attribute in ListFile
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4a98b0f620d4d08adfcfc4b59f1122e205ffc7cc
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu May 5 14:55:33 2022 +0200
MINIFICPP-1823 Fix absolute.path output attribute in ListFile
Closes #1326
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../features/file_system_operations.feature | 15 +++++++-
.../integration/minifi/processors/FetchFile.py | 25 ++++++++++++
.../test/integration/minifi/processors/ListFile.py | 25 ++++++++++++
.../standard-processors/processors/ListFile.cpp | 45 +++++++++++-----------
.../standard-processors/processors/ListFile.h | 1 +
.../tests/unit/ListFileTests.cpp | 15 ++++++--
6 files changed, 98 insertions(+), 28 deletions(-)
diff --git a/docker/test/integration/features/file_system_operations.feature b/docker/test/integration/features/file_system_operations.feature
index f01922b40..5b157284a 100644
--- a/docker/test/integration/features/file_system_operations.feature
+++ b/docker/test/integration/features/file_system_operations.feature
@@ -1,7 +1,7 @@
-Feature: File system operations are handled by the GetFile and PutFile processors
+Feature: File system operations are handled by the GetFile, PutFile, ListFile and FetchFile processors
In order to store and access data on the local file system
As a user of MiNiFi
- I need to have GetFile and PutFile processors
+ I need to have GetFile, PutFile, ListFile and FetchFile processors
Background:
Given the content of "/tmp/output" is monitored
@@ -38,3 +38,14 @@ Feature: File system operations are handled by the GetFile and PutFile processor
And a file with the content "test" is present in "/tmp/input"
When the MiNiFi instance starts up
Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
+
+ Scenario: List and fetch files from a directory in a simple flow
+ Given a file with filename "test_file.log" and content "Test message" is present in "/tmp/input"
+ And a file with filename "test_file2.log" and content "Another test message" is present in "/tmp/input"
+ And a ListFile processor with the "Input Directory" property set to "/tmp/input"
+ And a FetchFile processor
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the ListFile processor is connected to the FetchFile
+ And the "success" relationship of the FetchFile processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then two flowfiles with the contents "Test message" and "Another test message" are placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/processors/FetchFile.py b/docker/test/integration/minifi/processors/FetchFile.py
new file mode 100644
index 000000000..24abf6db3
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class FetchFile(Processor):  # Integration-test description of a processor named 'FetchFile'.
+ def __init__(self):  # Takes no options; every setting is fixed in the call below.
+ super(FetchFile, self).__init__(
+ 'FetchFile',  # name handed to the base Processor
+ schedule={"scheduling strategy": "EVENT_DRIVEN"},  # schedule handed to the base Processor
+ auto_terminate=['success'])  # 'success' is passed as the only auto_terminate entry
diff --git a/docker/test/integration/minifi/processors/ListFile.py b/docker/test/integration/minifi/processors/ListFile.py
new file mode 100644
index 000000000..5e0d9e6c6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class ListFile(Processor):  # Integration-test description of a processor named 'ListFile'.
+ def __init__(self, schedule={'scheduling period': '2 sec'}):  # NOTE(review): dict default is created once and shared between calls; safe only if Processor.__init__ never mutates it - confirm.
+ super(ListFile, self).__init__(
+ 'ListFile',  # name handed to the base Processor
+ schedule=schedule,  # caller-supplied schedule, or the '2 sec' scheduling period default
+ auto_terminate=['success'])  # 'success' is passed as the only auto_terminate entry
diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp
index 6387976d3..555e79822 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -142,39 +142,39 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
}
bool ListFile::fileMatchesFilters(const ListedFile& listed_file) {
- if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.absolute_path)) {
- logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.absolute_path);
+ if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.full_file_path)) {
+ logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.full_file_path);
return false;
}
if (file_filter_ && !std::regex_match(listed_file.filename, *file_filter_)) {
- logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.absolute_path);
+ logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path);
return false;
}
if (path_filter_ && listed_file.relative_path != "." && !std::regex_match(listed_file.relative_path, *path_filter_)) {
- logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.absolute_path);
+ logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.full_file_path);
return false;
}
auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified());
if (minimum_file_age_ && file_age < *minimum_file_age_) {
- logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.absolute_path);
+ logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path);
return false;
}
if (maximum_file_age_ && file_age > *maximum_file_age_) {
- logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.absolute_path);
+ logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path);
return false;
}
if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) {
- logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.absolute_path);
+ logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path);
return false;
}
if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) {
- logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.absolute_path);
+ logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.full_file_path);
return false;
}
@@ -188,32 +188,32 @@ std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& s
session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, listed_file.relative_path == "." ?
std::string(".") + utils::file::FileUtils::get_separator() : listed_file.relative_path + utils::file::FileUtils::get_separator());
session.putAttribute(flow_file, "file.size", std::to_string(listed_file.file_size));
- if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.absolute_path, "%Y-%m-%dT%H:%M:%SZ")) {
+ if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.full_file_path, "%Y-%m-%dT%H:%M:%SZ")) {
session.putAttribute(flow_file, "file.lastModifiedTime", *last_modified_str);
} else {
session.putAttribute(flow_file, "file.lastModifiedTime", "");
- logger_->log_warn("Could not get last modification time of file '%s'", listed_file.absolute_path);
+ logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path);
}
- if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.absolute_path)) {
+ if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) {
session.putAttribute(flow_file, "file.permissions", *permission_string);
} else {
- logger_->log_warn("Failed to get permissions of file '%s'", listed_file.absolute_path);
+ logger_->log_warn("Failed to get permissions of file '%s'", listed_file.full_file_path);
session.putAttribute(flow_file, "file.permissions", "");
}
- if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.absolute_path)) {
+ if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.full_file_path)) {
session.putAttribute(flow_file, "file.owner", *owner);
} else {
- logger_->log_warn("Failed to get owner of file '%s'", listed_file.absolute_path);
+ logger_->log_warn("Failed to get owner of file '%s'", listed_file.full_file_path);
session.putAttribute(flow_file, "file.owner", "");
}
#ifndef WIN32
- if (auto group = utils::file::FileUtils::get_file_group(listed_file.absolute_path)) {
+ if (auto group = utils::file::FileUtils::get_file_group(listed_file.full_file_path)) {
session.putAttribute(flow_file, "file.group", *group);
} else {
- logger_->log_warn("Failed to get group of file '%s'", listed_file.absolute_path);
+ logger_->log_warn("Failed to get group of file '%s'", listed_file.full_file_path);
session.putAttribute(flow_file, "file.group", "");
}
#else
@@ -234,18 +234,19 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, logger_, recurse_subdirectories_);
for (const auto& [path, filename] : file_list) {
ListedFile listed_file;
- listed_file.absolute_path = (std::filesystem::path(path) / filename).string();
+ listed_file.full_file_path = (std::filesystem::path(path) / filename).string();
+ listed_file.absolute_path = path + utils::file::FileUtils::get_separator();
if (auto relative_path = utils::file::FileUtils::get_relative_path(path, input_directory_)) {
listed_file.relative_path = *relative_path;
} else {
- logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.absolute_path, input_directory_);
+ logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.full_file_path, input_directory_);
}
- listed_file.file_size = utils::file::FileUtils::file_size(listed_file.absolute_path);
+ listed_file.file_size = utils::file::FileUtils::file_size(listed_file.full_file_path);
listed_file.filename = filename;
- if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.absolute_path)) {
+ if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.full_file_path)) {
listed_file.last_modified_time = *last_modified_time;
} else {
- logger_->log_error("Could not get last modification time of file '%s'", listed_file.absolute_path);
+ logger_->log_error("Could not get last modification time of file '%s'", listed_file.full_file_path);
continue;
}
@@ -254,7 +255,7 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
}
if (stored_listing_state.wasObjectListedAlready(listed_file)) {
- logger_->log_debug("File '%s' was already listed.", listed_file.absolute_path);
+ logger_->log_debug("File '%s' was already listed.", listed_file.full_file_path);
continue;
}
diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h
index 83b3d8606..4223126c6 100644
--- a/extensions/standard-processors/processors/ListFile.h
+++ b/extensions/standard-processors/processors/ListFile.h
@@ -71,6 +71,7 @@ class ListFile : public core::Processor {
std::string absolute_path;
std::filesystem::file_time_type last_modified_time;
std::string relative_path;
+ std::string full_file_path;
uint64_t file_size = 0;
};
diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp
index 06314f53b..cfef64bbf 100644
--- a/extensions/standard-processors/tests/unit/ListFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp
@@ -26,6 +26,7 @@
#include "processors/ListFile.h"
#include "utils/TestUtils.h"
#include "utils/IntegrationTestUtils.h"
+#include "utils/file/PathUtils.h"
using namespace std::literals::chrono_literals;
@@ -100,10 +101,16 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files only once with default
REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt"));
REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt"));
REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt"));
- REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + empty_file_abs_path_));
- REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + standard_file_abs_path_));
- REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + first_sub_file_abs_path_));
- REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + second_sub_file_abs_path_));
+ std::string file_path;
+ std::string file_name;
+ utils::file::getFileNameAndPath(empty_file_abs_path_, file_path, file_name);
+ REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+ utils::file::getFileNameAndPath(standard_file_abs_path_, file_path, file_name);
+ REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+ utils::file::getFileNameAndPath(first_sub_file_abs_path_, file_path, file_name);
+ REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+ utils::file::getFileNameAndPath(second_sub_file_abs_path_, file_path, file_name);
+ REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
REQUIRE(LogTestController::getInstance().countOccurrences(std::string("key:path value:.") + utils::file::FileUtils::get_separator() + "\n") == 2);
REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:first_subdir") + utils::file::FileUtils::get_separator()));
REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:second_subdir") + utils::file::FileUtils::get_separator()));