You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/11/03 10:55:04 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1651 Add DefragmentText processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bc1389  MINIFICPP-1651 Add DefragmentText processor
2bc1389 is described below

commit 2bc13890e5e8d7219ae0d3655efaafbf9fb8ccfe
Author: Martin Zink <ma...@protonmail.com>
AuthorDate: Wed Nov 3 11:46:14 2021 +0100

    MINIFICPP-1651 Add DefragmentText processor
    
    Closes #1188
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  24 ++
 README.md                                          |   2 +-
 cmake/BuildTests.cmake                             |   8 +-
 .../features/defragtextflowfiles.feature           |  30 ++
 .../minifi/processors/DefragmentText.py            |   9 +
 extensions/libarchive/BinFiles.cpp                 |  17 --
 extensions/libarchive/BinFiles.h                   |  17 +-
 .../processors/DefragmentText.cpp                  | 335 +++++++++++++++++++++
 .../processors/DefragmentText.h                    | 108 +++++++
 .../standard-processors/processors/TailFile.cpp    |   8 +-
 .../processors/TextFragmentUtils.h                 |  31 ++
 .../processors/UpdateAttribute.cpp                 |   4 +-
 .../processors/UpdateAttribute.h                   |   4 +-
 .../tests/unit/DefragmentTextTests.cpp             | 248 +++++++++++++++
 libminifi/include/core/FlowFileStore.h             |  51 ++++
 libminifi/include/utils/StringUtils.h              |   9 +
 libminifi/src/core/ProcessSession.cpp              |  14 +-
 libminifi/src/utils/StringUtils.cpp                |  10 +
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   |  67 +++++
 libminifi/test/ReadFromFlowFileTestProcessor.h     |  65 ++++
 libminifi/test/WriteToFlowFileTestProcessor.cpp    |  74 +++++
 libminifi/test/WriteToFlowFileTestProcessor.h      |  61 ++++
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |   7 +-
 .../test/unit/ContentRepositoryDependentTests.h    |  53 +++-
 libminifi/test/unit/ProcessSessionTests.cpp        |  10 +-
 libminifi/test/unit/StringUtilsTests.cpp           |  23 ++
 26 files changed, 1223 insertions(+), 66 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index ebb540c..ae96b13 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -13,6 +13,7 @@
 - [ConsumeJournald](#consumejournald)
 - [ConsumeKafka](#consumekafka)
 - [ConsumeMQTT](#consumemqtt)
+- [DefragmentText](#defragmenttext)
 - [DeleteS3Object](#deletes3object)
 - [ExecuteProcess](#executeprocess)
 - [ExecutePythonProcessor](#executepythonprocessor)
@@ -295,6 +296,29 @@ In the list below, the names of required properties appear in bold. Any other pr
 | - | - |
 |success|FlowFiles that are sent successfully to the destination are transferred to this relationship|
 
+## DefragmentText
+
+### Description
+
+DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Pattern**|||A regular expression to match at the start or end of messages.|
+|Pattern Location|Start of Message|Start of Message<br>End of Message|Whether the pattern is located at the start or at the end of the messages.|
+|Max Buffer Age|10 min|&lt;duration&gt; &lt;time unit&gt;|The maximum age of the buffer after which it will be transferred to success when matching Start of Message patterns or to failure when matching End of Message patterns.|
+|Max Buffer Size||&lt;size&gt; &lt;size unit&gt;|The maximum buffer size; if the buffer exceeds this, it will be transferred to failure.|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Flowfiles that have been successfully defragmented|
+|failure|Flowfiles that failed the defragmentation process|
+
 
 ## DeleteS3Object
 
diff --git a/README.md b/README.md
index dbc641e..3c6a2a9 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
 
 | Extension Set        | Processors           |
 | ------------- |:-------------|
-| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](PROCESSORS.md#putfile)<br/>[RetryFlowFile](PROCESSOR [...]
+| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 17aa08f..ecb5c70 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -70,7 +70,8 @@ function(createTests testName)
   if (Boost_FOUND)
     target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}")
   endif()
-  target_link_libraries(${testName} ${CMAKE_DL_LIBS} ${TEST_BASE_LIB})
+  target_link_libraries(${testName} ${CMAKE_DL_LIBS})
+  target_wholearchive_library(${testName} ${TEST_BASE_LIB})
   target_link_libraries(${testName} core-minifi yaml-cpp spdlog Threads::Threads)
   if (Boost_FOUND)
       target_link_libraries(${testName} ${Boost_SYSTEM_LIBRARY})
@@ -83,7 +84,7 @@ endfunction()
 enable_testing(test)
 
 SET(TEST_BASE_LIB test_base)
+# NOTE(review): test_base now also carries the Write/ReadFromFlowFile test processors; createTests links it via
+# target_wholearchive_library, presumably so their static resource registrations are not dropped by the linker.
-add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp" "${TEST_DIR}/RandomServerSocket.cpp" "${TEST_DIR}/KamikazeProcessor.cpp")
+add_library(${TEST_BASE_LIB} STATIC "${TEST_DIR}/TestBase.cpp" "${TEST_DIR}/RandomServerSocket.cpp" "${TEST_DIR}/KamikazeProcessor.cpp" "${TEST_DIR}/WriteToFlowFileTestProcessor.cpp" "${TEST_DIR}/ReadFromFlowFileTestProcessor.cpp")
 target_link_libraries(${TEST_BASE_LIB} core-minifi)
 target_include_directories(${TEST_BASE_LIB} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
 target_include_directories(${TEST_BASE_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/")
@@ -124,7 +125,8 @@ if(NOT WIN32 AND ENABLE_NANOFI)
     target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/standard-processors/processors/")
     target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
     appendIncludes("${testfilename}")
-    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} ${TEST_BASE_LIB} Threads::Threads)
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} Threads::Threads)
+
     target_wholearchive_library(${testfilename} nanofi)
 
     createTests(${testfilename})
diff --git a/docker/test/integration/features/defragtextflowfiles.feature b/docker/test/integration/features/defragtextflowfiles.feature
new file mode 100644
index 0000000..1d20bfc
--- /dev/null
+++ b/docker/test/integration/features/defragtextflowfiles.feature
@@ -0,0 +1,30 @@
+Feature: DefragmentText can defragment fragmented data from TailFile
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  # TailFile uses "%" as its input delimiter, so it emits the file in "%"-terminated chunks;
+  # DefragmentText regroups those chunks at <pattern>, and only its "success" output is checked.
+  Scenario Outline: DefragmentText merges split messages from TailFile
+    Given a TailFile processor with the "File to Tail" property set to "/tmp/input/test_file.log"
+    And the "Initial Start Position" property of the TailFile processor is set to "Beginning of File"
+    And the "Input Delimiter" property of the TailFile processor is set to "%"
+    And a file with filename "test_file.log" and content "<input>" is present in "/tmp/input"
+    And a DefragmentText processor with the "Pattern" property set to "<pattern>"
+    And the "Pattern Location" property of the DefragmentText processor is set to "<pattern location>"
+    And a PutFile processor with the name "SuccessPut" and the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the TailFile processor is connected to the DefragmentText
+    And the "success" relationship of the DefragmentText processor is connected to the SuccessPut
+
+
+    When all instances start up
+    Then flowfiles with these contents are placed in the monitored directory in less than 30 seconds: "<success_flow_files>"
+
+
+    # <success_flow_files> lists the expected file contents separated by "," (presumably split by the test step).
+    # The part after the last complete message is never expected: it stays buffered, because the default
+    # Max Buffer Age (10 min) is far longer than the 30 second wait above.
+    Examples:
+      | input                                                        | pattern       | pattern location |  success_flow_files                                   |
+      | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog%                | <[0-9]+>      | Start of Message | <1> apple%banana%,<2> foo%bar%baz%                    |
+      | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog%                | <[0-9]+>      | End of Message   | <1>, apple%banana%<2>, foo%bar%baz%<3>                |
+      | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog%                | %             | Start of Message | <1> apple,%banana,%<2> foo,%bar,%baz,%<3> cat,%dog    |
+      | <1> apple%banana%<2> foo%bar%baz%<3> cat%dog%                | %             | End of Message   | <1> apple%,banana%,<2> foo%,bar%,baz%,<3> cat%,dog%   |
+      | previous%>>a<< apple%banana%>>b<< foo%bar%baz%>>c<< cat%dog% | >>[a-z]+<<    | Start of Message | previous%,>>a<< apple%banana%,>>b<< foo%bar%baz%      |
+      | previous%>>a<< apple%banana%>>b<< foo%bar%baz%>>c<< cat%dog% | >>[a-z]+<<    | End of Message   | previous%>>a<<, apple%banana%>>b<<, foo%bar%baz%>>c<< |
+      | long%message%with%a%single%delimiter%at%the%end%!%           | !             | Start of Message | long%message%with%a%single%delimiter%at%the%end%      |
+      | long%message%with%a%single%delimiter%at%the%end%!%           | !             | End of Message   | long%message%with%a%single%delimiter%at%the%end%!     |
diff --git a/docker/test/integration/minifi/processors/DefragmentText.py b/docker/test/integration/minifi/processors/DefragmentText.py
new file mode 100644
index 0000000..29ea2e5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DefragmentText.py
@@ -0,0 +1,9 @@
+from ..core.Processor import Processor
+
+
+class DefragmentText(Processor):
+    def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(DefragmentText, self).__init__('DefragmentText',
+                                             schedule=schedule,
+                                             properties={'Delimiter': delimiter},
+                                             auto_terminate=['success', 'failure'])
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index 160b474..a468159 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -350,23 +350,6 @@ void BinFiles::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
   file_store_.put(flowFile);
 }
 
-void BinFiles::FlowFileStore::put(const std::shared_ptr<core::FlowFile>& flowFile) {
-  {
-    std::lock_guard<std::mutex> guard(flow_file_mutex_);
-    incoming_files_.emplace(std::move(flowFile));
-  }
-  has_new_flow_file_.store(true, std::memory_order_release);
-}
-
-std::unordered_set<std::shared_ptr<core::FlowFile>> BinFiles::FlowFileStore::getNewFlowFiles() {
-  bool hasNewFlowFiles = true;
-  if (!has_new_flow_file_.compare_exchange_strong(hasNewFlowFiles, false, std::memory_order_acquire, std::memory_order_relaxed)) {
-    return {};
-  }
-  std::lock_guard<std::mutex> guard(flow_file_mutex_);
-  return std::move(incoming_files_);
-}
-
 std::set<std::shared_ptr<core::Connectable>> BinFiles::getOutGoingConnections(const std::string &relationship) const {
   auto result = core::Connectable::getOutGoingConnections(relationship);
   if (relationship == Self.getName()) {
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index b2443ee..aa8df05 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -34,6 +34,7 @@
 #include "utils/gsl.h"
 #include "utils/Id.h"
 #include "utils/Export.h"
+#include "core/FlowFileStore.h"
 
 namespace org {
 namespace apache {
@@ -265,24 +266,10 @@ class BinFiles : public core::Processor {
   BinManager binManager_;
 
  private:
-  class FlowFileStore{
-   public:
-    /**
-     * Returns the already-preprocessed FlowFiles that got restored on restart from the FlowFileRepository
-     * @return the resurrected persisted FlowFiles
-     */
-    std::unordered_set<std::shared_ptr<core::FlowFile>> getNewFlowFiles();
-    void put(const std::shared_ptr<core::FlowFile>& flowFile);
-   private:
-    std::atomic_bool has_new_flow_file_{false};
-    std::mutex flow_file_mutex_;
-    std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
-  };
-
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<BinFiles>::getLogger()};
   uint32_t batchSize_{1};
   uint32_t maxBinCount_{100};
-  FlowFileStore file_store_;
+  core::FlowFileStore file_store_;
 };
 
 } /* namespace processors */
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp
new file mode 100644
index 0000000..04c6f0e
--- /dev/null
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefragmentText.h"
+
+#include <vector>
+#include <utility>
+
+#include "core/Resource.h"
+#include "serialization/PayloadSerializer.h"
+#include "TextFragmentUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented");
+const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process");
+// Internal loop-back relationship: getOutGoingConnections connects it to this processor itself,
+// which is how the buffered (incomplete) message is kept between triggers.
+const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
+
+const core::Property DefragmentText::Pattern(
+    core::PropertyBuilder::createProperty("Pattern")
+        ->withDescription("A regular expression to match at the start or end of messages.")
+        ->isRequired(true)->build());
+
+const core::Property DefragmentText::PatternLoc(
+    core::PropertyBuilder::createProperty("Pattern Location")->withDescription("Whether the pattern is located at the start or at the end of the messages.")
+        ->withAllowableValues(PatternLocation::values())
+        ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build());
+
+
+const core::Property DefragmentText::MaxBufferSize(
+    core::PropertyBuilder::createProperty("Max Buffer Size")
+        ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <data unit>")
+        ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build());
+
+// The time period validator rejects malformed durations at configuration time, like the data size
+// validator does for Max Buffer Size; it is set after the default value so it is not overridden.
+const core::Property DefragmentText::MaxBufferAge(
+    core::PropertyBuilder::createProperty("Max Buffer Age")->
+        withDescription("The maximum age of the buffer after which it will be transferred to success when matching Start of Message patterns or to failure when matching End of Message patterns. "
+                        "Expected format is <duration> <time unit>")
+        ->withDefaultValue("10 min")
+        ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)
+        ->build());
+
+// Declares the user-visible properties and relationships; Self is deliberately not listed.
+void DefragmentText::initialize() {
+  setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
+  setSupportedRelationships({Success, Failure});
+}
+
+/**
+ * Applies the configuration: buffer age and size limits, pattern location and the pattern itself.
+ * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if Pattern is missing, empty or not a valid regular expression
+ */
+void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  std::string max_buffer_age_str;
+  if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
+    core::TimeUnit unit;
+    uint64_t max_buffer_age;
+    if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
+      buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
+      logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
+    } else {
+      // do not drop an unparsable value without a trace
+      logger_->log_warn("Invalid Max Buffer Age value \"%s\" is ignored", max_buffer_age_str);
+    }
+  }
+
+  std::string max_buffer_size_str;
+  if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
+    uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
+    if (max_buffer_size > 0) {
+      buffer_.setMaxSize(max_buffer_size);
+      logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
+    }
+  }
+
+  context->getProperty(PatternLoc.getName(), pattern_location_);
+
+  std::string pattern_str;
+  if (!context->getProperty(Pattern.getName(), pattern_str) || pattern_str.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
+  }
+  try {
+    pattern_ = std::regex(pattern_str);
+  } catch (const std::regex_error& ex) {
+    // a bare std::regex_error would not tell the operator which property is wrong
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property is not a valid regular expression: " + std::string(ex.what()));
+  }
+  logger_->log_trace("The Pattern is configured to be %s", pattern_str);
+}
+
+/**
+ * Feeds restored and newly arrived fragments through the buffer, then flushes the buffer
+ * if it grew beyond Max Buffer Size (to failure) or outlived Max Buffer Age.
+ */
+void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) {
+  gsl_Expects(session);
+
+  // Flow files handed back through restore() are processed before any new input.
+  for (auto& restored_file : flow_file_store_.getNewFlowFiles()) {
+    if (!restored_file)
+      continue;
+    processNextFragment(session, gsl::not_null(std::move(restored_file)));
+  }
+
+  // At most one incoming fragment is consumed per trigger.
+  if (auto incoming_file = session->get()) {
+    processNextFragment(session, gsl::not_null(std::move(incoming_file)));
+  }
+
+  // An oversized buffer always fails; an expired one goes to success only for Start of Message patterns.
+  if (buffer_.maxSizeReached()) {
+    buffer_.flushAndReplace(session, Failure, nullptr);
+  } else if (buffer_.maxAgeReached()) {
+    const bool expired_goes_to_success = pattern_location_ == PatternLocation::START_OF_MESSAGE;
+    buffer_.flushAndReplace(session, expired_goes_to_success ? Success : Failure, nullptr);
+  }
+}
+
+/**
+ * Merges one fragment into the buffer. The part before the last pattern match is appended to the buffer;
+ * if a match was found, the buffer is emitted to success and the part after the match becomes the new buffer.
+ * A fragment that does not continue the buffered content is sent to failure together with the buffer.
+ */
+void DefragmentText::processNextFragment(core::ProcessSession *session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment) {
+  if (buffer_.isCompatible(*next_fragment)) {
+    std::shared_ptr<core::FlowFile> head;
+    std::shared_ptr<core::FlowFile> tail;
+    const bool pattern_found = splitFlowFileAtLastPattern(session, next_fragment, head, tail);
+    if (head) {
+      buffer_.append(session, gsl::not_null(std::move(head)));
+    }
+    if (pattern_found) {
+      buffer_.flushAndReplace(session, Success, tail);
+    }
+    // the fragment's content now lives in the clones, so the original is dropped
+    session->remove(next_fragment);
+  } else {
+    buffer_.flushAndReplace(session, Failure, nullptr);
+    session->transfer(next_fragment, Failure);
+  }
+}
+
+
+/**
+ * Renames both halves of a split fragment so that each filename reflects its own offset and size,
+ * and sets the offset attribute of the second half to the position where it starts in the source.
+ * Does nothing unless the original carries the base name, post name and a numeric offset attribute.
+ */
+void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& original_flow_file,
+                                                   const std::shared_ptr<core::FlowFile>& split_before_last_pattern,
+                                                   const std::shared_ptr<core::FlowFile>& split_after_last_pattern,
+                                                   const size_t split_position) const {
+  std::string base_name, post_name, offset_str;
+  if (!original_flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
+    return;
+  if (!original_flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
+    return;
+  if (!original_flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
+    return;
+
+  uint64_t fragment_offset = 0;
+  try {
+    // stoull rather than stoi: offsets above INT_MAX (2 GiB with a 32-bit int) would make stoi throw
+    fragment_offset = std::stoull(offset_str);
+  } catch (const std::logic_error&) {  // std::invalid_argument or std::out_of_range
+    return;
+  }
+
+  if (split_before_last_pattern) {
+    std::string first_part_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, split_before_last_pattern->getSize());
+    split_before_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, first_part_name);
+  }
+  if (split_after_last_pattern) {
+    const uint64_t second_part_offset = fragment_offset + split_position;
+    std::string second_part_name = textfragmentutils::createFileName(base_name, post_name, second_part_offset, split_after_last_pattern->getSize());
+    split_after_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, second_part_name);
+    split_after_last_pattern->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(second_part_offset));
+  }
+}
+
+namespace {
+/**
+ * OutputStreamCallback that serializes the content of flow_file_to_append into the stream it receives;
+ * Buffer::append passes it to ProcessSession::append to grow the buffered flow file.
+ * It stores references only, so it must not outlive the flow file and serializer it was constructed with.
+ */
+class AppendFlowFileToFlowFile : public OutputStreamCallback {
+ public:
+  explicit AppendFlowFileToFlowFile(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append, PayloadSerializer& serializer)
+      : flow_file_to_append_(flow_file_to_append), serializer_(serializer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+    return serializer_.serialize(flow_file_to_append_, stream);
+  }
+ private:
+  const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append_;
+  PayloadSerializer& serializer_;
+};
+
+/**
+ * Renames the buffered flow file after content was appended, so that its filename reflects its
+ * offset and new size. Does nothing unless the base name, post name and a numeric offset attribute are present.
+ */
+void updateAppendedAttributes(core::FlowFile& buffered_ff) {
+  std::string base_name, post_name, offset_str;
+  if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
+    return;
+  if (!buffered_ff.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
+    return;
+  if (!buffered_ff.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
+    return;
+  uint64_t fragment_offset = 0;
+  try {
+    // stoull rather than stoi: offsets above INT_MAX (2 GiB with a 32-bit int) would make stoi throw
+    fragment_offset = std::stoull(offset_str);
+  } catch (const std::logic_error&) {  // std::invalid_argument or std::out_of_range
+    return;
+  }
+
+  std::string buffer_new_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, buffered_ff.getSize());
+  buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
+}
+
+/**
+ * InputStreamCallback that reads the whole content of a flow file into `content`.
+ * On a read error `content` is left empty and -1 is returned.
+ */
+struct ReadFlowFileContent : public InputStreamCallback {
+  std::string content;
+
+  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+    const auto stream_size = stream->size();
+    content.resize(stream_size);
+    const auto ret = stream->read(reinterpret_cast<uint8_t *>(content.data()), stream_size);
+    if (io::isError(ret)) {
+      // never let the regex search run over a partially filled buffer
+      content.clear();
+      return -1;
+    }
+    // keep only the bytes actually read, not the zero padding from the resize above
+    content.resize(ret);
+    return gsl::narrow<int64_t>(ret);
+  }
+};
+
+/**
+ * Byte position at which to split around the last match: before the match for Start of Message
+ * patterns, right after it for End of Message patterns.
+ */
+size_t getSplitPosition(const std::smatch& last_match, DefragmentText::PatternLocation pattern_location) {
+  size_t split_position = last_match.position(0);
+  if (pattern_location == DefragmentText::PatternLocation::END_OF_MESSAGE) {
+    split_position += last_match.length(0);
+  }
+  return split_position;
+}
+
+}  // namespace
+
+/**
+ * Splits original_flow_file around the last match of the pattern.
+ * @return true if the pattern matched; then split_before_last_pattern / split_after_last_pattern receive
+ *         the non-empty parts before and after the split position. Returns false if nothing matched;
+ *         then split_before_last_pattern is a clone of the whole fragment and split_after_last_pattern is null.
+ */
+bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession *session,
+                                                const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
+                                                std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+                                                std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
+  ReadFlowFileContent content_reader;
+  session->read(original_flow_file, &content_reader);
+  const auto last_match = utils::StringUtils::getLastRegexMatch(content_reader.content, pattern_);
+  if (last_match.ready()) {
+    const auto split_position = getSplitPosition(last_match, pattern_location_);
+    const auto total_size = original_flow_file->getSize();
+    if (split_position != 0)
+      split_before_last_pattern = session->clone(original_flow_file, 0, split_position);
+    if (split_position != total_size)
+      split_after_last_pattern = session->clone(original_flow_file, split_position, total_size - split_position);
+    updateAttributesForSplitFiles(*original_flow_file, split_before_last_pattern, split_after_last_pattern, split_position);
+    return true;
+  }
+  // No match: the whole fragment continues the message that is currently being buffered.
+  split_before_last_pattern = session->clone(original_flow_file);
+  split_after_last_pattern = nullptr;
+  return false;
+}
+
+// Queues a flow file restored for this processor; onTrigger processes it before new input. Null is ignored.
+void DefragmentText::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
+  if (flowFile) {
+    flow_file_store_.put(flowFile);
+  }
+}
+
+// Regular connections, plus this processor itself for the Self relationship, so the buffer loops back here.
+std::set<std::shared_ptr<core::Connectable>> DefragmentText::getOutGoingConnections(const std::string &relationship) const {
+  auto connections = core::Connectable::getOutGoingConnections(relationship);
+  if (relationship != Self.getName())
+    return connections;
+  auto self = std::const_pointer_cast<core::Processor>(shared_from_this());
+  connections.insert(std::static_pointer_cast<core::Connectable>(self));
+  return connections;
+}
+
+/**
+ * Adds a fragment to the buffer. An empty buffer simply adopts the fragment; otherwise the fragment's
+ * content is appended to the buffered flow file, which is renamed and kept on Self, and the fragment is removed.
+ */
+void DefragmentText::Buffer::append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append) {
+  if (!empty()) {
+    auto read_with_session = [session] (const std::shared_ptr<core::FlowFile>& flow_file, InputStreamCallback* callback) {
+      return session->read(flow_file, callback);
+    };
+    PayloadSerializer serializer(read_with_session);
+    AppendFlowFileToFlowFile appender(flow_file_to_append, serializer);
+    session->add(buffered_flow_file_);
+    session->append(buffered_flow_file_, &appender);
+    updateAppendedAttributes(*buffered_flow_file_);
+    session->transfer(buffered_flow_file_, Self);
+    session->remove(flow_file_to_append);
+  } else {
+    store(session, flow_file_to_append);
+  }
+}
+
+// True when a size limit is set and the buffered flow file is strictly larger than it.
+bool DefragmentText::Buffer::maxSizeReached() const {
+  if (empty() || !max_size_)
+    return false;
+  return buffered_flow_file_->getSize() > *max_size_;
+}
+
+// True when an age limit is set and more than that much time passed since the buffer was last stored.
+bool DefragmentText::Buffer::maxAgeReached() const {
+  if (empty() || !max_age_)
+    return false;
+  const auto deadline = creation_time_ + *max_age_;
+  return std::chrono::steady_clock::now() > deadline;
+}
+
+// Enables the age limit checked by maxAgeReached().
+void DefragmentText::Buffer::setMaxAge(std::chrono::milliseconds max_age) {
+  max_age_.emplace(max_age);
+}
+
+// Enables the size limit (in bytes) checked by maxSizeReached().
+void DefragmentText::Buffer::setMaxSize(size_t max_size) {
+  max_size_.emplace(max_size);
+}
+
+// Transfers the buffered flow file (if any) to `relationship`, then buffers `new_buffered_flow_file` (may be null).
+void DefragmentText::Buffer::flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
+                                             const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
+  if (buffered_flow_file_ != nullptr) {
+    session->add(buffered_flow_file_);
+    session->transfer(buffered_flow_file_, relationship);
+  }
+  store(session, new_buffered_flow_file);
+}
+
+// Makes `new_buffered_flow_file` the buffer, restarts the age clock and, if non-null, keeps it on Self.
+void DefragmentText::Buffer::store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
+  creation_time_ = std::chrono::steady_clock::now();
+  buffered_flow_file_ = new_buffered_flow_file;
+  if (empty())
+    return;
+  session->add(buffered_flow_file_);
+  session->transfer(buffered_flow_file_, Self);
+}
+
+/**
+ * Checks whether `fragment` continues the buffered content: true if the buffer is empty, or if both
+ * carry the same base name and post name attributes and, when both have an offset attribute, the
+ * fragment's offset equals the buffer's offset plus its size. An unparsable offset counts as incompatible.
+ */
+bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+  if (empty())
+    return true;
+  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
+      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
+    return false;
+  }
+  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
+      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
+    return false;
+  }
+  std::string current_offset_str, append_offset_str;
+  // either both or neither must carry an offset attribute
+  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
+      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
+    return false;
+  }
+  if (!current_offset_str.empty() && !append_offset_str.empty()) {
+    uint64_t current_offset = 0;
+    uint64_t append_offset = 0;
+    try {
+      // stoull rather than stoi: offsets above INT_MAX (2 GiB with a 32-bit int) would make stoi throw
+      current_offset = std::stoull(current_offset_str);
+      append_offset = std::stoull(append_offset_str);
+    } catch (const std::logic_error&) {  // std::invalid_argument or std::out_of_range
+      return false;
+    }
+    if (current_offset + buffered_flow_file_->getSize() != append_offset)
+      return false;
+  }
+  return true;
+}
+
+// Registers DefragmentText as a resource; the description is the same text as in PROCESSORS.md.
+REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them");
+
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/DefragmentText.h b/extensions/standard-processors/processors/DefragmentText.h
new file mode 100644
index 0000000..1f52379
--- /dev/null
+++ b/extensions/standard-processors/processors/DefragmentText.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <regex>
+#include <memory>
+#include <string>
+#include <set>
+
+#include "core/Processor.h"
+#include "core/FlowFileStore.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
+#include "serialization/PayloadSerializer.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+/**
+ * Splits and merges incoming text fragments (for example TailFile output) so that cohesive messages,
+ * delimited by the Pattern regular expression, are not split between flow files.
+ * The message still being assembled is kept as a single buffered flow file on the Self relationship.
+ */
+class DefragmentText : public core::Processor {
+ public:
+  explicit DefragmentText(const std::string& name,  const utils::Identifier& uuid = {})
+      : Processor(name, uuid) {
+  }
+  // Loop-back relationship: getOutGoingConnections routes it to this processor itself.
+  EXTENSIONAPI static const core::Relationship Self;
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  EXTENSIONAPI static const core::Property Pattern;
+  EXTENSIONAPI static const core::Property PatternLoc;
+  EXTENSIONAPI static const core::Property MaxBufferAge;
+  EXTENSIONAPI static const core::Property MaxBufferSize;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* sessionFactory) override;
+  void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override;
+  // Queues a restored flow file; onTrigger processes queued flow files before new input.
+  void restore(const std::shared_ptr<core::FlowFile>& flowFile) override;
+  std::set<std::shared_ptr<core::Connectable>> getOutGoingConnections(const std::string &relationship) const override;
+
+  // Where the pattern sits in a message: the split happens after the last match for END_OF_MESSAGE
+  // and right before it for START_OF_MESSAGE.
+  SMART_ENUM(PatternLocation,
+             (END_OF_MESSAGE, "End of Message"),
+             (START_OF_MESSAGE, "Start of Message")
+  )
+
+ private:
+  // NOTE(review): buffer_ has no locking, which is presumably why concurrent triggers are disallowed.
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+ protected:
+  // Holds the message currently being assembled, as a single flow file, between triggers.
+  class Buffer {
+   public:
+    // True if the buffer is empty or the fragment directly continues the buffered content.
+    bool isCompatible(const core::FlowFile& fragment) const;
+    void append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append);
+    bool maxSizeReached() const;
+    bool maxAgeReached() const;
+    void setMaxAge(std::chrono::milliseconds max_age);
+    void setMaxSize(size_t max_size);
+    // Transfers the buffered flow file (if any) to `relationship` and buffers `new_buffered_flow_file` (may be null).
+    void flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
+                         const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
+
+    bool empty() const { return buffered_flow_file_ == nullptr; }
+
+   private:
+    void store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
+
+    std::shared_ptr<core::FlowFile> buffered_flow_file_;
+    // Set whenever store() replaces the buffer; maxAgeReached() measures from here.
+    std::chrono::time_point<std::chrono::steady_clock> creation_time_;
+    std::optional<std::chrono::milliseconds> max_age_;
+    std::optional<size_t> max_size_;
+  };
+
+  std::regex pattern_;
+  PatternLocation pattern_location_;
+
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DefragmentText>::getLogger();
+  core::FlowFileStore flow_file_store_;
+  Buffer buffer_;
+
+  void processNextFragment(core::ProcessSession *session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment);
+
+  // Splits the fragment around the last pattern match; returns whether the pattern matched at all.
+  bool splitFlowFileAtLastPattern(core::ProcessSession *session,
+                                  const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
+                                  std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+                                  std::shared_ptr<core::FlowFile> &split_after_last_pattern) const;
+
+  void updateAttributesForSplitFiles(const core::FlowFile &original_flow_file,
+                                     const std::shared_ptr<core::FlowFile> &split_before_last_pattern,
+                                     const std::shared_ptr<core::FlowFile> &split_after_last_pattern,
+                                     const size_t split_position) const;
+};
+
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index d971f73..d363681 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -40,11 +40,13 @@
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "TextFragmentUtils.h"
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -795,11 +797,13 @@ void TailFile::updateFlowFileAttributes(const std::string &full_file_name, const
                                         const std::string &extension,
                                         std::shared_ptr<core::FlowFile> &flow_file) const {
   logger_->log_info("TailFile %s for %" PRIu64 " bytes", fileName, flow_file->getSize());
-  std::string logName = baseName + "." + std::to_string(state.position_) + "-" +
-                        std::to_string(state.position_ + flow_file->getSize() - 1) + "." + extension;
+  std::string logName = textfragmentutils::createFileName(baseName, extension, state.position_, flow_file->getSize());
   flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_);
   flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name);
   flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
+  flow_file->setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
+  flow_file->setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
+  flow_file->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));
 }
 
 void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const {
diff --git a/extensions/standard-processors/processors/TextFragmentUtils.h b/extensions/standard-processors/processors/TextFragmentUtils.h
new file mode 100644
index 0000000..ed024b2
--- /dev/null
+++ b/extensions/standard-processors/processors/TextFragmentUtils.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors::textfragmentutils {
+
+// Flow file attributes describing where a text fragment came from. TailFile fills them with the
+// tailed file's base name, its extension and the byte offset at which the fragment starts.
+inline constexpr const char* BASE_NAME_ATTRIBUTE = "TextFragmentAttribute.base_name";
+inline constexpr const char* POST_NAME_ATTRIBUTE = "TextFragmentAttribute.post_name";
+inline constexpr const char* OFFSET_ATTRIBUTE = "TextFragmentAttribute.offset";
+
+/**
+ * Builds the name of a fragment file as "<base_name>.<first byte>-<last byte>.<post_name>".
+ * Offsets are 64-bit so positions beyond 4 GiB are not truncated where size_t is only 32 bits wide.
+ * @param base_name name of the source, without its extension
+ * @param post_name extension of the source, appended after the byte range
+ * @param offset position of the fragment's first byte within the source
+ * @param size number of bytes in the fragment; the range is inclusive, so an empty fragment
+ *        is rendered as "<offset>-<offset>" instead of wrapping around below zero
+ * @return the generated file name
+ */
+inline std::string createFileName(const std::string& base_name, const std::string& post_name, const uint64_t offset, const uint64_t size) {
+  const uint64_t last_byte = size > 0 ? offset + size - 1 : offset;
+  return base_name + "." + std::to_string(offset) + "-" + std::to_string(last_byte) + "." + post_name;
+}
+
+}  // namespace org::apache::nifi::minifi::processors::textfragmentutils
diff --git a/extensions/standard-processors/processors/UpdateAttribute.cpp b/extensions/standard-processors/processors/UpdateAttribute.cpp
index fa05f44..35ee4df 100644
--- a/extensions/standard-processors/processors/UpdateAttribute.cpp
+++ b/extensions/standard-processors/processors/UpdateAttribute.cpp
@@ -32,8 +32,8 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Relationship UpdateAttribute::Success("success", "All files are routed to success");
-core::Relationship UpdateAttribute::Failure("failure", "Failed files are transferred to failure");
+const core::Relationship UpdateAttribute::Success("success", "All files are routed to success");
+const core::Relationship UpdateAttribute::Failure("failure", "Failed files are transferred to failure");
 
 void UpdateAttribute::initialize() {
   std::set<core::Property> properties;
diff --git a/extensions/standard-processors/processors/UpdateAttribute.h b/extensions/standard-processors/processors/UpdateAttribute.h
index 3a2c047..e8b0be9 100644
--- a/extensions/standard-processors/processors/UpdateAttribute.h
+++ b/extensions/standard-processors/processors/UpdateAttribute.h
@@ -46,8 +46,8 @@ class UpdateAttribute : public core::Processor {
    * Relationships
    */
 
-  static core::Relationship Success;
-  static core::Relationship Failure;
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
 
   /**
    * NiFi API implementation
diff --git a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
new file mode 100644
index 0000000..9cd32de
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "WriteToFlowFileTestProcessor.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "UpdateAttribute.h"
+#include "DefragmentText.h"
+#include "TextFragmentUtils.h"
+#include "utils/TestUtils.h"
+#include "serialization/PayloadSerializer.h"
+#include "serialization/FlowFileSerializer.h"
+#include "unit/ContentRepositoryDependentTests.h"
+
+using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
+using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
+using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+using DefragmentText = org::apache::nifi::minifi::processors::DefragmentText;
+
+TEST_CASE("DefragmentText Single source tests", "[defragmenttextsinglesource]") {
+  TestController testController;
+  auto plan = testController.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", "defrag_text_flow_files"));
+  auto read_from_success_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_success_relationship"));
+  auto read_from_failure_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure_relationship"));
+
+  plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, defrag_text_flow_files);
+
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Success, read_from_success_relationship);
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, read_from_failure_relationship);
+
+  read_from_success_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+
+  // Catch2 re-runs everything above for each SECTION, so every section starts from a freshly built plan.
+  SECTION("Throws on empty pattern") {
+    REQUIRE_THROWS(testController.runSession(plan));
+  }
+
+  SECTION("Throws on invalid pattern") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "\"[a-b][a\"");
+
+    REQUIRE_THROWS(testController.runSession(plan));
+  }
+
+  SECTION("Single line messages starting with pattern") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+
+    write_to_flow_file->setContent("<1> Foo");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    write_to_flow_file->setContent("<2> Bar");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("<1> Foo"));
+    write_to_flow_file->setContent("<3> Baz");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("<2> Bar"));
+  }
+
+  SECTION("Single line messages ending with pattern") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+
+    write_to_flow_file->setContent("Foo <1>");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("Foo <1>"));
+    write_to_flow_file->setContent("Bar <2>");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("Bar <2>"));
+    write_to_flow_file->setContent("Baz <3>");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("Baz <3>"));
+  }
+
+  SECTION("Multiline matching start of messages") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+
+    write_to_flow_file->setContent("apple<1> banana<2> cherry<3> dragon ");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("apple<1> banana<2> cherry"));
+
+    write_to_flow_file->setContent("fruit<4> elderberry<5> fig<6> grapefruit");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("<3> dragon fruit<4> elderberry<5> fig"));
+  }
+
+  SECTION("Multiline matching end of messages") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+
+    write_to_flow_file->setContent("apple<1> banana<2> cherry<3> dragon ");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("apple<1> banana<2> cherry<3>"));
+
+    write_to_flow_file->setContent("fruit<4> elderberry<5> fig<6> grapefruit");
+    plan->reset();
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent(" dragon fruit<4> elderberry<5> fig<6>"));
+  }
+
+  SECTION("Timeout test with default Pattern Location") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent("");
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("Message"));
+  }
+
+  SECTION("Timeout test Start of Line") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::START_OF_MESSAGE));
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent("");
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->readFlowFileWithContent("Message"));
+  }
+
+  SECTION("Timeout test End of Line") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "100 ms");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent("");
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    testController.runSession(plan);
+    CHECK(read_from_failure_relationship->readFlowFileWithContent("Message"));
+  }
+
+  SECTION("Timeout test without enough time") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferAge.getName(), "1 h");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent("");
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+  }
+
+  SECTION("Max Buffer test") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferSize.getName(), "100 B");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent(std::string(150, '*'));
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure_relationship->readFlowFileWithContent(std::string("Message").append(std::string(150, '*'))));
+  }
+
+  SECTION("Max Buffer test without overflow") {
+    plan->setProperty(defrag_text_flow_files, DefragmentText::MaxBufferSize.getName(), "100 MB");
+    plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+
+    write_to_flow_file->setContent("Message");
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+
+    plan->reset();
+    write_to_flow_file->setContent(std::string(150, '*'));
+    testController.runSession(plan);
+    CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+  }
+}
+
+TEST_CASE("DefragmentTextInvalidSources", "[defragmenttextinvalidsources]") {
+  TestController testController;
+  auto plan = testController.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto update_ff = std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute", "update_attribute"));
+  auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(plan->addProcessor("DefragmentText", "defrag_text_flow_files"));
+  auto read_from_failure_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure_relationship"));
+
+  plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, update_ff);
+  plan->addConnection(update_ff, UpdateAttribute::Success, defrag_text_flow_files);
+
+  plan->addConnection(defrag_text_flow_files, DefragmentText::Failure, read_from_failure_relationship);
+  defrag_text_flow_files->setAutoTerminatedRelationships({DefragmentText::Success});
+
+  read_from_failure_relationship->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  // Every flow file gets a fresh UUID as its base name, so consecutive fragments appear to come from different sources.
+  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+  plan->setProperty(update_ff, org::apache::nifi::minifi::processors::textfragmentutils::BASE_NAME_ATTRIBUTE, "${UUID()}", true);
+
+  write_to_flow_file->setContent("Foo <1> Foo");
+  testController.runSession(plan);
+  CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+  write_to_flow_file->setContent("Bar <2> Bar");
+  plan->reset();
+  testController.runSession(plan);
+  CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 2);
+  CHECK(read_from_failure_relationship->readFlowFileWithContent("<1> Foo"));
+  CHECK(read_from_failure_relationship->readFlowFileWithContent("Bar <2> Bar"));
+}
diff --git a/libminifi/include/core/FlowFileStore.h b/libminifi/include/core/FlowFileStore.h
new file mode 100644
index 0000000..6b261dd
--- /dev/null
+++ b/libminifi/include/core/FlowFileStore.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <unordered_set>
+#include <utility>
+
+#include "FlowFile.h"
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Collection of flow files that one caller fills with put() and another drains with getNewFlowFiles().
+ * Access to the set is guarded by a mutex; an atomic flag lets getNewFlowFiles() return without locking
+ * when nothing has been put since the previous drain.
+ */
+class FlowFileStore {
+ public:
+  /**
+   * Removes and returns the flow files stored so far, leaving the store empty.
+   * @return the stored flow files; an empty set if nothing was put since the previous drain
+   */
+  std::unordered_set<std::shared_ptr<FlowFile>> getNewFlowFiles() {
+    bool has_new_flow_files = true;
+    if (!has_new_flow_file_.compare_exchange_strong(has_new_flow_files, false, std::memory_order_acquire, std::memory_order_relaxed)) {
+      return {};
+    }
+    std::lock_guard<std::mutex> guard(flow_file_mutex_);
+    // std::exchange leaves the member empty; a moved-from container is only "valid but unspecified".
+    return std::exchange(incoming_files_, {});
+  }
+
+  /**
+   * Stores a flow file until the next getNewFlowFiles() call. Putting the same pointer twice keeps one entry.
+   * @param flowFile the flow file to store
+   */
+  void put(const std::shared_ptr<FlowFile>& flowFile) {
+    {
+      std::lock_guard<std::mutex> guard(flow_file_mutex_);
+      incoming_files_.emplace(flowFile);
+    }
+    // Raise the flag only after the insertion, so a reader that sees it also finds the flow file.
+    has_new_flow_file_.store(true, std::memory_order_release);
+  }
+
+ private:
+  std::atomic_bool has_new_flow_file_{false};
+  std::mutex flow_file_mutex_;
+  std::unordered_set<std::shared_ptr<FlowFile>> incoming_files_;
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index b6148c6..ff8c04c 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -23,6 +23,7 @@
 #include <iostream>
 #include <map>
 #include <optional>
+#include <regex>
 #include <sstream>
 #include <string>
 #include <string_view>
@@ -504,6 +505,14 @@ class StringUtils {
     return str;
   }
 
+  /**
+   * Returns the last match of a regular expression within the given string.
+   * @param str incoming string; the returned match refers into its characters, so str must outlive the result
+   * @param pattern the regex to be matched
+   * @return the last match found, or a default constructed smatch (ready() == false) if there is none
+   */
+  static std::smatch getLastRegexMatch(const std::string& str, const std::regex& pattern);
+
  private:
   inline static char nibble_to_hex(uint8_t nibble, bool uppercase) {
     if (nibble < 10) {
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 82d748d..dcb95f4 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -748,13 +748,13 @@ void ProcessSession::commit() {
     }
 
     for (const auto& record : _deletedFlowFiles) {
-        if (!record->isDeleted()) {
-          continue;
-        }
-        if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
-          // mark for deletion in the flowFileRepository
-          record->setStoredToRepository(false);
-        }
+      if (!record->isDeleted()) {
+        continue;
+      }
+      if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+        // mark for deletion in the flowFileRepository
+        record->setStoredToRepository(false);
+      }
     }
 
     ensureNonNullResourceClaim(connectionQueues);
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index cc2952e..1732886 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -390,6 +390,16 @@ std::string StringUtils::to_base64(const uint8_t* data, size_t length, bool url
   return std::string(buf.data(), base64_length);
 }
 
+std::smatch StringUtils::getLastRegexMatch(const std::string& str, const std::regex& pattern) {
+  // Walk all successive matches and keep the most recent one; stays default constructed if there are none.
+  std::smatch result;
+  const std::sregex_iterator end_of_matches;
+  for (std::sregex_iterator it(str.begin(), str.end(), pattern); it != end_of_matches; ++it) {
+    result = *it;
+  }
+  return result;
+}
+
 constexpr uint8_t StringUtils::SKIP;
 constexpr uint8_t StringUtils::hex_lut[128];
 constexpr const char StringUtils::base64_enc_lut[];
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
new file mode 100644
index 0000000..7f41fc1
--- /dev/null
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ReadFromFlowFileTestProcessor.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship ReadFromFlowFileTestProcessor::Success("success", "success operational on the flow record");
+
+void ReadFromFlowFileTestProcessor::initialize() {
+  setSupportedRelationships({ Success });
+}
+
+void ReadFromFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) {
+  logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
+}
+
+namespace {
+// Copies the flow file content into buffer_, requesting stream->size() bytes from the stream.
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+}  // namespace
+
+// Drains every queued flow file, records its content and routes it to Success. Contents recorded by the
+// previous trigger are discarded first, so readFlowFileWithContent()/numberOfFlowFilesRead() reflect this run only.
+void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  gsl_Expects(context && session);
+  logger_->log_info("%s", ON_TRIGGER_LOG_STR);
+  flow_file_contents_.clear();
+
+  while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
+    ReadFlowFileIntoBuffer callback;
+    session->read(flow_file, &callback);
+    flow_file_contents_.emplace_back(callback.buffer_.begin(), callback.buffer_.end());
+    session->transfer(flow_file, Success);
+  }
+}
+
+void ReadFromFlowFileTestProcessor::onUnSchedule() {
+  logger_->log_info("%s", ON_UNSCHEDULE_LOG_STR);
+}
+
+REGISTER_RESOURCE(ReadFromFlowFileTestProcessor, "ReadFromFlowFileTestProcessor (only for testing purposes)");
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.h b/libminifi/test/ReadFromFlowFileTestProcessor.h
new file mode 100644
index 0000000..455c7e7
--- /dev/null
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+/**
+ * Test-only processor that reads every incoming flow file, remembers its content and routes it to Success,
+ * so tests can inspect what an upstream processor emitted.
+ */
+class ReadFromFlowFileTestProcessor : public core::Processor {
+ public:
+  static constexpr const char* ON_SCHEDULE_LOG_STR = "ReadFromFlowFileTestProcessor::onSchedule executed";
+  static constexpr const char* ON_TRIGGER_LOG_STR = "ReadFromFlowFileTestProcessor::onTrigger executed";
+  static constexpr const char* ON_UNSCHEDULE_LOG_STR = "ReadFromFlowFileTestProcessor::onUnSchedule executed";
+
+  explicit ReadFromFlowFileTestProcessor(const std::string& name, const utils::Identifier& uuid = utils::Identifier())
+      : Processor(name, uuid) {
+  }
+
+  static constexpr char const* const ProcessorName = "ReadFromFlowFileTestProcessor";
+  static const core::Relationship Success;
+
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+  void initialize() override;
+  void onUnSchedule() override;
+
+  /**
+   * @param content the exact content to look for
+   * @return true if a flow file with exactly this content was read during the most recent onTrigger call
+   */
+  bool readFlowFileWithContent(const std::string& content) const {
+    return std::find(flow_file_contents_.begin(), flow_file_contents_.end(), content) != flow_file_contents_.end();
+  }
+
+  /**
+   * @return the number of flow files read during the most recent onTrigger call
+   */
+  size_t numberOfFlowFilesRead() const {
+    return flow_file_contents_.size();
+  }
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadFromFlowFileTestProcessor>::getLogger();
+  std::vector<std::string> flow_file_contents_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.cpp b/libminifi/test/WriteToFlowFileTestProcessor.cpp
new file mode 100644
index 0000000..90a5090
--- /dev/null
+++ b/libminifi/test/WriteToFlowFileTestProcessor.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteToFlowFileTestProcessor.h"
+
+#include <string>
+#include <vector>
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Relationship WriteToFlowFileTestProcessor::Success("success", "success operational on the flow record");
+
+void WriteToFlowFileTestProcessor::initialize() {
+  setSupportedProperties({});
+  setSupportedRelationships({Success});
+}
+
+void WriteToFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::ProcessSessionFactory*) {
+  logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
+}
+
+namespace {
+// Writes the bytes of a string into the flow file. It only views the string passed to the constructor,
+// so that string must outlive the callback.
+struct WriteToFlowFileCallback : public OutputStreamCallback {
+  const gsl::span<const uint8_t> content_;
+
+  explicit WriteToFlowFileCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
+    // data() is a pointer on every span implementation, whereas begin() may be a class-type iterator.
+    size_t bytes_written = stream->write(content_.data(), content_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+}  // namespace
+
+// Emits one flow file holding the configured content on every trigger; yields instead when the content is empty.
+void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+  gsl_Expects(context && session);
+  logger_->log_info("%s", ON_TRIGGER_LOG_STR);
+  if (content_.empty()) {
+    context->yield();
+    return;
+  }
+  std::shared_ptr<core::FlowFile> flow_file = session->create();
+  if (!flow_file) {
+    logger_->log_error("Failed to create flowfile!");
+    return;
+  }
+  WriteToFlowFileCallback callback(content_);
+  session->write(flow_file, &callback);
+  session->transfer(flow_file, Success);
+}
+
+void WriteToFlowFileTestProcessor::onUnSchedule() {
+  logger_->log_info("%s", ON_UNSCHEDULE_LOG_STR);
+}
+
+REGISTER_RESOURCE(WriteToFlowFileTestProcessor, "WriteToFlowFileTestProcessor (only for testing purposes)");
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.h b/libminifi/test/WriteToFlowFileTestProcessor.h
new file mode 100644
index 0000000..6d05221
--- /dev/null
+++ b/libminifi/test/WriteToFlowFileTestProcessor.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#pragma once
+
+namespace org::apache::nifi::minifi::processors {
+
+/// Test-only processor. On every trigger it creates one flow file containing the text set via setContent()
+/// and routes it to Success; while no content is set it yields instead. Each lifecycle callback logs one of
+/// the *_LOG_STR markers below.
+class WriteToFlowFileTestProcessor : public core::Processor {
+ public:
+  // Messages logged by onSchedule, onTrigger and onUnSchedule respectively.
+  static constexpr const char* ON_SCHEDULE_LOG_STR = "WriteToFlowFileTestProcessor::onSchedule executed";
+  static constexpr const char* ON_TRIGGER_LOG_STR = "WriteToFlowFileTestProcessor::onTrigger executed";
+  static constexpr const char* ON_UNSCHEDULE_LOG_STR = "WriteToFlowFileTestProcessor::onUnSchedule executed";
+
+  static constexpr char const* const ProcessorName = "WriteToFlowFileTestProcessor";
+  static const core::Relationship Success;
+
+  explicit WriteToFlowFileTestProcessor(const std::string& name, const utils::Identifier& uuid = utils::Identifier())
+      : core::Processor(name, uuid) {}
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+  void onUnSchedule() override;
+
+  // Replaces the text written into each generated flow file; an empty string makes onTrigger yield.
+  void setContent(std::string content) {
+    content_ = std::move(content);
+  }
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteToFlowFileTestProcessor>::getLogger();
+  std::string content_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index c456380..01b5251 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -229,6 +229,11 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
 TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
+  // Clones the first and second halves of a committed "foobar" flow file and checks what reading the clones returns.
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::DatabaseContentRepository>());
-  ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::DatabaseContentRepository>());
 }
 
+// Appends both to a flow file whose session was already committed and to one still pending in the current session.
+TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[appendsetsize]") {
+  ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
+
+  ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
+}
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 90def02..581b68e 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -96,19 +96,20 @@ class Fixture {
 
+  // Returns the fixture's session by reference (dereferences process_session_).
   core::ProcessSession &processSession() { return *process_session_; }
 
-  void commitFlowFile(const std::string& content) {
-    const auto original_ff = process_session_->create();
-    WriteStringToFlowFile callback(content);
-    process_session_->write(original_ff, &callback);
-    process_session_->transfer(original_ff, Success);
+  // Routes flow_file to Success and commits the fixture's session.
+  void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
+    process_session_->transfer(flow_file, Success);
     process_session_->commit();
   }
 
-  void appendAndCommit(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
+  // Writes content into flow_file through the session; neither transfers nor commits.
+  void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& content) {
+    WriteStringToFlowFile callback(content);
+    process_session_->write(flow_file, &callback);
+  }
+
+  // Adds flow_file to the session before appending, so this also works on a flow file whose session was
+  // already committed; neither transfers nor commits.
+  void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& content_to_append) {
     WriteStringToFlowFile callback(content_to_append);
+    process_session_->add(flow_file);
     process_session_->append(flow_file, &callback);
-    process_session_->transfer(flow_file, Success);
-    process_session_->commit();
   }
 
  private:
@@ -123,8 +124,9 @@ class Fixture {
 void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> content_repo) {
   Fixture fixture = Fixture(content_repo);
   core::ProcessSession& process_session = fixture.processSession();
-  fixture.commitFlowFile("foobar");
-  const auto original_ff = process_session.get();
+  const auto original_ff = process_session.create();
+  fixture.writeToFlowFile(original_ff, "foobar");
+  fixture.transferAndCommit(original_ff);
   REQUIRE(original_ff);
   auto clone_first_half = process_session.clone(original_ff, 0, 3);
   auto clone_second_half = process_session.clone(original_ff, 3, 3);
@@ -149,13 +151,36 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   CHECK(read_until_it_can_callback.value_ == "bar");
 }
 
-void testAppendSize(std::shared_ptr<core::ContentRepository> content_repo) {
+// Writes "my" and commits, then appends "foobar" to the same flow file after that commit (appendToFlowFile
+// re-adds it to the session) and commits again; expects size 8 and content "myfoobar".
+void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
   Fixture fixture = Fixture(content_repo);
   core::ProcessSession& process_session = fixture.processSession();
-  fixture.commitFlowFile("my");
-  const auto flow_file = process_session.get();
-  fixture.appendAndCommit(flow_file, "foobar");
+  const auto flow_file = process_session.create();
   REQUIRE(flow_file);
+
+  fixture.writeToFlowFile(flow_file, "my");
+  fixture.transferAndCommit(flow_file);
+  fixture.appendToFlowFile(flow_file, "foobar");
+  fixture.transferAndCommit(flow_file);
+
+  CHECK(flow_file->getSize() == 8);
+  // Read the content back with both callback types; each must see the appended bytes.
+  ReadUntilStreamSize read_until_stream_size_callback;
+  ReadUntilItCan read_until_it_can_callback;
+  process_session.read(flow_file, &read_until_stream_size_callback);
+  process_session.read(flow_file, &read_until_it_can_callback);
+  CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+  CHECK(read_until_it_can_callback.value_ == "myfoobar");
+}
+
+void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> content_repo) {
+  Fixture fixture = Fixture(content_repo);
+  core::ProcessSession& process_session = fixture.processSession();
+  const auto flow_file = process_session.create();
+  REQUIRE(flow_file);
+
+  fixture.writeToFlowFile(flow_file, "my");
+  fixture.appendToFlowFile(flow_file, "foobar");
+  fixture.transferAndCommit(flow_file);
+
   CHECK(flow_file->getSize() == 8);
   ReadUntilStreamSize read_until_stream_size_callback;
   ReadUntilItCan read_until_it_can_callback;
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 97775d3..19d21fd 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -100,6 +100,12 @@ TEST_CASE("ProcessSession::rollback penalizes affected flowfiles", "[rollback]")
 TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<minifi::core::repository::VolatileContentRepository>());
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<minifi::core::repository::FileSystemRepository>());
-  ContentRepositoryDependentTests::testAppendSize(std::make_shared<minifi::core::repository::VolatileContentRepository>());
-  ContentRepositoryDependentTests::testAppendSize(std::make_shared<minifi::core::repository::FileSystemRepository>());
+}
+
+// Appends both to a flow file whose session was already committed and to one still pending in the current
+// session, for each content repository type. Name and tags are separate arguments so the tag is registered.
+TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[appendsetsize]") {
+  ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+  ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+
+  ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+  ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
 }
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 307050e..85f6292 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -471,3 +471,26 @@ TEST_CASE("StringUtils::removeFramingCharacters works correctly", "[removeFramin
   REQUIRE(utils::StringUtils::removeFramingCharacters("\"abba\"", '"') == "abba");
   REQUIRE(utils::StringUtils::removeFramingCharacters("\"\"abba\"\"", '"') == "\"abba\"");
 }
+
+TEST_CASE("StringUtils::getLastRegexMatch works correctly", "[getLastRegexMatch]") {
+  // Numeric markers such as "<1>" or "<10>".
+  std::regex marker_pattern("<[0-9]+>");
+
+  {
+    // No marker in the input: the returned match results are not ready.
+    std::string input = "Foo";
+    const auto result = StringUtils::getLastRegexMatch(input, marker_pattern);
+    REQUIRE_FALSE(result.ready());
+  }
+  {
+    // A single marker at the very start of the input.
+    std::string input = "<1> Foo";
+    const auto result = StringUtils::getLastRegexMatch(input, marker_pattern);
+    REQUIRE(result.ready());
+    CHECK(result.length(0) == 3);
+    CHECK(result.position(0) == 0);
+  }
+  {
+    // Several markers: the last one, "<10>" (4 characters starting at offset 21), must be reported.
+    std::string input = "<1> Foo<2> Bar<3> Baz<10> Qux";
+    const auto result = StringUtils::getLastRegexMatch(input, marker_pattern);
+    REQUIRE(result.ready());
+    CHECK(result.length(0) == 4);
+    CHECK(result.position(0) == 21);
+  }
+}