You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/11/03 14:32:32 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1618 Create the ReplaceText processor

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 190667151763b5d743953584cfcacff874c603d9
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Mon Sep 20 10:35:10 2021 +0200

    MINIFICPP-1618 Create the ReplaceText processor
    
    - add chomp() function to StringUtils
    - implement LineByLineInputOutputStreamCallback for reading a flow file line by line
    
    Closes #1170
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  24 ++
 README.md                                          |   2 +-
 .../test/integration/features/replace_text.feature |  49 +++
 .../integration/minifi/processors/ReplaceText.py   |  10 +
 .../standard-processors/processors/ReplaceText.cpp | 339 +++++++++++++++++
 .../standard-processors/processors/ReplaceText.h   |  99 +++++
 .../tests/unit/ReplaceTextTests.cpp                | 408 +++++++++++++++++++++
 libminifi/include/core/ProcessSession.h            |   2 +
 libminifi/include/io/StreamPipe.h                  |   5 +
 .../utils/LineByLineInputOutputStreamCallback.h    |  49 +++
 libminifi/include/utils/StringUtils.h              |   7 +-
 libminifi/src/core/ProcessSession.cpp              |  47 +++
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  73 ++++
 libminifi/src/utils/StringUtils.cpp                |  10 +
 .../LineByLineInputOutputStreamCallbackTests.cpp   | 105 ++++++
 libminifi/test/unit/StringUtilsTests.cpp           |   8 +
 16 files changed, 1235 insertions(+), 2 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index f7ea514..8c85ec9 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -50,6 +50,7 @@
 - [PutSFTP](#putsftp)
 - [PutSQL](#putsql)
 - [QueryDatabaseTable](#querydatabasetable)
+- [ReplaceText](#replacetext)
 - [RetryFlowFile](#retryflowfile)
 - [RouteOnAttribute](#routeonattribute)
 - [RouteText](#routetext)
@@ -1487,6 +1488,29 @@ In the list below, the names of required properties appear in bold. Any other pr
 |initial.maxvalue.<max_value_column>|Initial maximum value for the specified column|Specifies an initial max value for max value column(s). Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).<br/>**Supports Expression Language: true**|
 
 
+## ReplaceText
+
+### Description
+Updates the content of a FlowFile by replacing parts of it using various replacement strategies.
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Evaluation Mode**|Line-by-Line|Entire text<br>Line-by-Line<br>|Run the 'Replacement Strategy' against each line separately (Line-by-Line) or against the whole input treated as a single string (Entire Text).|
+|Line-by-Line Evaluation Mode|All|All<br>Except-First-Line<br>Except-Last-Line<br>First-Line<br>Last-Line<br>|Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).|
+|**Replacement Strategy**|Regex Replace|Always Replace<br>Append<br>Literal Replace<br>Prepend<br>Regex Replace<br>Substitute Variables<br>|The strategy for how and what to replace within the FlowFile's text content. Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value (if an attribute is not found, the placeholder is kept as it was).|
+|**Replacement Value**|||The value to insert using the 'Replacement Strategy'. Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: $& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. Back-references to non-existent capturing groups will be replaced by empty strings. Supports expression language except in Regex Replace mode.<br/>**Supports Expression Language: true**|
+|Search Value|||The Search Value to search for in the FlowFile content. Only used for 'Literal Replace' and 'Regex Replace' matching strategies. Supports expression language except in Regex Replace mode.<br/>**Supports Expression Language: true**|
+
+### Relationships
+| Name | Description |
+| - | - |
+|failure|FlowFiles that could not be updated are routed to this relationship.|
+|success|FlowFiles that have been successfully processed are routed to this relationship. This includes both FlowFiles that had text replaced and those that did not.|
+
+
 ## RetryFlowFile
 
 ### Description
diff --git a/README.md b/README.md
index 2ba9e72..58322e2 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
 
 | Extension Set        | Processors           |
 | ------------- |:-------------|
-| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
+| **Base**    | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
 
diff --git a/docker/test/integration/features/replace_text.feature b/docker/test/integration/features/replace_text.feature
new file mode 100644
index 0000000..e7f316b
--- /dev/null
+++ b/docker/test/integration/features/replace_text.feature
@@ -0,0 +1,49 @@
+Feature: Changing flowfile contents using the ReplaceText processor
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario Outline: Replace text using Entire text mode
+    Given a GenerateFlowFile processor with the "Custom Text" property set to "<input>"
+    And the scheduling period of the GenerateFlowFile processor is set to "1 hour"
+    And the "Data Format" property of the GenerateFlowFile processor is set to "Text"
+    And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
+    And a ReplaceText processor with the "Evaluation Mode" property set to "Entire text"
+    And the "Replacement Strategy" property of the ReplaceText processor is set to "<replacement_strategy>"
+    And the "Search Value" property of the ReplaceText processor is set to "<search_value>"
+    And the "Replacement Value" property of the ReplaceText processor is set to "<replacement_value>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the ReplaceText
+    And the "success" relationship of the ReplaceText processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "<output>" is placed in the monitored directory in less than 10 seconds
+
+    Examples:
+      | input                 | replacement_strategy | search_value | replacement_value | output                    |
+      | apple                 | Prepend              | _            | pine              | pineapple                 |
+      | apple                 | Append               | _            | sauce             | applesauce                |
+      | one apple, two apples | Regex Replace        | a([a-z]+)e   | ri$1et            | one ripplet, two ripplets |
+      | one apple, two apples | Literal Replace      | apple        | banana            | one banana, two bananas   |
+      | one apple, two apples | Always Replace       | _            | banana            | banana                    |
+
+  Scenario Outline: Replace text using Line-by-Line mode
+    Given a file with the content "<input>" is present in "/tmp/input"
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a ReplaceText processor with the "Evaluation Mode" property set to "Line-by-Line"
+    And the "Line-by-Line Evaluation Mode" property of the ReplaceText processor is set to "<evaluation_mode>"
+    And the "Replacement Strategy" property of the ReplaceText processor is set to "Regex Replace"
+    And the "Search Value" property of the ReplaceText processor is set to "a+(b+)c+"
+    And the "Replacement Value" property of the ReplaceText processor is set to "_$1_"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the ReplaceText
+    And the "success" relationship of the ReplaceText processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "<output>" is placed in the monitored directory in less than 10 seconds
+
+    Examples:
+      | input                      | evaluation_mode      | output                     |
+      | abc\n aabbcc\n aaabbbccc\n | All                  | _b_\n _bb_\n _bbb_\n       |
+      | abc\n aabbcc\n aaabbbccc   | All                  | _b_\n _bb_\n _bbb_         |
+      | abc\n aabbcc\n aaabbbccc\n | First-Line           | _b_\n aabbcc\n aaabbbccc\n |
+      | abc\n aabbcc\n aaabbbccc\n | Last-Line            | abc\n aabbcc\n _bbb_\n     |
+      | abc\n aabbcc\n aaabbbccc\n | Except-First-Line    | abc\n _bb_\n _bbb_\n       |
+      | abc\n aabbcc\n aaabbbccc\n | Except-Last-Line     | _b_\n _bb_\n aaabbbccc\n   |
diff --git a/docker/test/integration/minifi/processors/ReplaceText.py b/docker/test/integration/minifi/processors/ReplaceText.py
new file mode 100644
index 0000000..4612f33
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ReplaceText.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+
+class ReplaceText(Processor):
+    def __init__(self):
+        super(ReplaceText, self).__init__(
+            'ReplaceText',
+            properties={},
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+            auto_terminate=['success', 'failure'])
diff --git a/extensions/standard-processors/processors/ReplaceText.cpp b/extensions/standard-processors/processors/ReplaceText.cpp
new file mode 100644
index 0000000..5bd4ec3
--- /dev/null
+++ b/extensions/standard-processors/processors/ReplaceText.cpp
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Property definitions. The three mode/strategy properties are restricted to the values of their SMART_ENUM types
+// and default to the same values the member initializers in ReplaceText.h use. All enum-backed defaults are passed
+// explicitly as std::string so the three properties are built the same way.
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+// Only consulted when Evaluation Mode is Line-by-Line.
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+// Search Value and Replacement Value are read per FlowFile in readParameters; expression language is
+// evaluated for them except in Regex Replace mode.
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Every processed FlowFile goes to Success; FlowFiles whose update throws are routed to Failure.
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(core::logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+// Registers every property and relationship this processor exposes, in header declaration order.
+void ReplaceText::initialize() {
+  setSupportedProperties({EvaluationMode, LineByLineEvaluationMode, ReplacementStrategy, SearchValue, ReplacementValue});
+  setSupportedRelationships({Success, Failure});
+}
+
+// Parses the mode/strategy properties into the corresponding member enums and logs the chosen values.
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  // Evaluation Mode is a required property; .value() throws if it is nevertheless absent.
+  const std::optional<std::string> mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  // Line-by-Line Evaluation Mode is optional: when unset, the member keeps its current value.
+  if (const std::optional<std::string> line_mode = context->getProperty(LineByLineEvaluationMode)) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  // Replacement Strategy is required, handled like Evaluation Mode.
+  const std::optional<std::string> strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+// Takes one FlowFile from the session, reads its per-FlowFile parameters and dispatches on the evaluation mode.
+// Yields when no FlowFile is available; throws Exception for an evaluation mode it does not handle.
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  const std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (flow_file == nullptr) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  const Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      return replaceTextInEntireFile(flow_file, session, parameters);
+    case EvaluationModeType::LINE_BY_LINE:
+      return replaceTextLineByLine(flow_file, session, parameters);
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+// Reads the Search Value and Replacement Value properties for one FlowFile.
+// In Regex Replace mode both are read by name without the FlowFile (the property descriptions state that expression
+// language is not supported in that mode), and the search value is compiled into search_regex_.
+// Throws Exception if the search value is missing/empty for Regex or Literal Replace, if the search regex is invalid,
+// or if the Replacement Value is missing. In Line-by-Line mode a trailing line ending is removed from the replacement value.
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+  const bool is_regex_replace = replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE;
+
+  bool found_search_value = false;
+  if (is_regex_replace) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (is_regex_replace) {
+      try {
+        parameters.search_regex_ = std::regex{parameters.search_value_};
+      } catch (const std::regex_error& error) {
+        // Name the offending property instead of letting a bare std::regex_error escape.
+        throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Invalid regular expression in ", SearchValue.getName(), " property: ", error.what())};
+      }
+    }
+  }
+  const bool needs_search_value = is_regex_replace || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE;
+  if (needs_search_value && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value = false;
+  if (is_regex_replace) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (!found_replacement_value) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+  logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    // applyReplacements re-attaches each line's own ending, so drop any trailing ending from the replacement value.
+    [[maybe_unused]] auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+// Input callback that reads the whole stream into buffer_; returns the byte count, or -1 on a stream error.
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    const size_t read_count = stream->read(buffer_, stream->size());
+    if (io::isError(read_count)) {
+      return -1;
+    }
+    return gsl::narrow<int64_t>(read_count);
+  }
+};
+
+// Output callback that writes the referenced buffer (which must outlive the callback); returns the byte count, or -1 on error.
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    const size_t write_count = stream->write(buffer_, buffer_.size());
+    if (io::isError(write_count)) {
+      return -1;
+    }
+    return gsl::narrow<int64_t>(write_count);
+  }
+};
+
+}  // namespace
+
+// Entire-text mode: reads the whole FlowFile content into memory, applies the replacement once, writes the
+// result back and routes the FlowFile to Success. Errors are logged and the FlowFile is routed to Failure.
+// This covers minifi Exceptions and std::regex_error, which std::regex_replace may throw, e.g. error_complexity
+// or error_stack on large inputs.
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    const std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    const std::string output = applyReplacements(input, flow_file, parameters);
+    const std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  } catch (const std::regex_error& exception) {
+    logger_->log_error("Regex error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+// Line-by-line mode: streams the FlowFile through LineByLineInputOutputStreamCallback and applies the replacement
+// only to the lines selected by line_by_line_evaluation_mode_. Unselected lines are written back unchanged.
+// On success the FlowFile goes to Success. minifi Exceptions and std::regex_error (which std::regex_replace may
+// throw) are logged and the FlowFile is routed to Failure.
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  } catch (const std::regex_error& exception) {
+    logger_->log_error("Regex error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+// Applies replacement_strategy_ to one chunk of text (a single line, or the whole content in Entire text mode).
+// Except for Prepend, the trailing line ending is split off first and re-attached after the replacement.
+// Throws Exception for a strategy it does not handle.
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [text, eol] = utils::StringUtils::chomp(input);
+  const std::string& replacement = parameters.replacement_value_;
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      // Prepending to the unchomped input keeps the original line ending at the end.
+      return replacement + input;
+    case ReplacementStrategyType::APPEND:
+      return text + replacement + eol;
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(text, parameters.search_regex_, replacement) + eol;
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(text, parameters) + eol;
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return replacement + eol;
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(text, flow_file) + eol;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+// Replaces every non-overlapping occurrence of parameters.search_value_ in input, scanning left to right,
+// with parameters.replacement_value_.
+// An empty search value returns the input unchanged. An empty needle matches at every position without
+// advancing, which previously caused an endless loop on non-empty input.
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) {
+  const std::string& search = parameters.search_value_;
+  const std::string& replacement = parameters.replacement_value_;
+  if (search.empty()) {
+    return input;
+  }
+
+  std::string output;
+  output.reserve(input.size());
+
+  std::string::size_type start = 0;
+  for (auto pos = input.find(search); pos != std::string::npos; pos = input.find(search, start)) {
+    output.append(input, start, pos - start);
+    output += replacement;
+    start = pos + search.size();
+  }
+  output.append(input, start, std::string::npos);
+
+  return output;
+}
+
+// Replaces each ${name} placeholder in input with the value of the FlowFile attribute `name` (via getAttributeValue).
+// Returns the input unchanged when it contains no placeholder.
+std::string ReplaceText::applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  static const std::regex PLACEHOLDER{R"(\$\{([^}]+)\})"};
+
+  std::string output;
+  std::smatch last_match;
+  const auto no_more_matches = std::sregex_iterator{};
+  for (auto it = std::sregex_iterator{input.begin(), input.end(), PLACEHOLDER}; it != no_more_matches; ++it) {
+    last_match = *it;
+    // Copy the text between the previous placeholder (or the start) and this one, then the substituted value.
+    output.append(last_match.prefix().first, last_match.prefix().second);
+    output += getAttributeValue(flow_file, last_match);
+  }
+
+  // A default-constructed smatch stays empty, so this means no placeholder was found.
+  if (last_match.empty()) {
+    return input;
+  }
+
+  // Keep whatever follows the final placeholder.
+  output.append(last_match.suffix().first, last_match.suffix().second);
+  return output;
+}
+
+// Looks up the attribute named by capture group 1 of a placeholder match.
+// Returns its value, or the whole placeholder text (match[0]) when the FlowFile has no such attribute.
+std::string ReplaceText::getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(match.size() >= 2);
+
+  const std::string key = match[1].str();
+  if (const std::optional<std::string> value = flow_file->getAttribute(key)) {
+    return *value;
+  }
+
+  logger_->log_debug("Attribute %s not found in the flow file during %s", key, toString(ReplacementStrategyType::SUBSTITUTE_VARIABLES));
+  return match[0].str();
+}
+
+// Registers ReplaceText with this description; presumably this is what makes the processor loadable by class name (confirm in core/Resource.h).
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies.");
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/ReplaceText.h b/extensions/standard-processors/processors/ReplaceText.h
new file mode 100644
index 0000000..316c28f
--- /dev/null
+++ b/extensions/standard-processors/processors/ReplaceText.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Allowed values of the "Evaluation Mode" property: apply the strategy to each line, or to the whole content as one string.
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+// Allowed values of the "Line-by-Line Evaluation Mode" property: which lines the strategy is applied to in Line-by-Line mode.
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+// Allowed values of the "Replacement Strategy" property.
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+// Processor that rewrites FlowFile content using one of the ReplacementStrategyType strategies,
+// applied either to the entire content at once or line by line (see EvaluationModeType).
+class ReplaceText : public core::Processor {
+ public:
+  // Configuration properties; their definitions and descriptions live in ReplaceText.cpp.
+  EXTENSIONAPI static const core::Property EvaluationMode;
+  EXTENSIONAPI static const core::Property LineByLineEvaluationMode;
+  EXTENSIONAPI static const core::Property ReplacementStrategy;
+  EXTENSIONAPI static const core::Property SearchValue;
+  EXTENSIONAPI static const core::Property ReplacementValue;
+
+  // Success: processed FlowFiles (whether or not text was replaced); Failure: FlowFiles that could not be updated.
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {});
+  // Declares that this processor requires incoming FlowFiles (INPUT_REQUIRED).
+  core::annotation::Input getInputRequirement() const override { return core::annotation::Input::INPUT_REQUIRED; }
+  void initialize() override;
+  // Parses the mode/strategy properties into the enum members below.
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+  // Processes at most one FlowFile per call.
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+  // Grants the unit-test helper access to the private members below.
+  friend struct ReplaceTextTestAccessor;
+
+  // Per-FlowFile values read from the Search Value / Replacement Value properties.
+  struct Parameters {
+    std::string search_value_;
+    std::regex search_regex_;  // compiled from search_value_, only in Regex Replace mode
+    std::string replacement_value_;
+  };
+
+  // Reads Parameters for one FlowFile; throws Exception when a required value is missing.
+  Parameters readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+  // Evaluation-mode handlers: transform the content and route the FlowFile to Success or Failure.
+  void replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const;
+  void replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const;
+
+  // Applies replacement_strategy_ to one line or to the entire content.
+  std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const;
+  // Replaces each occurrence of search_value_ with replacement_value_.
+  static std::string applyLiteralReplace(const std::string& input, const Parameters& parameters);
+  // Replaces ${attribute} placeholders with FlowFile attribute values.
+  std::string applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  // Value of the attribute named in a placeholder match, or the placeholder itself if the attribute is absent.
+  std::string getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const;
+
+  // Set in onSchedule; the initializers match the property defaults.
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp b/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp
new file mode 100644
index 0000000..4c35384
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GenerateFlowFile.h"
+#include "LogAttribute.h"
+#include "ReplaceText.h"
+#include "TestBase.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+struct ReplaceTextTestAccessor {  // a friend of ReplaceText, so it can reach the processor's private members
+  ReplaceText processor_{"replace_text"};
+  ReplaceText::Parameters parameters_;
+
+  void setEvaluationMode(EvaluationModeType mode) { processor_.evaluation_mode_ = mode; }
+  void setReplacementStrategy(ReplacementStrategyType strategy) { processor_.replacement_strategy_ = strategy; }
+  void setSearchValue(const std::string& value) { parameters_.search_value_ = value; }
+  void setSearchRegex(const std::string& pattern) { parameters_.search_regex_.assign(pattern); }
+  void setReplacementValue(const std::string& value) { parameters_.replacement_value_ = value; }
+  // calls the private ReplaceText::applyReplacements with the parameters configured above
+  std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file = {}) const {
+    return processor_.applyReplacements(input, flow_file, parameters_);
+  }
+};
+
+}  // namespace org::apache::nifi::minifi::processors
+
+TEST_CASE("ReplaceText can parse its properties", "[onSchedule]") {
+  TestController testController;
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  LogTestController::getInstance().setDebug<minifi::processors::ReplaceText>();  // the checks below look for ReplaceText's debug messages
+
+  std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "One green bottle is hanging on the wall");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+
+  std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), "Entire text");
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), "Except-First-Line");
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), "Substitute Variables");
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "apple");
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "orange");
+
+  testController.runSession(plan);
+  CHECK(LogTestController::getInstance().contains("the Evaluation Mode property is set to Entire text"));
+  CHECK(LogTestController::getInstance().contains("the Line-by-Line Evaluation Mode property is set to Except-First-Line"));
+  CHECK(LogTestController::getInstance().contains("the Replacement Strategy property is set to Substitute Variables"));
+  CHECK(LogTestController::getInstance().contains("the Search Value property is set to apple"));
+  CHECK(LogTestController::getInstance().contains("the Replacement Value property is set to orange"));
+  LogTestController::getInstance().reset();  // reset the shared LogTestController, as the other tests in this file do
+}
+
+TEST_CASE("Prepend works correctly in ReplaceText", "[applyReplacements][Prepend]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::PREPEND);
+  accessor.setReplacementValue("orange");  // prepended unchanged, even to an empty input
+  CHECK(accessor.applyReplacements("") == "orange");
+  CHECK(accessor.applyReplacements("s and lemons") == "oranges and lemons");
+}
+
+TEST_CASE("Append works correctly in ReplaceText", "[applyReplacements][Append]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::APPEND);
+  accessor.setReplacementValue("orange");  // appended unchanged, even to an empty input
+  CHECK(accessor.applyReplacements("") == "orange");
+  CHECK(accessor.applyReplacements("agent ") == "agent orange");
+}
+
+TEST_CASE("Regex Replace works correctly in ReplaceText", "[applyReplacements][Regex Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::REGEX_REPLACE);
+  accessor.setSearchRegex(R"(a\w+e)");
+  accessor.setReplacementValue("orange");  // every match is replaced, not only the first one
+  CHECK(accessor.applyReplacements("") == "");
+  CHECK(accessor.applyReplacements("apple tree") == "orange tree");
+  CHECK(accessor.applyReplacements("one apple, two apples") == "one orange, two oranges");
+}
+
+TEST_CASE("Regex Replace works with back references in ReplaceText", "[applyReplacements][Regex Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::REGEX_REPLACE);
+  accessor.setSearchRegex("a(b+)c");
+  accessor.setReplacementValue("$& [found $1]");  // $& is the whole match, $1 the first capture group
+  CHECK(accessor.applyReplacements("") == "");
+  CHECK(accessor.applyReplacements("abc") == "abc [found b]");
+  CHECK(accessor.applyReplacements("cba") == "cba");
+  CHECK(accessor.applyReplacements("xxx abc yyy abbbc zzz") == "xxx abc [found b] yyy abbbc [found bbb] zzz");
+}
+
+TEST_CASE("Regex Replace treats non-existent back references as blank in ReplaceText", "[applyReplacements][Regex Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::REGEX_REPLACE);
+  accessor.setSearchRegex("a(b+)c");
+  accessor.setReplacementValue("_$1_ '$2'");  // the regex has no second group, so $2 expands to nothing
+  CHECK(accessor.applyReplacements("") == "");
+  CHECK(accessor.applyReplacements("abc") == "_b_ ''");
+  CHECK(accessor.applyReplacements("cba") == "cba");
+  CHECK(accessor.applyReplacements("xxx abc yyy abbbc zzz") == "xxx _b_ '' yyy _bbb_ '' zzz");
+}
+
+TEST_CASE("Back references can be escaped when using Regex Replace in ReplaceText", "[applyReplacements][Regex Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::REGEX_REPLACE);
+  accessor.setSearchRegex("a(b+)c");
+  accessor.setReplacementValue("$1 costs $$2");  // $$ produces a literal '$'
+  CHECK(accessor.applyReplacements("") == "");
+  CHECK(accessor.applyReplacements("abc") == "b costs $2");
+  CHECK(accessor.applyReplacements("cba") == "cba");
+  CHECK(accessor.applyReplacements("xxx abc yyy abbbc zzz") == "xxx b costs $2 yyy bbb costs $2 zzz");
+}
+
+TEST_CASE("Literal replace works correctly in ReplaceText", "[applyReplacements][Literal Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::LITERAL_REPLACE);
+  accessor.setSearchValue("apple");  // plain text: set through search_value_, not search_regex_
+  accessor.setReplacementValue("orange");
+  CHECK(accessor.applyReplacements("") == "");
+  CHECK(accessor.applyReplacements("apple tree") == "orange tree");
+  CHECK(accessor.applyReplacements("one apple, two apples") == "one orange, two oranges");
+}
+
+TEST_CASE("Always Replace works correctly in ReplaceText", "[applyReplacements][Always Replace]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::ALWAYS_REPLACE);
+  accessor.setReplacementValue("orange");  // replaces the whole input, whatever it contains
+  CHECK(accessor.applyReplacements("") == "orange");
+  CHECK(accessor.applyReplacements("apple tree") == "orange");
+  CHECK(accessor.applyReplacements("one apple, two apples") == "orange");
+}
+
+TEST_CASE("Substitute Variables works correctly in ReplaceText", "[applyReplacements][Substitute Variables]") {
+  namespace processors = minifi::processors;
+  processors::ReplaceTextTestAccessor accessor;
+  accessor.setEvaluationMode(processors::EvaluationModeType::ENTIRE_TEXT);
+  accessor.setReplacementStrategy(processors::ReplacementStrategyType::SUBSTITUTE_VARIABLES);
+  const auto record = std::make_shared<minifi::FlowFileRecord>();
+  record->setAttribute("color", "green");
+  record->setAttribute("food", "eggs and ham");
+  // placeholders naming a missing (or empty) attribute are left untouched
+  CHECK(accessor.applyReplacements("", record) == "");
+  CHECK(accessor.applyReplacements("no placeholders", record) == "no placeholders");
+  CHECK(accessor.applyReplacements("${color}", record) == "green");
+  CHECK(accessor.applyReplacements("I like ${color} ${food}!", record) == "I like green eggs and ham!");
+  CHECK(accessor.applyReplacements("it was ${color}er than ${color}", record) == "it was greener than green");
+  CHECK(accessor.applyReplacements("an empty ${} is left alone", record) == "an empty ${} is left alone");
+  CHECK(accessor.applyReplacements("not ${found} is left alone", record) == "not ${found} is left alone");
+  CHECK(accessor.applyReplacements("this ${color} ${fruit} is sour", record) == "this green ${fruit} is sour");
+}
+
+TEST_CASE("Regex Replace works correctly in ReplaceText in line by line mode", "[Line-by-Line][Regex Replace]") {
+  TestController testController;
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();  // the output is checked in LogAttribute's log
+
+  std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana\n");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+  plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+
+  std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), toString(minifi::processors::EvaluationModeType::LINE_BY_LINE));
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::REGEX_REPLACE));
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]");
+  plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "_");
+
+  std::string expected_output;  // Catch2 runs this body once per SECTION; each SECTION overrides some settings above and sets this
+  SECTION("Replacing all lines") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::ALL));
+    expected_output = "_ppl_\n p__r\n _r_ng_\n b_n_n_\n";
+  }
+  SECTION("Replacing the first line") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::FIRST_LINE));
+    expected_output = "_ppl_\n pear\n orange\n banana\n";
+  }
+  SECTION("Replacing the last line") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::LAST_LINE));
+    expected_output = "apple\n pear\n orange\n b_n_n_\n";
+  }
+  SECTION("Replacing all lines except the first") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::EXCEPT_FIRST_LINE));
+    expected_output = "apple\n p__r\n _r_ng_\n b_n_n_\n";
+  }
+  SECTION("Replacing all lines except the last") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::EXCEPT_LAST_LINE));
+    expected_output = "_ppl_\n p__r\n _r_ng_\n banana\n";
+  }
+  SECTION("The output has fewer characters than the input") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "");
+    expected_output = "ppl\n pr\n rng\n bnn\n";
+  }
+  SECTION("The output has more characters than the input") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "$&v$&");
+    expected_output = "avappleve\n peveavar\n ovoravangeve\n bavanavanava\n";  // $& stands for the matched vowel
+  }
+  SECTION("The start of line anchor works correctly") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "^( ?)[aeiou]");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "$1_");
+    expected_output = "_pple\n pear\n _range\n banana\n";  // ^ matches at the start of every line, not only of the flow file
+  }
+  SECTION("The end of line anchor works correctly") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]$");
+    expected_output = "appl_\n pear\n orang_\n banan_";  // $ matches before each "\n" and at the end of the unterminated last line
+  }
+  SECTION("The end of line anchor works correctly with Windows line endings, too") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\r\n pear\r\n orange\r\n banana");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]$");
+    expected_output = "appl_\r\n pear\r\n orang_\r\n banan_";  // $ also matches before "\r\n"
+  }
+  SECTION("Prepend works correctly in line by line mode") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\npear\norange\nbanana\n");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::PREPEND));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "- ");
+    expected_output = "- apple\n- pear\n- orange\n- banana\n";
+  }
+  SECTION("Append works correctly in line by line mode") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::APPEND));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), " tree");
+    expected_output = "apple tree\n pear tree\n orange tree\n banana tree\n";  // appended before each line's ending
+  }
+  SECTION("Literal Replace works correctly in line by line mode") {
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::LITERAL_REPLACE));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "a");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "*");
+    expected_output = "*pple\n pe*r\n or*nge\n b*n*n*\n";
+  }
+  SECTION("Always Replace works correctly in line by line mode - without newline") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit");
+    expected_output = "fruit\nfruit\nfruit\nfruit";  // each line's ending is kept; the last line had none
+  }
+  SECTION("Always Replace works correctly in line by line mode - with newline") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit\n");
+    expected_output = "fruit\nfruit\nfruit\nfruit";  // the "\n" in the replacement value is not doubled, and none is added to the last line
+  }
+  SECTION("Always Replace works correctly in line by line mode - with Windows line endings") {
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\r\n pear\r\n orange\r\n banana");
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit");
+    expected_output = "fruit\r\nfruit\r\nfruit\r\nfruit";  // each line keeps its own "\r\n" ending
+  }
+
+  std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+  plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+  testController.runSession(plan);
+
+  CHECK(LogTestController::getInstance().contains(expected_output));  // LogAttribute logs the payload because LogPayload is "true"
+  LogTestController::getInstance().reset();
+}
+
+class HandleEmptyIncomingFlowFile {  // fixture: a 0-byte flow file goes GenerateFlowFile -> ReplaceText -> LogAttribute
+ public:
+  void setEvaluationMode(minifi::processors::EvaluationModeType evaluation_mode) { evaluation_mode_ = evaluation_mode; }
+  void setReplacementStrategy(minifi::processors::ReplacementStrategyType replacement_strategy) { replacement_strategy_ = replacement_strategy; }
+  void setExpectedOutput(const std::string& expected_output) { expected_output_ = expected_output; }
+
+  void run() {
+    LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();  // the expected output is searched for in LogAttribute's log
+
+    std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+    plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::FileSize.getName(), "0 B");
+
+    std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), evaluation_mode_.toString());
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), replacement_strategy_.toString());
+    plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "hippopotamus");
+
+    std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+    plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+    testController.runSession(plan);
+
+    CHECK(LogTestController::getInstance().contains(expected_output_));
+    LogTestController::getInstance().reset();
+  }
+
+ private:
+  TestController testController;
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  minifi::processors::EvaluationModeType evaluation_mode_ = minifi::processors::EvaluationModeType::LINE_BY_LINE;  // same initial values as ReplaceText's members
+  minifi::processors::ReplacementStrategyType replacement_strategy_ = minifi::processors::ReplacementStrategyType::REGEX_REPLACE;
+  std::string expected_output_;
+};
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can prepend to an empty flow file in Entire text mode", "[Entire text][Prepend]") {
+  setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+  setReplacementStrategy(minifi::processors::ReplacementStrategyType::PREPEND);
+  setExpectedOutput("Payload:\nhippopotamus\n");  // in Entire text mode the replacement value becomes the whole content
+  run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can append to an empty flow file in Entire text mode", "[Entire text][Append]") {
+  setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+  setReplacementStrategy(minifi::processors::ReplacementStrategyType::APPEND);
+  setExpectedOutput("Payload:\nhippopotamus\n");  // in Entire text mode the replacement value becomes the whole content
+  run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can prepend to an empty flow file in Line-by-line mode", "[Line-by-Line][Prepend]") {
+  setEvaluationMode(minifi::processors::EvaluationModeType::LINE_BY_LINE);
+  setReplacementStrategy(minifi::processors::ReplacementStrategyType::PREPEND);
+  setExpectedOutput("Size:0 Offset:0");  // there is no line to prepend to, so the flow file stays empty
+  run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can append to an empty flow file in Line-by-line mode", "[Line-by-Line][Append]") {
+  setEvaluationMode(minifi::processors::EvaluationModeType::LINE_BY_LINE);
+  setReplacementStrategy(minifi::processors::ReplacementStrategyType::APPEND);
+  setExpectedOutput("Size:0 Offset:0");  // there is no line to append to, so the flow file stays empty
+  run();
+}
+
+class UseExpressionLanguage {  // fixture: GenerateFlowFile -> UpdateAttribute -> ReplaceText (Entire text, Literal Replace) -> LogAttribute
+ public:
+  void setSearchValue(const std::string& value) { search_value_ = value; }
+  void setReplacementValue(const std::string& value) { replacement_value_ = value; }
+  void setExpectedOutput(const std::string& output) { expected_output_ = output; }
+  // Builds and runs the flow, then checks that LogAttribute logged the expected payload.
+  void run() {
+    LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+    // source: the same four-line text in every flow file
+    std::shared_ptr<core::Processor> generator = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+    plan->setProperty(generator, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana\n");
+    plan->setProperty(generator, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+    plan->setProperty(generator, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+    // adds the attributes that the tests' ${...} expressions refer to
+    std::shared_ptr<core::Processor> attribute_updater = plan->addProcessor("UpdateAttribute", "update_attribute", minifi::processors::GenerateFlowFile::Success, true);
+    plan->setProperty(attribute_updater, "substring", "an", true);
+    plan->setProperty(attribute_updater, "color", "blue", true);
+    // NOTE(review): reuses GenerateFlowFile::Success for UpdateAttribute's outgoing connection; presumably equivalent to UpdateAttribute's own success relationship — confirm
+    std::shared_ptr<core::Processor> replacer = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+    plan->setProperty(replacer, minifi::processors::ReplaceText::EvaluationMode.getName(), toString(minifi::processors::EvaluationModeType::ENTIRE_TEXT));
+    plan->setProperty(replacer, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::LITERAL_REPLACE));
+    plan->setProperty(replacer, minifi::processors::ReplaceText::SearchValue.getName(), search_value_);
+    plan->setProperty(replacer, minifi::processors::ReplaceText::ReplacementValue.getName(), replacement_value_);
+    // sink: logs each flow file's content so it can be checked below
+    std::shared_ptr<core::Processor> payload_logger = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+    plan->setProperty(payload_logger, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+    testController.runSession(plan);
+
+    CHECK(LogTestController::getInstance().contains(expected_output_));
+    LogTestController::getInstance().reset();
+  }
+
+ private:
+  TestController testController;
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::string search_value_;
+  std::string replacement_value_;
+  std::string expected_output_;
+};
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in the Search Value", "[expression language][Search Value]") {
+  setReplacementValue("*");
+  setSearchValue("${substring}");  // UpdateAttribute sets substring to "an"
+  setExpectedOutput("Payload:\napple\n pear\n or*ge\n b**a\n");
+  run();
+}
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in the Replacement Value", "[expression language][Replacement Value]") {
+  setReplacementValue("${color}berry");  // UpdateAttribute sets color to "blue"
+  setSearchValue("orange");
+  setExpectedOutput("Payload:\napple\n pear\n blueberry\n banana\n");
+  run();
+}
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in both the Search and Replacement Values", "[expression language][Search Value][Replacement Value]") {
+  setReplacementValue("${literal(2):plus(3)}");  // evaluates to 5
+  setSearchValue("${substring}");
+  setExpectedOutput("Payload:\napple\n pear\n or5ge\n b55a\n");
+  run();
+}
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 5511790..790f320 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -97,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
   int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+  // Lets the callback read the flow file's content and write new content at the same time (eg. for processing it line by line); the output replaces the flow file's content and the number of bytes written is returned
+  int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback);
   // Replace content with buffer
   void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
   // Execute the given write/append callback against the content
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b92b993..0956f8d 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {  // callback interface for ProcessSession::readWrite: reads the old content and writes the new one
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;  // returns the number of bytes written to output, negative on failure
+};
 
 namespace internal {
 
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
new file mode 100644
index 0000000..7cbdcc1
--- /dev/null
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "core/logging/Logger.h"
+#include "io/BaseStream.h"
+#include "io/StreamPipe.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class LineByLineInputOutputStreamCallback : public InputOutputStreamCallback {  // passes the input to callback_ one line at a time and writes what it returns
+ public:
+  using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;  // returns the text written in place of input_line
+  explicit LineByLineInputOutputStreamCallback(CallbackType callback);
+  int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) override;  // total bytes written, -1 on a write error
+
+ private:
+  int64_t readInput(io::InputStream& stream);  // presumably buffers the input into input_; process() stops early if this returns <= 0
+  void readLine();  // presumably shifts next_line_ into current_line_ and reads the following line into next_line_
+  [[nodiscard]] bool isLastLine() const { return !next_line_.has_value(); }
+
+  CallbackType callback_;
+  std::vector<uint8_t> input_;  // NOTE(review): if current_pos_ iterates over this buffer, a copied instance would point into the original's storage
+  std::vector<uint8_t>::iterator current_pos_{};
+  std::optional<std::string> current_line_;  // the line passed to callback_
+  std::optional<std::string> next_line_;  // one-line look-ahead; no value means current_line_ is the last line
+};
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 0df9541..5f0ebb0 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -76,7 +76,12 @@ class StringUtils {
 
   static std::string toLower(std::string_view str);
 
-  // Trim String utils
+  /**
+   * Strips the line ending (\n or \r\n) from the end of the input line.
+   * @param input_line a single line of text, e.g. "abc\r\n"
+   * @return (stripped line, line ending) pair, e.g. {"abc", "\r\n"}
+   */
+  static std::pair<std::string, std::string> chomp(const std::string& input_line);
 
   /**
    * Trims a string left to right
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dcb95f4..13cb07b 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -328,6 +328,53 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
   }
 }
 
+int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback) {
+  gsl_Expects(callback);
+  // Streams flow's content through the callback into a new resource claim, which then becomes flow's content; returns the bytes written.
+  try {
+    if (flow->getResourceClaim() == nullptr) {
+      logger_->log_debug("For %s, no resource claim but size is %s", flow->getUUIDStr(), std::to_string(flow->getSize()));  // getSize() is 64-bit, so %d would be a mismatched vararg
+      if (flow->getSize() == 0) {
+        return 0;  // nothing to read: the callback is not called and the empty content is left as it is
+      }
+      throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
+    }
+
+    std::shared_ptr<ResourceClaim> input_claim = flow->getResourceClaim();
+    std::shared_ptr<io::BaseStream> input_stream = content_session_->read(input_claim);
+    if (!input_stream) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
+    }
+    input_stream->seek(flow->getOffset());  // NOTE(review): the callback is not told flow->getSize(); if the claim is shared, confirm it cannot read past this flow file
+
+    std::shared_ptr<ResourceClaim> output_claim = content_session_->create();
+    std::shared_ptr<io::BaseStream> output_stream = content_session_->write(output_claim);
+    if (!output_stream) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
+    }
+
+    int64_t bytes_written = callback->process(input_stream, output_stream);  // bytes written to output_stream, negative on failure
+    if (bytes_written < 0) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
+    }
+
+    input_stream->close();
+    output_stream->close();
+
+    flow->setSize(gsl::narrow<uint64_t>(bytes_written));
+    flow->setOffset(0);
+    flow->setResourceClaim(output_claim);
+
+    return bytes_written;
+  } catch (const std::exception& exception) {
+    logger_->log_debug("Caught exception %s during process session readWrite", exception.what());
+    throw;
+  } catch (...) {
+    logger_->log_debug("Caught unknown exception during process session readWrite");
+    throw;
+  }
+}
+
 void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) {
   importFrom(stream, flow);
 }
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
new file mode 100644
index 0000000..1e84247
--- /dev/null
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(CallbackType callback)
+    : callback_{std::move(callback)} {}
+
+/**
+ * Reads the input into memory, then calls callback_ once per line (each line still carrying its
+ * trailing '\n', if it has one) and writes whatever the callback returns to the output.
+ * @param input  stream to read from; must not be null
+ * @param output stream to write to; must not be null
+ * @return total number of bytes written, 0 for an empty input, or -1 on a read or write error
+ */
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  const int64_t read_status = readInput(*input);
+  if (read_status <= 0) {
+    // -1: the read failed; 0: empty input, so the callback is never invoked and nothing is written
+    return read_status;
+  }
+
+  // Prime the one-line lookahead: after this call next_line_ holds the first line. Each readLine()
+  // inside the loop shifts it into current_line_ and buffers the line after it, which is what lets
+  // isLastLine() (declared in the header; presumably "no next line buffered") detect the final line.
+  readLine();
+
+  std::size_t total_bytes_written = 0;
+  bool is_first_line = true;
+  while (true) {
+    readLine();
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    const auto bytes_written = output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    if (io::isError(bytes_written)) {
+      return -1;
+    }
+    total_bytes_written += bytes_written;
+    if (isLastLine()) {
+      break;
+    }
+    is_first_line = false;
+  }
+
+  return gsl::narrow<int64_t>(total_bytes_written);
+}
+
+/**
+ * Reads up to stream.size() bytes into input_ and resets the line cursor to the start of the buffer.
+ * @return number of bytes buffered, or -1 if the read failed
+ */
+int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) {
+  // NOTE(review): stream.size() presumably reports the size of the whole underlying stream, not the
+  // bytes left after a seek (e.g. ProcessSession::readWrite seeks to flow->getOffset() first) nor the
+  // flow file's own size. Confirm this is safe for flow files sharing a content claim at an offset.
+  if (io::isError(stream.read(input_, stream.size()))) {
+    return -1;
+  }
+  current_pos_ = input_.begin();
+  return gsl::narrow<int64_t>(input_.size());
+}
+
+/**
+ * Advances the one-line lookahead window: next_line_ moves into current_line_, and the following
+ * line (if any) is read from input_ into next_line_.
+ */
+void LineByLineInputOutputStreamCallback::readLine() {
+  current_line_ = next_line_;
+
+  if (current_pos_ == input_.end()) {
+    // The input is exhausted, so there is no line after current_line_.
+    next_line_ = std::nullopt;
+    return;
+  }
+
+  // The buffered line keeps its '\n' terminator; the final line of the input may have none.
+  auto line_end = std::find(current_pos_, input_.end(), '\n');
+  if (line_end != input_.end()) {
+    ++line_end;
+  }
+  next_line_ = std::string(current_pos_, line_end);
+  current_pos_ = line_end;
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index 54ae0db..307eb02 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -48,6 +48,16 @@ std::string StringUtils::toLower(std::string_view str) {
   return str | views::transform(tolower) | ranges::to<std::string>();
 }
 
+/**
+ * Splits a trailing line terminator off input_line. "\r\n" is checked before "\n", so a Windows
+ * line ending is removed as a unit; a lone '\r' is not treated as a terminator.
+ * @return {line without terminator, the removed terminator ("" if there was none)}
+ */
+std::pair<std::string, std::string> StringUtils::chomp(const std::string& input_line) {
+  std::size_t terminator_length = 0;
+  if (endsWith(input_line, "\r\n")) {
+    terminator_length = 2;
+  } else if (endsWith(input_line, "\n")) {
+    terminator_length = 1;
+  }
+  const std::size_t content_length = input_line.size() - terminator_length;
+  return {input_line.substr(0, content_length), input_line.substr(content_length)};
+}
+
 std::string StringUtils::trim(const std::string& s) {
   return trimRight(trimLeft(s));
 }
diff --git a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
new file mode 100644
index 0000000..ff9cd17
--- /dev/null
+++ b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "LineByLineInputOutputStreamCallback.h"
+
+#include "../TestBase.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "io/BufferStream.h"
+#include "spdlog/spdlog.h"
+
+using minifi::utils::LineByLineInputOutputStreamCallback;
+
+// Runs the same three-line input through several line processors (one per SECTION) and checks both
+// the produced output and the byte count returned by process().
+TEST_CASE("LineByLineInputOutputStreamCallback can process a stream line by line", "[process]") {
+  const auto input_data = "One two, buckle my shoe\n"
+                          "Three four, knock at the door\n"
+                          "Five six, picking up sticks\n";
+  const auto input_stream = std::make_shared<minifi::io::BufferStream>(input_data);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+
+  LineByLineInputOutputStreamCallback::CallbackType line_processor;
+  std::string expected_output;
+
+  SECTION("no changes") {
+    line_processor = [](const std::string& input_line, bool, bool) {
+      return input_line;
+    };
+    expected_output = input_data;
+  }
+  SECTION("prepend asterisk") {
+    line_processor = [](const std::string& input_line, bool, bool) {
+      return "* " + input_line;
+    };
+    expected_output = "* One two, buckle my shoe\n"
+                      "* Three four, knock at the door\n"
+                      "* Five six, picking up sticks\n";
+  }
+  SECTION("replace vowels with underscores") {
+    line_processor = [](const std::string& input_line, bool, bool) {
+      return std::regex_replace(input_line, std::regex{"[aeiou]", std::regex::icase}, "_");
+    };
+    expected_output = "_n_ tw_, b_ckl_ my sh__\n"
+                      "Thr__ f__r, kn_ck _t th_ d__r\n"
+                      "F_v_ s_x, p_ck_ng _p st_cks\n";
+  }
+  SECTION("enclose input in square brackets") {
+    line_processor = [](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      if (!is_first_line && !is_last_line) { return input_line; }
+      auto output_line = input_line;
+      if (is_first_line) { output_line = "[ " + output_line; }
+      if (is_last_line) { output_line = output_line + " ]"; }
+      return output_line;
+    };
+    expected_output = "[ One two, buckle my shoe\n"
+                      "Three four, knock at the door\n"
+                      "Five six, picking up sticks\n ]";
+  }
+
+  LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+  const int64_t bytes_written = line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  std::string output_data(reinterpret_cast<const char*>(output_stream->getBuffer()), output_stream->size());
+  CHECK(output_data == expected_output);
+  // process() reports the total number of bytes it wrote to the output stream
+  CHECK(bytes_written == static_cast<int64_t>(expected_output.size()));
+}
+
+// Checks that "\r\n" line endings are passed through intact, numbering each line as it is processed.
+TEST_CASE("LineByLineInputOutputStreamCallback can handle Windows line endings", "[process][Windows]") {
+  const auto input_data = "One two, buckle my shoe\r\n"
+                          "Three four, knock at the door\r\n"
+                          "Five six, picking up sticks\r\n";
+  const auto input_stream = std::make_shared<minifi::io::BufferStream>(input_data);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+
+  // The counter lives in the closure rather than in a function-local static, so it is not shared
+  // across runs of this test case and numbering always starts from 1.
+  auto line_processor = [line_number = 0](const std::string& input_line, bool, bool) mutable {
+    return fmt::format("{0}: {1}", ++line_number, input_line);
+  };
+  const std::string expected_output = "1: One two, buckle my shoe\r\n"
+                                      "2: Three four, knock at the door\r\n"
+                                      "3: Five six, picking up sticks\r\n";
+
+  LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+  const int64_t bytes_written = line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  std::string output_data(reinterpret_cast<const char*>(output_stream->getBuffer()), output_stream->size());
+  CHECK(output_data == expected_output);
+  CHECK(bytes_written == static_cast<int64_t>(expected_output.size()));
+}
+
+// An empty input must produce no output, and process() must report zero bytes written.
+TEST_CASE("LineByLineInputOutputStreamCallback can handle an empty input", "[process][empty]") {
+  const auto input_stream = std::make_shared<minifi::io::BufferStream>("");
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+  const auto line_processor = [](const std::string& input_line, bool, bool) { return input_line; };
+  LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+  const int64_t bytes_written = line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  CHECK(bytes_written == 0);
+  CHECK(output_stream->size() == 0);
+}
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 303c21f..b2dd86a 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -31,6 +31,14 @@
 
 using org::apache::nifi::minifi::utils::StringUtils;
 
+// chomp() removes at most one trailing "\n" or "\r\n" and returns it separately from the content.
+TEST_CASE("StringUtils::chomp works correctly", "[StringUtils][chomp]") {
+  using pair_of = std::pair<std::string, std::string>;
+  CHECK(StringUtils::chomp("foobar") == pair_of{"foobar", ""});
+  CHECK(StringUtils::chomp("foobar\n") == pair_of{"foobar", "\n"});
+  CHECK(StringUtils::chomp("foobar\r\n") == pair_of{"foobar", "\r\n"});
+  CHECK(StringUtils::chomp("foo\rbar\n") == pair_of{"foo\rbar", "\n"});
+  // edge cases: empty input, a bare terminator, a lone '\r', and only one terminator removed
+  CHECK(StringUtils::chomp("") == pair_of{"", ""});
+  CHECK(StringUtils::chomp("\n") == pair_of{"", "\n"});
+  CHECK(StringUtils::chomp("\r\n") == pair_of{"", "\r\n"});
+  CHECK(StringUtils::chomp("foobar\r") == pair_of{"foobar\r", ""});
+  CHECK(StringUtils::chomp("foobar\n\n") == pair_of{"foobar\n", "\n"});
+}
+
 TEST_CASE("TestStringUtils::split", "[test split no delimiter]") {
   std::vector<std::string> expected = { "hello" };
   REQUIRE(expected == StringUtils::split("hello", ","));