Posted to commits@nifi.apache.org by sz...@apache.org on 2021/11/03 10:55:04 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1651 Add DefragmentText processor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2bc1389 MINIFICPP-1651 Add DefragmentText processor
2bc1389 is described below
commit 2bc13890e5e8d7219ae0d3655efaafbf9fb8ccfe
Author: Martin Zink <ma...@protonmail.com>
AuthorDate: Wed Nov 3 11:46:14 2021 +0100
MINIFICPP-1651 Add DefragmentText processor
Closes #1188
Signed-off-by: Marton Szasz <sz...@apache.org>
---
PROCESSORS.md | 24 ++
README.md | 2 +-
cmake/BuildTests.cmake | 8 +-
.../features/defragtextflowfiles.feature | 30 ++
.../minifi/processors/DefragmentText.py | 9 +
extensions/libarchive/BinFiles.cpp | 17 --
extensions/libarchive/BinFiles.h | 17 +-
.../processors/DefragmentText.cpp | 335 +++++++++++++++++++++
.../processors/DefragmentText.h | 108 +++++++
.../standard-processors/processors/TailFile.cpp | 8 +-
.../processors/TextFragmentUtils.h | 31 ++
.../processors/UpdateAttribute.cpp | 4 +-
.../processors/UpdateAttribute.h | 4 +-
.../tests/unit/DefragmentTextTests.cpp | 248 +++++++++++++++
libminifi/include/core/FlowFileStore.h | 51 ++++
libminifi/include/utils/StringUtils.h | 9 +
libminifi/src/core/ProcessSession.cpp | 14 +-
libminifi/src/utils/StringUtils.cpp | 10 +
libminifi/test/ReadFromFlowFileTestProcessor.cpp | 67 +++++
libminifi/test/ReadFromFlowFileTestProcessor.h | 65 ++++
libminifi/test/WriteToFlowFileTestProcessor.cpp | 74 +++++
libminifi/test/WriteToFlowFileTestProcessor.h | 61 ++++
.../rocksdb-tests/DBContentRepositoryTests.cpp | 7 +-
.../test/unit/ContentRepositoryDependentTests.h | 53 +++-
libminifi/test/unit/ProcessSessionTests.cpp | 10 +-
libminifi/test/unit/StringUtilsTests.cpp | 23 ++
26 files changed, 1223 insertions(+), 66 deletions(-)
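
As a reading aid for the diff below, here is a minimal sketch of how the split point for the two "Pattern Location" values can be computed. It is not the committed code: it mirrors the logic of `getSplitPosition` and `splitFlowFileAtLastPattern` on plain strings, and the names `splitPosition`, `splitAtLastPattern` and the local `PatternLocation` enum are hypothetical stand-ins chosen for illustration. It also works on a single string, whereas the processor receives its input as separate flow files.

```cpp
#include <cstddef>
#include <optional>
#include <regex>
#include <string>
#include <utility>

// Hypothetical stand-in for DefragmentText::PatternLocation.
enum class PatternLocation { StartOfMessage, EndOfMessage };

// Returns the offset at which `text` would be split, based on the LAST match
// of `pattern`, or std::nullopt when the pattern does not occur at all.
std::optional<std::size_t> splitPosition(const std::string& text,
                                         const std::regex& pattern,
                                         PatternLocation location) {
  std::optional<std::size_t> position;
  const std::sregex_iterator end;
  for (std::sregex_iterator it(text.begin(), text.end(), pattern); it != end; ++it) {
    // Overwrite on every match so that only the last one survives.
    const auto start = static_cast<std::size_t>(it->position(0));
    const auto length = static_cast<std::size_t>(it->length(0));
    // Start of Message: split before the match. End of Message: split after it.
    position = (location == PatternLocation::EndOfMessage) ? start + length : start;
  }
  return position;
}

// Splits `text` into (before, after) at the last pattern match.
// Without a match, the whole text is returned as `before`.
std::pair<std::string, std::string> splitAtLastPattern(const std::string& text,
                                                       const std::regex& pattern,
                                                       PatternLocation location) {
  const auto position = splitPosition(text, pattern, location);
  if (!position) {
    return {text, std::string{}};
  }
  return {text.substr(0, *position), text.substr(*position)};
}
```

In the processor itself, the part before the split is appended to the buffer, the buffer is then flushed to success when a pattern was found, and the part after the split becomes the new buffer.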
diff --git a/PROCESSORS.md b/PROCESSORS.md
index ebb540c..ae96b13 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -13,6 +13,7 @@
- [ConsumeJournald](#consumejournald)
- [ConsumeKafka](#consumekafka)
- [ConsumeMQTT](#consumemqtt)
+- [DefragmentText](#defragmenttext)
- [DeleteS3Object](#deletes3object)
- [ExecuteProcess](#executeprocess)
- [ExecutePythonProcessor](#executepythonprocessor)
@@ -295,6 +296,29 @@ In the list below, the names of required properties appear in bold. Any other pr
| - | - |
|success|FlowFiles that are sent successfully to the destination are transferred to this relationship|
+## DefragmentText
+
+### Description
+
+DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Pattern**|||A regular expression to match at the start or end of messages.|
+|Pattern Location|Start of Message|Start of Message<br>End of Message|Whether the pattern is located at the start or at the end of the messages.|
+|Max Buffer Age|10 min|<duration> <time unit>|The maximum age of the buffer after which it will be transferred to success when matching Start of Message patterns or to failure when matching End of Message patterns.|
+|Max Buffer Size||<size> <size unit>|The maximum buffer size; if the buffer exceeds this, it will be transferred to failure.|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Flowfiles that have been successfully defragmented|
+|failure|Flowfiles that failed the defragmentation process|
+
## DeleteS3Object
diff --git a/README.md b/README.md
index dbc641e..3c6a2a9 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
| Extension Set | Processors |
| ------------- |:-------------|
-| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](PROCESSORS.md#putfile)<br/>[RetryFlowFile](PROCESSOR [...]
+| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 17aa08f..ecb5c70 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -70,7 +70,8 @@ function(createTests testName)
if (Boost_FOUND)
target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}")
endif()
- target_link_libraries(${testName} ${CMAKE_DL_LIBS} ${TEST_BASE_LIB})
+ target_link_libraries(${testName} ${CMAKE_DL_LIBS})
+ target_wholearchive_library(${testName} ${TEST_BASE_LIB})
target_link_libraries(${testName} core-minifi yaml-cpp spdlog Threads::Threads)
if (Boost_FOUND)
target_link_libraries(${testName} ${Boost_SYSTEM_LIBRARY})
@@ -83,7 +84,7 @@ endfunction()
enable_testing(test)
SET(TEST_BASE_LIB test_base)
-add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp" "${TEST_DIR}/RandomServerSocket.cpp" "${TEST_DIR}/KamikazeProcessor.cpp")
+add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp" "${TEST_DIR}/RandomServerSocket.cpp" "${TEST_DIR}/KamikazeProcessor.cpp" "${TEST_DIR}/WriteToFlowFileTestProcessor.cpp" "${TEST_DIR}/ReadFromFlowFileTestProcessor.cpp")
target_link_libraries(${TEST_BASE_LIB} core-minifi)
target_include_directories(${TEST_BASE_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/")
@@ -124,7 +125,8 @@ if(NOT WIN32 AND ENABLE_NANOFI)
target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/standard-processors/processors/")
target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
appendIncludes("${testfilename}")
- target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} ${TEST_BASE_LIB} Threads::Threads)
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} Threads::Threads)
+
target_wholearchive_library(${testfilename} nanofi)
createTests(${testfilename})
diff --git a/docker/test/integration/features/defragtextflowfiles.feature b/docker/test/integration/features/defragtextflowfiles.feature
new file mode 100644
index 0000000..1d20bfc
--- /dev/null
+++ b/docker/test/integration/features/defragtextflowfiles.feature
@@ -0,0 +1,30 @@
+Feature: DefragmentText can defragment fragmented data from TailFile
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario Outline: DefragmentText merges split messages from TailFile
+ Given a TailFile processor with the "File to Tail" property set to "/tmp/input/test_file.log"
+ And the "Initial Start Position" property of the TailFile processor is set to "Beginning of File"
+ And the "Input Delimiter" property of the TailFile processor is set to "%"
+ And a file with filename "test_file.log" and content "<input>" is present in "/tmp/input"
+ And a DefragmentText processor with the "Pattern" property set to "<pattern>"
+ And the "Pattern Location" property of the DefragmentText processor is set to "<pattern location>"
+ And a PutFile processor with the name "SuccessPut" and the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the TailFile processor is connected to the DefragmentText
+ And the "success" relationship of the DefragmentText processor is connected to the SuccessPut
+
+
+ When all instances start up
+ Then flowfiles with these contents are placed in the monitored directory in less than 30 seconds: "<success_flow_files>"
+
+
+ Examples:
+ | input | pattern | pattern location | success_flow_files |
+ | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog% | <[0-9]+> | Start of Message | <1> apple%banana%,<2> foo%bar%baz% |
+ | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog% | <[0-9]+> | End of Message | <1>, apple%banana%<2>, foo%bar%baz%<3> |
+ | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog% | % | Start of Message | <1> apple,%banana,%<2> foo,%bar,%baz,%<3> cat,%dog |
+ | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog% | % | End of Message | <1> apple%,banana%,<2> foo%,bar%,baz%,<3> cat%,dog% |
+ | previous%>>a<< apple%banana%>>b<< foo%bar%baz%>>c<< cat%dog% | >>[a-z]+<< | Start of Message | previous%,>>a<< apple%banana%,>>b<< foo%bar%baz% |
+ | previous%>>a<< apple%banana%>>b<< foo%bar%baz%>>c<< cat%dog% | >>[a-z]+<< | End of Message | previous%>>a<<, apple%banana%>>b<<, foo%bar%baz%>>c<< |
+ | long%message%with%a%single%delimiter%at%the%end%!% | ! | Start of Message | long%message%with%a%single%delimiter%at%the%end% |
+ | long%message%with%a%single%delimiter%at%the%end%!% | ! | End of Message | long%message%with%a%single%delimiter%at%the%end%! |
diff --git a/docker/test/integration/minifi/processors/DefragmentText.py b/docker/test/integration/minifi/processors/DefragmentText.py
new file mode 100644
index 0000000..29ea2e5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DefragmentText.py
@@ -0,0 +1,9 @@
+from ..core.Processor import Processor
+
+
+class DefragmentText(Processor):
+    def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(DefragmentText, self).__init__('DefragmentText',
+                                             schedule=schedule,
+                                             properties={'Delimiter': delimiter},
+                                             auto_terminate=['success', 'failure'])
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index 160b474..a468159 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -350,23 +350,6 @@ void BinFiles::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
file_store_.put(flowFile);
}
-void BinFiles::FlowFileStore::put(const std::shared_ptr<core::FlowFile>& flowFile) {
- {
- std::lock_guard<std::mutex> guard(flow_file_mutex_);
- incoming_files_.emplace(std::move(flowFile));
- }
- has_new_flow_file_.store(true, std::memory_order_release);
-}
-
-std::unordered_set<std::shared_ptr<core::FlowFile>> BinFiles::FlowFileStore::getNewFlowFiles() {
- bool hasNewFlowFiles = true;
- if (!has_new_flow_file_.compare_exchange_strong(hasNewFlowFiles, false, std::memory_order_acquire, std::memory_order_relaxed)) {
- return {};
- }
- std::lock_guard<std::mutex> guard(flow_file_mutex_);
- return std::move(incoming_files_);
-}
-
std::set<std::shared_ptr<core::Connectable>> BinFiles::getOutGoingConnections(const std::string &relationship) const {
auto result = core::Connectable::getOutGoingConnections(relationship);
if (relationship == Self.getName()) {
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index b2443ee..aa8df05 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -34,6 +34,7 @@
#include "utils/gsl.h"
#include "utils/Id.h"
#include "utils/Export.h"
+#include "core/FlowFileStore.h"
namespace org {
namespace apache {
@@ -265,24 +266,10 @@ class BinFiles : public core::Processor {
BinManager binManager_;
private:
- class FlowFileStore{
- public:
- /**
- * Returns the already-preprocessed FlowFiles that got restored on restart from the FlowFileRepository
- * @return the resurrected persisted FlowFiles
- */
- std::unordered_set<std::shared_ptr<core::FlowFile>> getNewFlowFiles();
- void put(const std::shared_ptr<core::FlowFile>& flowFile);
- private:
- std::atomic_bool has_new_flow_file_{false};
- std::mutex flow_file_mutex_;
- std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
- };
-
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<BinFiles>::getLogger()};
uint32_t batchSize_{1};
uint32_t maxBinCount_{100};
- FlowFileStore file_store_;
+ core::FlowFileStore file_store_;
};
} /* namespace processors */
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp
new file mode 100644
index 0000000..04c6f0e
--- /dev/null
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefragmentText.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "serialization/PayloadSerializer.h"
+#include "TextFragmentUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented");
+const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process");
+const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
+
+const core::Property DefragmentText::Pattern(
+ core::PropertyBuilder::createProperty("Pattern")
+ ->withDescription("A regular expression to match at the start or end of messages.")
+ ->isRequired(true)->build());
+
+const core::Property DefragmentText::PatternLoc(
+ core::PropertyBuilder::createProperty("Pattern Location")->withDescription("Whether the pattern is located at the start or at the end of the messages.")
+ ->withAllowableValues(PatternLocation::values())
+ ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build());
+
+
+const core::Property DefragmentText::MaxBufferSize(
+ core::PropertyBuilder::createProperty("Max Buffer Size")
+ ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <data unit>")
+ ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build());
+
+const core::Property DefragmentText::MaxBufferAge(
+ core::PropertyBuilder::createProperty("Max Buffer Age")->
+ withDescription("The maximum age of the buffer after which it will be transferred to success when matching Start of Message patterns or to failure when matching End of Message patterns. "
+ "Expected format is <duration> <time unit>")
+ ->withDefaultValue("10 min")
+ ->build());
+
+void DefragmentText::initialize() {
+ setSupportedRelationships({Success, Failure});
+ setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
+}
+
+void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
+ gsl_Expects(context);
+
+ std::string max_buffer_age_str;
+ if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
+ core::TimeUnit unit;
+ uint64_t max_buffer_age;
+ if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
+ buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
+ logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
+ }
+ }
+
+ std::string max_buffer_size_str;
+ if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
+ uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
+ if (max_buffer_size > 0) {
+ buffer_.setMaxSize(max_buffer_size);
+ logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
+ }
+ }
+
+ context->getProperty(PatternLoc.getName(), pattern_location_);
+
+ std::string pattern_str;
+ if (context->getProperty(Pattern.getName(), pattern_str) && !pattern_str.empty()) {
+ pattern_ = std::regex(pattern_str);
+ logger_->log_trace("The Pattern is configured to be %s", pattern_str);
+ } else {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
+ }
+}
+
+void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) {
+ gsl_Expects(session);
+ auto flowFiles = flow_file_store_.getNewFlowFiles();
+ for (auto& file : flowFiles) {
+ if (file)
+ processNextFragment(session, gsl::not_null(std::move(file)));
+ }
+ {
+ std::shared_ptr<core::FlowFile> original_flow_file = session->get();
+ if (original_flow_file)
+ processNextFragment(session, gsl::not_null(std::move(original_flow_file)));
+ }
+ if (buffer_.maxSizeReached()) {
+ buffer_.flushAndReplace(session, Failure, nullptr);
+ return;
+ }
+ if (buffer_.maxAgeReached()) {
+ if (pattern_location_ == PatternLocation::START_OF_MESSAGE)
+ buffer_.flushAndReplace(session, Success, nullptr);
+ else
+ buffer_.flushAndReplace(session, Failure, nullptr);
+ }
+}
+
+void DefragmentText::processNextFragment(core::ProcessSession *session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment) {
+ if (!buffer_.isCompatible(*next_fragment)) {
+ buffer_.flushAndReplace(session, Failure, nullptr);
+ session->transfer(next_fragment, Failure);
+ return;
+ }
+ std::shared_ptr<core::FlowFile> split_before_last_pattern;
+ std::shared_ptr<core::FlowFile> split_after_last_pattern;
+ bool found_pattern = splitFlowFileAtLastPattern(session, next_fragment, split_before_last_pattern,
+ split_after_last_pattern);
+ if (split_before_last_pattern)
+ buffer_.append(session, gsl::not_null(std::move(split_before_last_pattern)));
+ if (found_pattern) {
+ buffer_.flushAndReplace(session, Success, split_after_last_pattern);
+ }
+ session->remove(next_fragment);
+}
+
+
+void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& original_flow_file,
+ const std::shared_ptr<core::FlowFile>& split_before_last_pattern,
+ const std::shared_ptr<core::FlowFile>& split_after_last_pattern,
+ const size_t split_position) const {
+ std::string base_name, post_name, offset_str;
+ if (!original_flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
+ return;
+ if (!original_flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
+ return;
+ if (!original_flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
+ return;
+
+ size_t fragment_offset = std::stoull(offset_str);
+
+ if (split_before_last_pattern) {
+ std::string first_part_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, split_before_last_pattern->getSize());
+ split_before_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, first_part_name);
+ }
+ if (split_after_last_pattern) {
+ std::string second_part_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset + split_position, split_after_last_pattern->getSize());
+ split_after_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, second_part_name);
+ split_after_last_pattern->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(fragment_offset + split_position));
+ }
+}
+
+namespace {
+class AppendFlowFileToFlowFile : public OutputStreamCallback {
+ public:
+ explicit AppendFlowFileToFlowFile(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append, PayloadSerializer& serializer)
+ : flow_file_to_append_(flow_file_to_append), serializer_(serializer) {}
+
+ int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+ return serializer_.serialize(flow_file_to_append_, stream);
+ }
+ private:
+ const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append_;
+ PayloadSerializer& serializer_;
+};
+
+void updateAppendedAttributes(core::FlowFile& buffered_ff) {
+ std::string base_name, post_name, offset_str;
+ if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
+ return;
+ if (!buffered_ff.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
+ return;
+ if (!buffered_ff.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
+ return;
+ size_t fragment_offset = std::stoull(offset_str);
+
+ std::string buffer_new_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, buffered_ff.getSize());
+ buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
+}
+
+struct ReadFlowFileContent : public InputStreamCallback {
+ std::string content;
+
+ int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+ content.resize(stream->size());
+ const auto ret = stream->read(reinterpret_cast<uint8_t *>(content.data()), stream->size());
+ if (io::isError(ret))
+ return -1;
+ return gsl::narrow<int64_t>(ret);
+ }
+};
+
+size_t getSplitPosition(const std::smatch& last_match, DefragmentText::PatternLocation pattern_location) {
+ size_t split_position = last_match.position(0);
+ if (pattern_location == DefragmentText::PatternLocation::END_OF_MESSAGE) {
+ split_position += last_match.length(0);
+ }
+ return split_position;
+}
+
+} // namespace
+
+bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession *session,
+ const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
+ std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+ std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
+ ReadFlowFileContent read_flow_file_content;
+ session->read(original_flow_file, &read_flow_file_content);
+ auto last_regex_match = utils::StringUtils::getLastRegexMatch(read_flow_file_content.content, pattern_);
+ if (!last_regex_match.ready()) {
+ split_before_last_pattern = session->clone(original_flow_file);
+ split_after_last_pattern = nullptr;
+ return false;
+ }
+ auto split_position = getSplitPosition(last_regex_match, pattern_location_);
+ if (split_position != 0) {
+ split_before_last_pattern = session->clone(original_flow_file, 0, split_position);
+ }
+ if (split_position != original_flow_file->getSize()) {
+ split_after_last_pattern = session->clone(original_flow_file, split_position, original_flow_file->getSize() - split_position);
+ }
+ updateAttributesForSplitFiles(*original_flow_file, split_before_last_pattern, split_after_last_pattern, split_position);
+ return true;
+}
+
+void DefragmentText::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
+ if (!flowFile)
+ return;
+ flow_file_store_.put(flowFile);
+}
+
+std::set<std::shared_ptr<core::Connectable>> DefragmentText::getOutGoingConnections(const std::string &relationship) const {
+ auto result = core::Connectable::getOutGoingConnections(relationship);
+ if (relationship == Self.getName()) {
+ result.insert(std::static_pointer_cast<core::Connectable>(std::const_pointer_cast<core::Processor>(shared_from_this())));
+ }
+ return result;
+}
+
+void DefragmentText::Buffer::append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append) {
+ if (empty()) {
+ store(session, flow_file_to_append);
+ return;
+ }
+ auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+ return session->read(ff, cb);
+ };
+ PayloadSerializer serializer(flowFileReader);
+ AppendFlowFileToFlowFile append_flow_file_to_flow_file(flow_file_to_append, serializer);
+ session->add(buffered_flow_file_);
+ session->append(buffered_flow_file_, &append_flow_file_to_flow_file);
+ updateAppendedAttributes(*buffered_flow_file_);
+ session->transfer(buffered_flow_file_, Self);
+
+ session->remove(flow_file_to_append);
+}
+
+bool DefragmentText::Buffer::maxSizeReached() const {
+ return !empty()
+ && max_size_.has_value()
+ && (max_size_.value() < buffered_flow_file_->getSize());
+}
+
+bool DefragmentText::Buffer::maxAgeReached() const {
+ return !empty()
+ && max_age_.has_value()
+ && (creation_time_ + max_age_.value() < std::chrono::steady_clock::now());
+}
+
+void DefragmentText::Buffer::setMaxAge(std::chrono::milliseconds max_age) {
+ max_age_ = max_age;
+}
+
+void DefragmentText::Buffer::setMaxSize(size_t max_size) {
+ max_size_ = max_size;
+}
+
+void DefragmentText::Buffer::flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
+ const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
+ if (!empty()) {
+ session->add(buffered_flow_file_);
+ session->transfer(buffered_flow_file_, relationship);
+ }
+ store(session, new_buffered_flow_file);
+}
+
+void DefragmentText::Buffer::store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
+ buffered_flow_file_ = new_buffered_flow_file;
+ creation_time_ = std::chrono::steady_clock::now();
+ if (!empty()) {
+ session->add(buffered_flow_file_);
+ session->transfer(buffered_flow_file_, Self);
+ }
+}
+
+bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+ if (empty())
+ return true;
+ if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
+ != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
+ return false;
+ }
+ if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
+ != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
+ return false;
+ }
+ std::string current_offset_str, append_offset_str;
+ if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
+ != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
+ return false;
+ }
+ if (!current_offset_str.empty() && !append_offset_str.empty()) {
+ size_t current_offset = std::stoull(current_offset_str);
+ size_t append_offset = std::stoull(append_offset_str);
+ if (current_offset + buffered_flow_file_->getSize() != append_offset)
+ return false;
+ }
+ return true;
+}
+
+REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them");
+
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/DefragmentText.h b/extensions/standard-processors/processors/DefragmentText.h
new file mode 100644
index 0000000..1f52379
--- /dev/null
+++ b/extensions/standard-processors/processors/DefragmentText.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <regex>
+#include <memory>
+#include <string>
+#include <set>
+
+#include "core/Processor.h"
+#include "core/FlowFileStore.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
+#include "serialization/PayloadSerializer.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class DefragmentText : public core::Processor {
+ public:
+ explicit DefragmentText(const std::string& name, const utils::Identifier& uuid = {})
+ : Processor(name, uuid) {
+ }
+ EXTENSIONAPI static const core::Relationship Self;
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ EXTENSIONAPI static const core::Property Pattern;
+ EXTENSIONAPI static const core::Property PatternLoc;
+ EXTENSIONAPI static const core::Property MaxBufferAge;
+ EXTENSIONAPI static const core::Property MaxBufferSize;
+
+ void initialize() override;
+ void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* sessionFactory) override;
+ void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override;
+ void restore(const std::shared_ptr<core::FlowFile>& flowFile) override;
+ std::set<std::shared_ptr<core::Connectable>> getOutGoingConnections(const std::string &relationship) const override;
+
+ SMART_ENUM(PatternLocation,
+ (END_OF_MESSAGE, "End of Message"),
+ (START_OF_MESSAGE, "Start of Message")
+ )
+
+ private:
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
+ protected:
+ class Buffer {
+ public:
+ bool isCompatible(const core::FlowFile& fragment) const;
+ void append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append);
+ bool maxSizeReached() const;
+ bool maxAgeReached() const;
+ void setMaxAge(std::chrono::milliseconds max_age);
+ void setMaxSize(size_t max_size);
+ void flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
+ const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
+
+ bool empty() const { return buffered_flow_file_ == nullptr; }
+
+ private:
+ void store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
+
+ std::shared_ptr<core::FlowFile> buffered_flow_file_;
+ std::chrono::time_point<std::chrono::steady_clock> creation_time_;
+ std::optional<std::chrono::milliseconds> max_age_;
+ std::optional<size_t> max_size_;
+ };
+
+ std::regex pattern_;
+ PatternLocation pattern_location_;
+
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DefragmentText>::getLogger();
+ core::FlowFileStore flow_file_store_;
+ Buffer buffer_;
+
+ void processNextFragment(core::ProcessSession *session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment);
+
+ bool splitFlowFileAtLastPattern(core::ProcessSession *session,
+ const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
+ std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+ std::shared_ptr<core::FlowFile> &split_after_last_pattern) const;
+
+ void updateAttributesForSplitFiles(const core::FlowFile &original_flow_file,
+ const std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+ const std::shared_ptr<core::FlowFile> &split_after_last_pattern,
+ const size_t split_position) const;
+};
+
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index d971f73..d363681 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -40,11 +40,13 @@
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
+#include "TextFragmentUtils.h"
#include "TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -795,11 +797,13 @@ void TailFile::updateFlowFileAttributes(const std::string &full_file_name, const
const std::string &extension,
std::shared_ptr<core::FlowFile> &flow_file) const {
logger_->log_info("TailFile %s for %" PRIu64 " bytes", fileName, flow_file->getSize());
- std::string logName = baseName + "." + std::to_string(state.position_) + "-" +
- std::to_string(state.position_ + flow_file->getSize() - 1) + "." + extension;
+ std::string logName = textfragmentutils::createFileName(baseName, extension, state.position_, flow_file->getSize());
flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_);
flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name);
flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
+ flow_file->setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
+ flow_file->setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
+ flow_file->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));
}
void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const {
diff --git a/extensions/standard-processors/processors/TextFragmentUtils.h b/extensions/standard-processors/processors/TextFragmentUtils.h
new file mode 100644
index 0000000..ed024b2
--- /dev/null
+++ b/extensions/standard-processors/processors/TextFragmentUtils.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors::textfragmentutils {
+ constexpr const char* BASE_NAME_ATTRIBUTE = "TextFragmentAttribute.base_name";
+ constexpr const char* POST_NAME_ATTRIBUTE = "TextFragmentAttribute.post_name";
+ constexpr const char* OFFSET_ATTRIBUTE = "TextFragmentAttribute.offset";
+
+ inline std::string createFileName(const std::string& base_name, const std::string& post_name, const size_t offset, const size_t size) {
+ return base_name + "." + std::to_string(offset) + "-" + std::to_string(offset + size - 1) + "." + post_name;
+ }
+} // namespace org::apache::nifi::minifi::processors::textfragmentutils
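
Editor's illustration (not part of the commit): the file name produced by createFileName encodes the inclusive byte range of a fragment. The sketch below copies the helper into a standalone translation unit so it compiles without the MiNiFi headers. It assumes non-empty fragments: with offset 0 and size 0, the unsigned expression offset + size - 1 would wrap around.

```cpp
#include <cstddef>
#include <string>

// Standalone copy of the naming scheme from TextFragmentUtils.h, reproduced
// here only so the example builds on its own.
inline std::string createFileName(const std::string& base_name, const std::string& post_name,
                                  const std::size_t offset, const std::size_t size) {
  // The second number is the inclusive offset of the fragment's last byte.
  return base_name + "." + std::to_string(offset) + "-" + std::to_string(offset + size - 1) + "." + post_name;
}
```

For example, a 100-byte fragment of "app.log" starting at offset 0 would be named "app.0-99.log", and a 50-byte fragment following it would be named "app.100-149.log".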
diff --git a/extensions/standard-processors/processors/UpdateAttribute.cpp b/extensions/standard-processors/processors/UpdateAttribute.cpp
index fa05f44..35ee4df 100644
--- a/extensions/standard-processors/processors/UpdateAttribute.cpp
+++ b/extensions/standard-processors/processors/UpdateAttribute.cpp
@@ -32,8 +32,8 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Relationship UpdateAttribute::Success("success", "All files are routed to success");
-core::Relationship UpdateAttribute::Failure("failure", "Failed files are transferred to failure");
+const core::Relationship UpdateAttribute::Success("success", "All files are routed to success");
+const core::Relationship UpdateAttribute::Failure("failure", "Failed files are transferred to failure");
void UpdateAttribute::initialize() {
std::set<core::Property> properties;
diff --git a/extensions/standard-processors/processors/UpdateAttribute.h b/extensions/standard-processors/processors/UpdateAttribute.h
index 3a2c047..e8b0be9 100644
--- a/extensions/standard-processors/processors/UpdateAttribute.h
+++ b/extensions/standard-processors/processors/UpdateAttribute.h
@@ -46,8 +46,8 @@ class UpdateAttribute : public core::Processor {
* Relationships
*/
- static core::Relationship Success;
- static core::Relationship Failure;
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
/**
* NiFi API implementation
diff --git a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
new file mode 100644
index 0000000..9cd32de
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "WriteToFlowFileTestProcessor.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "UpdateAttribute.h"
+#include "DefragmentText.h"
+#include "TextFragmentUtils.h"
+#include "utils/TestUtils.h"
+#include "serialization/PayloadSerializer.h"
+#include "serialization/FlowFileSerializer.h"
+#include "unit/ContentRepositoryDependentTests.h"
+
+using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
+using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
+using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+using DefragmentText = org::apache::nifi::minifi::processors::DefragmentText;
+
+TEST_CASE("DefragmentText Single source tests", "[defragmenttextsinglesource]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+ auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+ auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", "defrag_text_flow_files"));
+ auto read_from_success_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_success_relationship"));
+ auto read_from_failure_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure_relationship"));
+
+ plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, defrag_text_flow_files);
+
+ plan->addConnection(defrag_text_flow_files, DefragmentText::Success, read_from_success_relationship);
+ plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, read_from_failure_relationship);
+
+ read_from_success_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+ read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+
+
+ SECTION("Throws on empty pattern") {
+ REQUIRE_THROWS(testController.runSession(plan));
+ }
+
+ SECTION("Throws on invalid pattern") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "\"[a-b][a\"");
+
+ REQUIRE_THROWS(testController.runSession(plan));
+ }
+
+ SECTION("Single line messages starting with pattern") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+
+ write_to_flow_file->setContent("<1> Foo");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ write_to_flow_file->setContent("<2> Bar");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("<1> Foo"));
+ write_to_flow_file->setContent("<3> Baz");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("<2> Bar"));
+ }
+
+ SECTION("Single line messages ending with pattern") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+
+ write_to_flow_file->setContent("Foo <1>");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("Foo <1>"));
+ write_to_flow_file->setContent("Bar <2>");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("Bar <2>"));
+ write_to_flow_file->setContent("Baz <3>");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("Baz <3>"));
+ }
+
+ SECTION("Multiline matching start of messages") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+
+ write_to_flow_file->setContent("apple<1> banana<2> cherry<3> dragon ");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("apple<1> banana<2> cherry"));
+
+ write_to_flow_file->setContent("fruit<4> elderberry<5> fig<6> grapefruit");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("<3> dragon fruit<4> elderberry<5> fig"));
+ }
+
+ SECTION("Multiline matching end of messages") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+
+ write_to_flow_file->setContent("apple<1> banana<2> cherry<3> dragon ");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("apple<1> banana<2> cherry<3>"));
+
+ write_to_flow_file->setContent("fruit<4> elderberry<5> fig<6> grapefruit");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent(" dragon fruit<4> elderberry<5> fig<6>"));
+ }
+
+ SECTION("Timeout test with default Pattern Location") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent("");
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("Message"));
+ }
+
+ SECTION("Timeout test Start of Message") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent("");
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->readFlowFileWithContent("Message"));
+ }
+
+ SECTION("Timeout test End of Message") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent("");
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ testController.runSession(plan);
+ CHECK(read_from_failure_relationship->readFlowFileWithContent("Message"));
+ }
+
+ SECTION("Timeout test without enough time") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "1 h");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent("");
+ std::this_thread::sleep_for(std::chrono::milliseconds(300));
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+ }
+
+ SECTION("Max Buffer test") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferSize.getName(), "100 B");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent(std::string(150, '*'));
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ CHECK(read_from_failure_relationship->readFlowFileWithContent(std::string("Message").append(std::string(150, '*'))));
+ }
+
+ SECTION("Max Buffer test without overflow") {
+ plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferSize.getName(), "100 MB");
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+
+ write_to_flow_file->setContent("Message");
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+ plan->reset();
+ write_to_flow_file->setContent(std::string(150, '*'));
+ testController.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+ }
+}
+
+TEST_CASE("DefragmentTextInvalidSources", "[defragmenttextinvalidsources]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+ auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+ auto update_ff = std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute", "update_attribute"));
+ auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", "defrag_text_flow_files"));
+ auto read_from_failure_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure_relationship"));
+
+ plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, update_ff);
+ plan->addConnection(update_ff, UpdateAttribute::Success, defrag_text_flow_files);
+
+ plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, read_from_failure_relationship);
+ defrag_text_flow_files->setAutoTerminatedRelationships({DefragmentText::Success});
+
+ read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+
+ plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+ plan->setProperty(update_ff, org::apache::nifi::minifi::processors::textfragmentutils::BASE_NAME_ATTRIBUTE, "${UUID()}", true);
+
+ write_to_flow_file->setContent("Foo <1> Foo");
+ testController.runSession(plan);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+ write_to_flow_file->setContent("Bar <2> Bar");
+ plan->reset();
+ testController.runSession(plan);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 2);
+ CHECK(read_from_failure_relationship->readFlowFileWithContent("<1> Foo"));
+ CHECK(read_from_failure_relationship->readFlowFileWithContent("Bar <2> Bar"));
+}
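
Editor's illustration (not part of the commit): the expectations in the "Multiline matching" sections above suggest that the content is split at the last pattern match, with the match itself going to the buffered remainder for Start of Message and to the emitted part for End of Message. A minimal sketch of that rule on plain strings follows. splitAtLastPattern and its return type are hypothetical names chosen for this example; the processor itself works on flow files through splitFlowFileAtLastPattern, whose implementation is not shown in this part of the diff.

```cpp
#include <cstddef>
#include <optional>
#include <regex>
#include <string>
#include <utility>

enum class PatternLocation { START_OF_MESSAGE, END_OF_MESSAGE };

// Hypothetical helper: returns {part that can be emitted, remainder to keep buffering},
// or std::nullopt when the pattern does not occur in the content.
inline std::optional<std::pair<std::string, std::string>> splitAtLastPattern(
    const std::string& content, const std::regex& pattern, PatternLocation location) {
  std::smatch last_match;
  for (auto it = std::sregex_iterator(content.begin(), content.end(), pattern); it != std::sregex_iterator(); ++it) {
    last_match = *it;  // keep overwriting until only the last match remains
  }
  if (!last_match.ready()) {
    return std::nullopt;  // a default-constructed smatch means there was no match
  }
  auto split_position = static_cast<std::size_t>(last_match.position(0));
  if (location == PatternLocation::END_OF_MESSAGE) {
    // The pattern closes a message, so it stays with the first part.
    split_position += static_cast<std::size_t>(last_match.length(0));
  }
  return std::make_pair(content.substr(0, split_position), content.substr(split_position));
}
```

With the pattern <[0-9]+> and the input "apple<1> banana<2> cherry<3> dragon ", Start of Message gives "apple<1> banana<2> cherry" plus the remainder "<3> dragon ", while End of Message gives "apple<1> banana<2> cherry<3>" plus " dragon ". This agrees with the contents the tests check for.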
diff --git a/libminifi/include/core/FlowFileStore.h b/libminifi/include/core/FlowFileStore.h
new file mode 100644
index 0000000..6b261dd
--- /dev/null
+++ b/libminifi/include/core/FlowFileStore.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <unordered_set>
+#include <utility>
+
+#include "FlowFile.h"
+namespace org::apache::nifi::minifi::core {
+
+class FlowFileStore {
+ public:
+ std::unordered_set<std::shared_ptr<FlowFile>> getNewFlowFiles() {
+ bool hasNewFlowFiles = true;
+ if (!has_new_flow_file_.compare_exchange_strong(hasNewFlowFiles, false, std::memory_order_acquire, std::memory_order_relaxed)) {
+ return {};
+ }
+ std::lock_guard<std::mutex> guard(flow_file_mutex_);
+ return std::move(incoming_files_);
+ }
+
+ void put(const std::shared_ptr<FlowFile>& flowFile) {
+ {
+ std::lock_guard<std::mutex> guard(flow_file_mutex_);
+ incoming_files_.emplace(flowFile);
+ }
+ has_new_flow_file_.store(true, std::memory_order_release);
+ }
+ private:
+ std::atomic_bool has_new_flow_file_{false};
+ std::mutex flow_file_mutex_;
+ std::unordered_set<std::shared_ptr<FlowFile>> incoming_files_;
+};
+
+} // namespace org::apache::nifi::minifi::core
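
Editor's illustration (not part of the commit): FlowFileStore pairs an atomic flag with a mutex so that a consumer polling an empty store can return without taking the lock. The sketch below shows the same hand-off on a generic item type. PendingItemStore and takeAll are hypothetical names, and two details differ from the header above on purpose: <atomic> and <mutex> are included directly, and std::exchange is used so that the member set is guaranteed to be empty after the items are handed out.

```cpp
#include <atomic>
#include <mutex>
#include <unordered_set>
#include <utility>

// Hypothetical generic variant; Item stands in for std::shared_ptr<core::FlowFile>.
template <typename Item>
class PendingItemStore {
 public:
  void put(Item item) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      items_.emplace(std::move(item));
    }
    // Publish the flag only after the item is in the set.
    has_new_items_.store(true, std::memory_order_release);
  }

  std::unordered_set<Item> takeAll() {
    bool expected = true;
    // Fast path: nothing was added since the last call, so skip the mutex.
    if (!has_new_items_.compare_exchange_strong(expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
      return {};
    }
    std::lock_guard<std::mutex> guard(mutex_);
    return std::exchange(items_, {});  // hand out the contents, leave an empty set behind
  }

 private:
  std::atomic_bool has_new_items_{false};
  std::mutex mutex_;
  std::unordered_set<Item> items_;
};
```

A producer would call put() for each item and a consumer would call takeAll() on each trigger. A second takeAll() without an intervening put() returns an empty set.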
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index b6148c6..ff8c04c 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -23,6 +23,7 @@
#include <iostream>
#include <map>
#include <optional>
+#include <regex>
#include <sstream>
#include <string>
#include <string_view>
@@ -504,6 +505,14 @@ class StringUtils {
return str;
}
+ /**
+ * Returns the last match of a regular expression within the given string
+ * @param str incoming string
+ * @param pattern the regex to be matched
+ * @return the last std::smatch, or a default-constructed smatch (ready() == false) if no match was found
+ */
+ static std::smatch getLastRegexMatch(const std::string& str, const std::regex& pattern);
+
private:
inline static char nibble_to_hex(uint8_t nibble, bool uppercase) {
if (nibble < 10) {
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 82d748d..dcb95f4 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -748,13 +748,13 @@ void ProcessSession::commit() {
}
for (const auto& record : _deletedFlowFiles) {
- if (!record->isDeleted()) {
- continue;
- }
- if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
- // mark for deletion in the flowFileRepository
- record->setStoredToRepository(false);
- }
+ if (!record->isDeleted()) {
+ continue;
+ }
+ if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+ // mark for deletion in the flowFileRepository
+ record->setStoredToRepository(false);
+ }
}
ensureNonNullResourceClaim(connectionQueues);
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index cc2952e..1732886 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -390,6 +390,16 @@ std::string StringUtils::to_base64(const uint8_t* data, size_t length, bool url
return std::string(buf.data(), base64_length);
}
+std::smatch StringUtils::getLastRegexMatch(const std::string& str, const std::regex& pattern) {
+ auto matches = std::sregex_iterator(str.begin(), str.end(), pattern);
+ std::smatch last_match;
+ while (matches != std::sregex_iterator()) {
+ last_match = *matches;
+ matches = std::next(matches);
+ }
+ return last_match;
+}
+
constexpr uint8_t StringUtils::SKIP;
constexpr uint8_t StringUtils::hex_lut[128];
constexpr const char StringUtils::base64_enc_lut[];
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
new file mode 100644
index 0000000..7f41fc1
--- /dev/null
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ReadFromFlowFileTestProcessor.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship ReadFromFlowFileTestProcessor::Success("success", "success operational on the flow record");
+
+void ReadFromFlowFileTestProcessor::initialize() {
+ setSupportedRelationships({ Success });
+}
+
+void ReadFromFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) {
+ logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
+}
+
+namespace {
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+ std::vector<uint8_t> buffer_;
+
+ int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+ size_t bytes_read = stream->read(buffer_, stream->size());
+ return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+ }
+};
+} // namespace
+
+void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+ gsl_Expects(context && session);
+ logger_->log_info("%s", ON_TRIGGER_LOG_STR);
+ flow_file_contents_.clear();
+
+ while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
+ ReadFlowFileIntoBuffer callback;
+ session->read(flow_file, &callback);
+ flow_file_contents_.push_back(std::string(callback.buffer_.begin(), callback.buffer_.end()));
+ session->transfer(flow_file, Success);
+ }
+}
+
+void ReadFromFlowFileTestProcessor::onUnSchedule() {
+ logger_->log_info("%s", ON_UNSCHEDULE_LOG_STR);
+}
+
+REGISTER_RESOURCE(ReadFromFlowFileTestProcessor, "ReadFromFlowFileTestProcessor (only for testing purposes)");
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.h b/libminifi/test/ReadFromFlowFileTestProcessor.h
new file mode 100644
index 0000000..455c7e7
--- /dev/null
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <memory>
+#include <vector>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#pragma once
+
+namespace org::apache::nifi::minifi::processors {
+
+class ReadFromFlowFileTestProcessor : public core::Processor {
+ public:
+ static constexpr const char* ON_SCHEDULE_LOG_STR = "ReadFromFlowFileTestProcessor::onSchedule executed";
+ static constexpr const char* ON_TRIGGER_LOG_STR = "ReadFromFlowFileTestProcessor::onTrigger executed";
+ static constexpr const char* ON_UNSCHEDULE_LOG_STR = "ReadFromFlowFileTestProcessor::onUnSchedule executed";
+
+ explicit ReadFromFlowFileTestProcessor(const std::string& name, const utils::Identifier& uuid = utils::Identifier())
+ : Processor(name, uuid) {
+ }
+
+ static constexpr char const* const ProcessorName = "ReadFromFlowFileTestProcessor";
+ static const core::Relationship Success;
+
+ public:
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+ void initialize() override;
+ void onUnSchedule() override;
+
+ bool readFlowFileWithContent(const std::string& content) const {
+ return std::find(flow_file_contents_.begin(), flow_file_contents_.end(), content) != flow_file_contents_.end();
+ }
+
+ size_t numberOfFlowFilesRead() const {
+ return flow_file_contents_.size();
+ }
+
+ private:
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadFromFlowFileTestProcessor>::getLogger();
+ std::vector<std::string> flow_file_contents_;
+};
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.cpp b/libminifi/test/WriteToFlowFileTestProcessor.cpp
new file mode 100644
index 0000000..90a5090
--- /dev/null
+++ b/libminifi/test/WriteToFlowFileTestProcessor.cpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteToFlowFileTestProcessor.h"
+
+#include <string>
+#include <vector>
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship WriteToFlowFileTestProcessor::Success("success", "success operational on the flow record");
+
+void WriteToFlowFileTestProcessor::initialize() {
+ setSupportedProperties({});
+ setSupportedRelationships({Success});
+}
+
+void WriteToFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) {
+ logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
+}
+
+namespace {
+struct WriteToFlowFileCallback : public OutputStreamCallback {
+ const gsl::span<const uint8_t> content_;
+
+ explicit WriteToFlowFileCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
+
+ int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+ size_t bytes_written = stream->write(content_.data(), content_.size());
+ return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+ }
+};
+} // namespace
+
+void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+ gsl_Expects(context && session);
+ logger_->log_info("%s", ON_TRIGGER_LOG_STR);
+ if (content_.empty()) {
+ context->yield();
+ return;
+ }
+ std::shared_ptr<core::FlowFile> flow_file = session->create();
+ if (!flow_file) {
+ logger_->log_error("Failed to create flowfile!");
+ return;
+ }
+ WriteToFlowFileCallback callback(content_);
+ session->write(flow_file, &callback);
+ session->transfer(flow_file, Success);
+}
+
+void WriteToFlowFileTestProcessor::onUnSchedule() {
+ logger_->log_info("%s", ON_UNSCHEDULE_LOG_STR);
+}
+
+REGISTER_RESOURCE(WriteToFlowFileTestProcessor, "WriteToFlowFileTestProcessor (only for testing purposes)");
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.h b/libminifi/test/WriteToFlowFileTestProcessor.h
new file mode 100644
index 0000000..6d05221
--- /dev/null
+++ b/libminifi/test/WriteToFlowFileTestProcessor.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#pragma once
+
+namespace org::apache::nifi::minifi::processors {
+
+class WriteToFlowFileTestProcessor : public core::Processor {
+ public:
+ static constexpr const char* ON_SCHEDULE_LOG_STR = "WriteToFlowFileTestProcessor::onSchedule executed";
+ static constexpr const char* ON_TRIGGER_LOG_STR = "WriteToFlowFileTestProcessor::onTrigger executed";
+ static constexpr const char* ON_UNSCHEDULE_LOG_STR = "WriteToFlowFileTestProcessor::onUnSchedule executed";
+
+ explicit WriteToFlowFileTestProcessor(const std::string& name, const utils::Identifier& uuid = utils::Identifier())
+ : Processor(name, uuid) {
+ }
+
+ static constexpr char const* const ProcessorName = "WriteToFlowFileTestProcessor";
+ static const core::Relationship Success;
+
+ public:
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+ void initialize() override;
+ void onUnSchedule() override;
+
+ void setContent(std::string content) {
+ content_ = std::move(content);
+ }
+
+ private:
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteToFlowFileTestProcessor>::getLogger();
+ std::string content_;
+};
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index c456380..01b5251 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -229,6 +229,11 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::DatabaseContentRepository>());
- ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::DatabaseContentRepository>());
}
+
+TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[appendsetsize]") {
+ ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
+
+ ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
+}
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 90def02..581b68e 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -96,19 +96,20 @@ class Fixture {
core::ProcessSession &processSession() { return *process_session_; }
- void commitFlowFile(const std::string& content) {
- const auto original_ff = process_session_->create();
- WriteStringToFlowFile callback(content);
- process_session_->write(original_ff, &callback);
- process_session_->transfer(original_ff, Success);
+ void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
+ process_session_->transfer(flow_file, Success);
process_session_->commit();
}
- void appendAndCommit(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
+ void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& content) {
+ WriteStringToFlowFile callback(content);
+ process_session_->write(flow_file, &callback);
+ }
+
+ void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& content_to_append) {
WriteStringToFlowFile callback(content_to_append);
+ process_session_->add(flow_file);
process_session_->append(flow_file, &callback);
- process_session_->transfer(flow_file, Success);
- process_session_->commit();
}
private:
@@ -123,8 +124,9 @@ class Fixture {
void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(content_repo);
core::ProcessSession& process_session = fixture.processSession();
- fixture.commitFlowFile("foobar");
- const auto original_ff = process_session.get();
+ const auto original_ff = process_session.create();
+ fixture.writeToFlowFile(original_ff, "foobar");
+ fixture.transferAndCommit(original_ff);
REQUIRE(original_ff);
auto clone_first_half = process_session.clone(original_ff, 0, 3);
auto clone_second_half = process_session.clone(original_ff, 3, 3);
@@ -149,13 +151,36 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
CHECK(read_until_it_can_callback.value_ == "bar");
}
-void testAppendSize(std::shared_ptr<core::ContentRepository> content_repo) {
+void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
Fixture fixture = Fixture(content_repo);
core::ProcessSession& process_session = fixture.processSession();
- fixture.commitFlowFile("my");
- const auto flow_file = process_session.get();
- fixture.appendAndCommit(flow_file, "foobar");
+ const auto flow_file = process_session.create();
REQUIRE(flow_file);
+
+ fixture.writeToFlowFile(flow_file, "my");
+ fixture.transferAndCommit(flow_file);
+ fixture.appendToFlowFile(flow_file, "foobar");
+ fixture.transferAndCommit(flow_file);
+
+ CHECK(flow_file->getSize() == 8);
+ ReadUntilStreamSize read_until_stream_size_callback;
+ ReadUntilItCan read_until_it_can_callback;
+ process_session.read(flow_file, &read_until_stream_size_callback);
+ process_session.read(flow_file, &read_until_it_can_callback);
+ CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+ CHECK(read_until_it_can_callback.value_ == "myfoobar");
+}
+
+void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
+ Fixture fixture = Fixture(content_repo);
+ core::ProcessSession& process_session = fixture.processSession();
+ const auto flow_file = process_session.create();
+ REQUIRE(flow_file);
+
+ fixture.writeToFlowFile(flow_file, "my");
+ fixture.appendToFlowFile(flow_file, "foobar");
+ fixture.transferAndCommit(flow_file);
+
CHECK(flow_file->getSize() == 8);
ReadUntilStreamSize read_until_stream_size_callback;
ReadUntilItCan read_until_it_can_callback;
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 97775d3..19d21fd 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -100,6 +100,12 @@ TEST_CASE("ProcessSession::rollback penalizes affected flowfiles", "[rollback]")
TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<minifi::core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<minifi::core::repository::FileSystemRepository>());
- ContentRepositoryDependentTests::testAppendSize(std::make_shared<minifi::core::repository::VolatileContentRepository>());
- ContentRepositoryDependentTests::testAppendSize(std::make_shared<minifi::core::repository::FileSystemRepository>());
+}
+
+TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[appendsetsize]") {
+ ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+ ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+
+ ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+ ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
}
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 307050e..85f6292 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -471,3 +471,26 @@ TEST_CASE("StringUtils::removeFramingCharacters works correctly", "[removeFramin
REQUIRE(utils::StringUtils::removeFramingCharacters("\"abba\"", '"') == "abba");
REQUIRE(utils::StringUtils::removeFramingCharacters("\"\"abba\"\"", '"') == "\"abba\"");
}
+
+TEST_CASE("StringUtils::getLastRegexMatch works correctly", "[getLastRegexMatch]") {
+ std::regex pattern("<[0-9]+>");
+ {
+ std::string content = "Foo";
+ auto last_match = StringUtils::getLastRegexMatch(content, pattern);
+ REQUIRE_FALSE(last_match.ready());
+ }
+ {
+ std::string content = "<1> Foo";
+ auto last_match = StringUtils::getLastRegexMatch(content, pattern);
+ REQUIRE(last_match.ready());
+ CHECK(last_match.length(0) == 3);
+ CHECK(last_match.position(0) == 0);
+ }
+ {
+ std::string content = "<1> Foo<2> Bar<3> Baz<10> Qux";
+ auto last_match = StringUtils::getLastRegexMatch(content, pattern);
+ REQUIRE(last_match.ready());
+ CHECK(last_match.length(0) == 4);
+ CHECK(last_match.position(0) == 21);
+ }
+}
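
The StringUtils test above expects getLastRegexMatch to return a std::smatch that is not ready() when nothing matches, and otherwise reports the length and position of the final match. A minimal sketch of one way to get that behavior with only the standard library follows. The name lastRegexMatch is a hypothetical stand-in, and the actual implementation in libminifi/src/utils/StringUtils.cpp may differ.

```cpp
#include <regex>
#include <string>

// Hypothetical stand-in for StringUtils::getLastRegexMatch (assumption:
// the real helper may be implemented differently).
// Returns the last non-overlapping match of `pattern` in `text`, or a
// default-constructed std::smatch (ready() == false) if there is none.
// The result holds iterators into `text`, so `text` must outlive it.
std::smatch lastRegexMatch(const std::string& text, const std::regex& pattern) {
  std::smatch last;
  const std::sregex_iterator end;
  for (std::sregex_iterator it(text.begin(), text.end(), pattern); it != end; ++it) {
    last = *it;  // keep overwriting until the iterator is exhausted
  }
  return last;
}
```

Because the returned match refers back into the input string, callers should pass a named std::string rather than a temporary, as the test cases do with their content variable.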