Posted to commits@nifi.apache.org by sz...@apache.org on 2021/11/03 14:32:32 UTC
[nifi-minifi-cpp] 02/02: MINIFICPP-1618 Create the ReplaceText processor
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 190667151763b5d743953584cfcacff874c603d9
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Mon Sep 20 10:35:10 2021 +0200
MINIFICPP-1618 Create the ReplaceText processor
- add chomp() function to StringUtils
- implement LineByLineInputOutputStreamCallback for reading a flow file line by line
Closes #1170
Signed-off-by: Marton Szasz <sz...@apache.org>
---
PROCESSORS.md | 24 ++
README.md | 2 +-
.../test/integration/features/replace_text.feature | 49 +++
.../integration/minifi/processors/ReplaceText.py | 10 +
.../standard-processors/processors/ReplaceText.cpp | 339 +++++++++++++++++
.../standard-processors/processors/ReplaceText.h | 99 +++++
.../tests/unit/ReplaceTextTests.cpp | 408 +++++++++++++++++++++
libminifi/include/core/ProcessSession.h | 2 +
libminifi/include/io/StreamPipe.h | 5 +
.../utils/LineByLineInputOutputStreamCallback.h | 49 +++
libminifi/include/utils/StringUtils.h | 7 +-
libminifi/src/core/ProcessSession.cpp | 47 +++
.../utils/LineByLineInputOutputStreamCallback.cpp | 73 ++++
libminifi/src/utils/StringUtils.cpp | 10 +
.../LineByLineInputOutputStreamCallbackTests.cpp | 105 ++++++
libminifi/test/unit/StringUtilsTests.cpp | 8 +
16 files changed, 1235 insertions(+), 2 deletions(-)
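The commit message mentions a new chomp() helper in StringUtils. Its implementation is outside this excerpt, but ReplaceText.cpp destructures its result as `auto [chomped, line_ending]`. The following is a minimal standalone sketch under the assumption that chomp() splits a line into its content and its trailing line ending, treating only "\r\n" and "\n" as endings. `chomp_sketch` and `append_to_line` are hypothetical names, not MiNiFi API.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for utils::StringUtils::chomp(): returns
// {line without its trailing line ending, the line ending itself}.
// Assumption: only "\r\n" and "\n" count as line endings.
std::pair<std::string, std::string> chomp_sketch(const std::string& line) {
  if (line.size() >= 2 && line.compare(line.size() - 2, 2, "\r\n") == 0) {
    return {line.substr(0, line.size() - 2), "\r\n"};
  }
  if (!line.empty() && line.back() == '\n') {
    return {line.substr(0, line.size() - 1), "\n"};
  }
  return {line, ""};
}

// Mirrors the Append branch of ReplaceText::applyReplacements():
// the suffix is inserted before the line ending, not after it.
std::string append_to_line(const std::string& line, const std::string& suffix) {
  auto [content, ending] = chomp_sketch(line);
  return content + suffix + ending;
}
```

Splitting off the ending first is what lets Append, Regex Replace and Always Replace work on a line's content while leaving its newline in place.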
diff --git a/PROCESSORS.md b/PROCESSORS.md
index f7ea514..8c85ec9 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -50,6 +50,7 @@
- [PutSFTP](#putsftp)
- [PutSQL](#putsql)
- [QueryDatabaseTable](#querydatabasetable)
+- [ReplaceText](#replacetext)
- [RetryFlowFile](#retryflowfile)
- [RouteOnAttribute](#routeonattribute)
- [RouteText](#routetext)
@@ -1487,6 +1488,29 @@ In the list below, the names of required properties appear in bold. Any other pr
|initial.maxvalue.<max_value_column>|Initial maximum value for the specified column|Specifies an initial max value for max value column(s). Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).<br/>**Supports Expression Language: true**|
+## ReplaceText
+
+### Description
+Updates the content of a FlowFile by replacing parts of it using various replacement strategies.
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Evaluation Mode**|Line-by-Line|Entire text<br>Line-by-Line<br>|Run the 'Replacement Strategy' against each line separately (Line-by-Line) or against the whole input treated as a single string (Entire Text).|
+|Line-by-Line Evaluation Mode|All|All<br>Except-First-Line<br>Except-Last-Line<br>First-Line<br>Last-Line<br>|Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).|
+|**Replacement Strategy**|Regex Replace|Always Replace<br>Append<br>Literal Replace<br>Prepend<br>Regex Replace<br>Substitute Variables<br>|The strategy for how and what to replace within the FlowFile's text content. Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value (if an attribute is not found, the placeholder is kept as it was).|
+|**Replacement Value**|||The value to insert using the 'Replacement Strategy'. When using 'Regex Replace', back-references to Regular Expression capturing groups are supported: $& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. Back-references to non-existent capturing groups will be replaced by empty strings. Supports expression language except in Regex Replace mode.<br/>**Supports Expression Language: true**|
+|Search Value|||The Search Value to search for in the FlowFile content. Only used for 'Literal Replace' and 'Regex Replace' matching strategies. Supports expression language except in Regex Replace mode.<br/>**Supports Expression Language: true**|
+
+### Relationships
+| Name | Description |
+| - | - |
+|failure|FlowFiles that could not be updated are routed to this relationship.|
+|success|FlowFiles that have been successfully processed are routed to this relationship. This includes both FlowFiles that had text replaced and those that did not.|
+
+
## RetryFlowFile
### Description
diff --git a/README.md b/README.md
index 2ba9e72..58322e2 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
| Extension Set | Processors |
| ------------- |:-------------|
-| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
+| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/> [GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P [...]
The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line.
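The back-reference syntax documented for 'Replacement Value' in PROCESSORS.md is the default (ECMAScript) format syntax of `std::regex_replace`, which ReplaceText.cpp calls directly for Regex Replace. The following minimal check reuses the Entire text example from the integration test. `regex_replace_all` is a hypothetical wrapper for illustration only.

```cpp
#include <regex>
#include <string>

// Regex Replace boils down to std::regex_replace with the default
// (ECMAScript) format rules:
//   $&  -> the entire match
//   $1  -> the first capturing group
//   $$  -> a literal '$'
std::string regex_replace_all(const std::string& input, const std::string& pattern,
                              const std::string& replacement) {
  return std::regex_replace(input, std::regex{pattern}, replacement);
}
```

For example, `regex_replace_all("one apple, two apples", "a([a-z]+)e", "ri$1et")` yields the same output as the corresponding row of the feature file.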
diff --git a/docker/test/integration/features/replace_text.feature b/docker/test/integration/features/replace_text.feature
new file mode 100644
index 0000000..e7f316b
--- /dev/null
+++ b/docker/test/integration/features/replace_text.feature
@@ -0,0 +1,49 @@
+Feature: Changing flowfile contents using the ReplaceText processor
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario Outline: Replace text using Entire text mode
+ Given a GenerateFlowFile processor with the "Custom Text" property set to "<input>"
+ And the scheduling period of the GenerateFlowFile processor is set to "1 hour"
+ And the "Data Format" property of the GenerateFlowFile processor is set to "Text"
+ And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
+ And a ReplaceText processor with the "Evaluation Mode" property set to "Entire text"
+ And the "Replacement Strategy" property of the ReplaceText processor is set to "<replacement_strategy>"
+ And the "Search Value" property of the ReplaceText processor is set to "<search_value>"
+ And the "Replacement Value" property of the ReplaceText processor is set to "<replacement_value>"
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the GenerateFlowFile processor is connected to the ReplaceText
+ And the "success" relationship of the ReplaceText processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then a flowfile with the content "<output>" is placed in the monitored directory in less than 10 seconds
+
+ Examples:
+ | input | replacement_strategy | search_value | replacement_value | output |
+ | apple | Prepend | _ | pine | pineapple |
+ | apple | Append | _ | sauce | applesauce |
+ | one apple, two apples | Regex Replace | a([a-z]+)e | ri$1et | one ripplet, two ripplets |
+ | one apple, two apples | Literal Replace | apple | banana | one banana, two bananas |
+ | one apple, two apples | Always Replace | _ | banana | banana |
+
+ Scenario Outline: Replace text using Line-by-Line mode
+ Given a file with the content "<input>" is present in "/tmp/input"
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+ And a ReplaceText processor with the "Evaluation Mode" property set to "Line-by-Line"
+ And the "Line-by-Line Evaluation Mode" property of the ReplaceText processor is set to "<evaluation_mode>"
+ And the "Replacement Strategy" property of the ReplaceText processor is set to "Regex Replace"
+ And the "Search Value" property of the ReplaceText processor is set to "a+(b+)c+"
+ And the "Replacement Value" property of the ReplaceText processor is set to "_$1_"
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the GetFile processor is connected to the ReplaceText
+ And the "success" relationship of the ReplaceText processor is connected to the PutFile
+ When the MiNiFi instance starts up
+ Then a flowfile with the content "<output>" is placed in the monitored directory in less than 10 seconds
+
+ Examples:
+ | input | evaluation_mode | output |
+ | abc\n aabbcc\n aaabbbccc\n | All | _b_\n _bb_\n _bbb_\n |
+ | abc\n aabbcc\n aaabbbccc | All | _b_\n _bb_\n _bbb_ |
+ | abc\n aabbcc\n aaabbbccc\n | First-Line | _b_\n aabbcc\n aaabbbccc\n |
+ | abc\n aabbcc\n aaabbbccc\n | Last-Line | abc\n aabbcc\n _bbb_\n |
+ | abc\n aabbcc\n aaabbbccc\n | Except-First-Line | abc\n _bb_\n _bbb_\n |
+ | abc\n aabbcc\n aaabbbccc\n | Except-Last-Line | _b_\n _bb_\n aaabbbccc\n |
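The Line-by-Line examples above select lines by position. The following simplified sketch shows that selection logic. It is not the real LineByLineInputOutputStreamCallback. It assumes that a trailing newline does not create an extra empty last line, which is what the Last-Line and Except-Last-Line rows expect, and for brevity it treats only '\n' as a line ending. `LineMode`, `split_lines`, `should_replace` and `replace_line_by_line` are hypothetical names.

```cpp
#include <cstddef>
#include <functional>
#include <regex>
#include <string>
#include <vector>

// Hypothetical mirror of LineByLineEvaluationModeType.
enum class LineMode { All, FirstLine, LastLine, ExceptFirstLine, ExceptLastLine };

// Splits text into lines that keep their trailing '\n'. A final fragment
// without a newline still counts as a line. An empty tail after the last
// '\n' does not, so "a\nb\n" yields two lines.
std::vector<std::string> split_lines(const std::string& text) {
  std::vector<std::string> lines;
  std::size_t start = 0;
  while (start < text.size()) {
    const std::size_t newline = text.find('\n', start);
    const std::size_t end = (newline == std::string::npos) ? text.size() : newline + 1;
    lines.push_back(text.substr(start, end - start));
    start = end;
  }
  return lines;
}

// Decides whether the line at the given position is subject to replacement.
bool should_replace(LineMode mode, bool is_first, bool is_last) {
  switch (mode) {
    case LineMode::All: return true;
    case LineMode::FirstLine: return is_first;
    case LineMode::LastLine: return is_last;
    case LineMode::ExceptFirstLine: return !is_first;
    case LineMode::ExceptLastLine: return !is_last;
  }
  return false;
}

// Applies `replace` to the selected lines. The '\n' is stripped before the
// call and re-attached afterwards, so line endings survive unchanged.
std::string replace_line_by_line(const std::string& text, LineMode mode,
                                 const std::function<std::string(const std::string&)>& replace) {
  const std::vector<std::string> lines = split_lines(text);
  std::string output;
  for (std::size_t i = 0; i < lines.size(); ++i) {
    const std::string& line = lines[i];
    const bool has_newline = line.back() == '\n';  // split_lines never yields empty lines
    std::string content = has_newline ? line.substr(0, line.size() - 1) : line;
    if (should_replace(mode, i == 0, i + 1 == lines.size())) {
      content = replace(content);
    }
    output += content;
    if (has_newline) {
      output += '\n';
    }
  }
  return output;
}
```

With `replace` set to the scenario's regex step (`a+(b+)c+` replaced by `_$1_`), the Except-Last-Line mode turns "abc\naabbcc\naaabbbccc\n" into "_b_\n_bb_\naaabbbccc\n".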
diff --git a/docker/test/integration/minifi/processors/ReplaceText.py b/docker/test/integration/minifi/processors/ReplaceText.py
new file mode 100644
index 0000000..4612f33
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ReplaceText.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+
+class ReplaceText(Processor):
+ def __init__(self):
+ super(ReplaceText, self).__init__(
+ 'ReplaceText',
+ properties={},
+ schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+ auto_terminate=['success', 'failure'])
diff --git a/extensions/standard-processors/processors/ReplaceText.cpp b/extensions/standard-processors/processors/ReplaceText.cpp
new file mode 100644
index 0000000..5bd4ec3
--- /dev/null
+++ b/extensions/standard-processors/processors/ReplaceText.cpp
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+ "against the whole input treated as a single string (Entire Text).")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+ ->withAllowableValues(EvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+ "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+ ->isRequired(false)
+ ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+ ->withAllowableValues(LineByLineEvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+ ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+ "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+ "(if an attribute is not found, the placeholder is kept as it was).")
+ ->isRequired(true)
+ ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+ ->withAllowableValues(ReplacementStrategyType::values())
+ ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+ ->withDescription("The Search Value to search for in the FlowFile content. "
+ "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+ "Supports expression language except in Regex Replace mode.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+ ->withDescription("The value to insert using the 'Replacement Strategy'. "
+ "When using 'Regex Replace', back-references to Regular Expression capturing groups are supported: "
+ "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+ "Back-references to non-existent capturing groups will be replaced by empty strings. "
+ "Supports expression language except in Regex Replace mode.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+ "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+ : core::Processor(name, uuid),
+ logger_(core::logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+ setSupportedProperties({
+ EvaluationMode,
+ LineByLineEvaluationMode,
+ ReplacementStrategy,
+ SearchValue,
+ ReplacementValue
+ });
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+ gsl_Expects(context);
+
+ const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+ evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+ logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+ const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+ if (line_by_line_evaluation_mode) {
+ line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+ logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+ }
+
+ const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+ replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+ logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context);
+ gsl_Expects(session);
+
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+ if (!flow_file) {
+ logger_->log_trace("No flow file");
+ yield();
+ return;
+ }
+
+ Parameters parameters = readParameters(context, flow_file);
+
+ switch (evaluation_mode_.value()) {
+ case EvaluationModeType::ENTIRE_TEXT:
+ replaceTextInEntireFile(flow_file, session, parameters);
+ return;
+ case EvaluationModeType::LINE_BY_LINE:
+ replaceTextLineByLine(flow_file, session, parameters);
+ return;
+ }
+
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+ Parameters parameters;
+
+ bool found_search_value;
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+ } else {
+ found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+ }
+ if (found_search_value) {
+ logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ parameters.search_regex_ = std::regex{parameters.search_value_};
+ }
+ }
+ if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+ }
+
+ bool found_replacement_value;
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+ } else {
+ found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+ }
+ if (found_replacement_value) {
+ logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+ } else {
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+ }
+
+ if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+ auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+ parameters.replacement_value_ = std::move(chomped_value);
+ }
+
+ return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+ std::vector<uint8_t> buffer_;
+
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ size_t bytes_read = stream->read(buffer_, stream->size());
+ return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+ }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+ const std::vector<uint8_t>& buffer_;
+
+ explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ size_t bytes_written = stream->write(buffer_, buffer_.size());
+ return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+ }
+};
+
+} // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+ gsl_Expects(flow_file);
+ gsl_Expects(session);
+
+ try {
+ ReadFlowFileIntoBuffer read_callback;
+ session->read(flow_file, &read_callback);
+
+ std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+ std::string output = applyReplacements(input, flow_file, parameters);
+ std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+ WriteBufferToFlowFile write_callback{modified_text};
+ session->write(flow_file, &write_callback);
+
+ session->transfer(flow_file, Success);
+ } catch (const Exception& exception) {
+ logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+ gsl_Expects(flow_file);
+ gsl_Expects(session);
+
+ try {
+ utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+ switch (line_by_line_evaluation_mode_.value()) {
+ case LineByLineEvaluationModeType::ALL:
+ return applyReplacements(input_line, flow_file, parameters);
+ case LineByLineEvaluationModeType::FIRST_LINE:
+ return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+ case LineByLineEvaluationModeType::LAST_LINE:
+ return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+ case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+ return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+ case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+ return is_last_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+ }
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+ }};
+ session->readWrite(flow_file, &read_write_callback);
+ session->transfer(flow_file, Success);
+ } catch (const Exception& exception) {
+ logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+ const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+ switch (replacement_strategy_.value()) {
+ case ReplacementStrategyType::PREPEND:
+ return parameters.replacement_value_ + input;
+
+ case ReplacementStrategyType::APPEND:
+ return chomped_input + parameters.replacement_value_ + line_ending;
+
+ case ReplacementStrategyType::REGEX_REPLACE:
+ return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+ case ReplacementStrategyType::LITERAL_REPLACE:
+ return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+ case ReplacementStrategyType::ALWAYS_REPLACE:
+ return parameters.replacement_value_ + line_ending;
+
+ case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+ return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+ }
+
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) {
+ std::vector<char> output;
+ output.reserve(input.size());
+
+ auto it = input.begin();
+ do {
+ auto found = std::search(it, input.end(), parameters.search_value_.begin(), parameters.search_value_.end());
+ if (found != input.end()) {
+ std::copy(it, found, std::back_inserter(output));
+ std::copy(parameters.replacement_value_.begin(), parameters.replacement_value_.end(), std::back_inserter(output));
+ it = found;
+ std::advance(it, parameters.search_value_.size());
+ } else {
+ std::copy(it, input.end(), std::back_inserter(output));
+ it = input.end();
+ }
+ } while (it != input.end());
+
+ return std::string{output.begin(), output.end()};
+}
+
+std::string ReplaceText::applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+ static const std::regex PLACEHOLDER{R"(\$\{([^}]+)\})"};
+
+ auto input_it = std::sregex_iterator{input.begin(), input.end(), PLACEHOLDER};
+ const auto input_end = std::sregex_iterator{};
+ if (input_it == input_end) {
+ return input;
+ }
+
+ std::vector<char> output;
+ auto output_it = std::back_inserter(output);
+
+ std::smatch match;
+ for (; input_it != input_end; ++input_it) {
+ match = *input_it;
+ output_it = std::copy(match.prefix().first, match.prefix().second, output_it);
+ std::string attribute_value = getAttributeValue(flow_file, match);
+ output_it = std::copy(attribute_value.begin(), attribute_value.end(), output_it);
+ }
+ std::copy(match.suffix().first, match.suffix().second, output_it);
+
+ return std::string{output.begin(), output.end()};
+}
+
+std::string ReplaceText::getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const {
+ gsl_Expects(flow_file);
+ gsl_Expects(match.size() >= 2);
+
+ std::string attribute_key = match[1];
+ std::optional<std::string> attribute_value = flow_file->getAttribute(attribute_key);
+ if (attribute_value) {
+ return *attribute_value;
+ } else {
+ logger_->log_debug("Attribute %s not found in the flow file during %s", attribute_key, toString(ReplacementStrategyType::SUBSTITUTE_VARIABLES));
+ return match[0];
+ }
+}
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies.");
+
+} // namespace org::apache::nifi::minifi::processors
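The Substitute Variables strategy in ReplaceText.cpp (applySubstituteVariables plus getAttributeValue) can be illustrated without the MiNiFi FlowFile type. This sketch assumes a plain `std::map` stands in for the flow file's attributes and reuses the processor's placeholder regex. Unlike the processor, it appends to a `std::string` directly instead of going through a `std::vector<char>`. `Attributes` and `substitute_variables` are hypothetical names.

```cpp
#include <map>
#include <regex>
#include <string>

// Hypothetical stand-in for FlowFile attributes.
using Attributes = std::map<std::string, std::string>;

// Replaces each ${name} placeholder with attributes[name]. As in
// ReplaceText, placeholders whose attribute is missing are kept verbatim.
std::string substitute_variables(const std::string& input, const Attributes& attributes) {
  static const std::regex placeholder{R"(\$\{([^}]+)\})"};
  std::string output;
  auto last = input.cbegin();
  for (std::sregex_iterator it{input.begin(), input.end(), placeholder}, end; it != end; ++it) {
    const std::smatch& match = *it;
    output.append(last, match[0].first);  // text before the placeholder
    const auto found = attributes.find(match[1].str());
    output += (found != attributes.end()) ? found->second : match[0].str();
    last = match[0].second;
  }
  output.append(last, input.cend());  // text after the last placeholder
  return output;
}
```

Input without any placeholder passes through unchanged, which matches the processor's early return when the iterator finds no match.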
diff --git a/extensions/standard-processors/processors/ReplaceText.h b/extensions/standard-processors/processors/ReplaceText.h
new file mode 100644
index 0000000..316c28f
--- /dev/null
+++ b/extensions/standard-processors/processors/ReplaceText.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+ (LINE_BY_LINE, "Line-by-Line"),
+ (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+ (ALL, "All"),
+ (FIRST_LINE, "First-Line"),
+ (LAST_LINE, "Last-Line"),
+ (EXCEPT_FIRST_LINE, "Except-First-Line"),
+ (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+ (PREPEND, "Prepend"),
+ (APPEND, "Append"),
+ (REGEX_REPLACE, "Regex Replace"),
+ (LITERAL_REPLACE, "Literal Replace"),
+ (ALWAYS_REPLACE, "Always Replace"),
+ (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+ EXTENSIONAPI static const core::Property EvaluationMode;
+ EXTENSIONAPI static const core::Property LineByLineEvaluationMode;
+ EXTENSIONAPI static const core::Property ReplacementStrategy;
+ EXTENSIONAPI static const core::Property SearchValue;
+ EXTENSIONAPI static const core::Property ReplacementValue;
+
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {});
+ core::annotation::Input getInputRequirement() const override { return core::annotation::Input::INPUT_REQUIRED; }
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+ friend struct ReplaceTextTestAccessor;
+
+ struct Parameters {
+ std::string search_value_;
+ std::regex search_regex_;
+ std::string replacement_value_;
+ };
+
+ Parameters readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
+ void replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const;
+ void replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const;
+
+ std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const;
+ static std::string applyLiteralReplace(const std::string& input, const Parameters& parameters);
+ std::string applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+ std::string getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const;
+
+ EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+ LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL;
+ ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE;
+ std::shared_ptr<core::logging::Logger> logger_;
+};
+
+} // namespace org::apache::nifi::minifi::processors
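The header declares applyLiteralReplace() as a static helper that replaces every occurrence of the Search Value. The following sketch shows an equivalent formulation that uses `std::string::find` in place of the `std::search` loop in ReplaceText.cpp. `literal_replace` is a hypothetical free function. The empty-search guard is an addition for standalone use: the processor itself rejects an empty Search Value in readParameters() before any replacement happens.

```cpp
#include <string>

// Replaces every non-overlapping occurrence of `search` in `input` with
// `replacement`, scanning left to right.
std::string literal_replace(const std::string& input, const std::string& search,
                            const std::string& replacement) {
  if (search.empty()) {
    return input;  // guard: find("") always succeeds, so this would loop forever
  }
  std::string output;
  output.reserve(input.size());
  std::string::size_type pos = 0;
  while (true) {
    const auto found = input.find(search, pos);
    if (found == std::string::npos) {
      break;
    }
    output.append(input, pos, found - pos);  // unchanged text before the match
    output += replacement;
    pos = found + search.size();             // skip past the match (no overlaps)
  }
  output.append(input, pos, std::string::npos);  // remaining tail
  return output;
}
```

Because matching resumes after each replaced occurrence, overlapping matches are not replaced twice. For example, replacing "aa" in "aaa" yields "ba".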
diff --git a/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp b/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp
new file mode 100644
index 0000000..4c35384
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ReplaceTextTests.cpp
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GenerateFlowFile.h"
+#include "LogAttribute.h"
+#include "ReplaceText.h"
+#include "TestBase.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+struct ReplaceTextTestAccessor {
+ ReplaceText processor_;
+ ReplaceText::Parameters parameters_;
+
+ ReplaceTextTestAccessor() : processor_{"replace_text"} {}
+
+ void setEvaluationMode(EvaluationModeType evaluation_mode) { processor_.evaluation_mode_ = evaluation_mode; }
+ void setReplacementStrategy(ReplacementStrategyType replacement_strategy) { processor_.replacement_strategy_ = replacement_strategy; }
+ void setSearchValue(const std::string& search_value) { parameters_.search_value_ = search_value; }
+ void setSearchRegex(const std::string& search_regex) { parameters_.search_regex_ = std::regex{search_regex}; }
+ void setReplacementValue(const std::string& replacement_value) { parameters_.replacement_value_ = replacement_value; }
+
+ std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file = {}) const { return processor_.applyReplacements(input, flow_file, parameters_); }
+};
+
+} // namespace org::apache::nifi::minifi::processors
+
+TEST_CASE("ReplaceText can parse its properties", "[onSchedule]") {
+ TestController testController;
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ LogTestController::getInstance().setDebug<minifi::processors::ReplaceText>();
+
+ std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "One green bottle is hanging on the wall");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+
+ std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), "Entire text");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), "Except-First-Line");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), "Substitute Variables");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "apple");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "orange");
+
+ testController.runSession(plan);
+
+ CHECK(LogTestController::getInstance().contains("the Evaluation Mode property is set to Entire text"));
+ CHECK(LogTestController::getInstance().contains("the Line-by-Line Evaluation Mode property is set to Except-First-Line"));
+ CHECK(LogTestController::getInstance().contains("the Replacement Strategy property is set to Substitute Variables"));
+ CHECK(LogTestController::getInstance().contains("the Search Value property is set to apple"));
+ CHECK(LogTestController::getInstance().contains("the Replacement Value property is set to orange"));
+}
+
+TEST_CASE("Prepend works correctly in ReplaceText", "[applyReplacements][Prepend]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::PREPEND);
+ replace_text.setReplacementValue("orange");
+
+ CHECK(replace_text.applyReplacements("") == "orange");
+ CHECK(replace_text.applyReplacements("s and lemons") == "oranges and lemons");
+}
+
+TEST_CASE("Append works correctly in ReplaceText", "[applyReplacements][Append]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::APPEND);
+ replace_text.setReplacementValue("orange");
+
+ CHECK(replace_text.applyReplacements("") == "orange");
+ CHECK(replace_text.applyReplacements("agent ") == "agent orange");
+}
+
+TEST_CASE("Regex Replace works correctly in ReplaceText", "[applyReplacements][Regex Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::REGEX_REPLACE);
+ replace_text.setSearchRegex("a\\w+e");
+ replace_text.setReplacementValue("orange");
+
+ CHECK(replace_text.applyReplacements("") == "");
+ CHECK(replace_text.applyReplacements("apple tree") == "orange tree");
+ CHECK(replace_text.applyReplacements("one apple, two apples") == "one orange, two oranges");
+}
+
+TEST_CASE("Regex Replace works with back references in ReplaceText", "[applyReplacements][Regex Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::REGEX_REPLACE);
+ replace_text.setSearchRegex("a(b+)c");
+ replace_text.setReplacementValue("$& [found $1]");
+
+ CHECK(replace_text.applyReplacements("") == "");
+ CHECK(replace_text.applyReplacements("abc") == "abc [found b]");
+ CHECK(replace_text.applyReplacements("cba") == "cba");
+ CHECK(replace_text.applyReplacements("xxx abc yyy abbbc zzz") == "xxx abc [found b] yyy abbbc [found bbb] zzz");
+}
+
+TEST_CASE("Regex Replace treats non-existent back references as blank in ReplaceText", "[applyReplacements][Regex Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::REGEX_REPLACE);
+ replace_text.setSearchRegex("a(b+)c");
+ replace_text.setReplacementValue("_$1_ '$2'");
+
+ CHECK(replace_text.applyReplacements("") == "");
+ CHECK(replace_text.applyReplacements("abc") == "_b_ ''");
+ CHECK(replace_text.applyReplacements("cba") == "cba");
+ CHECK(replace_text.applyReplacements("xxx abc yyy abbbc zzz") == "xxx _b_ '' yyy _bbb_ '' zzz");
+}
+
+TEST_CASE("Back references can be escaped when using Regex Replace in ReplaceText", "[applyReplacements][Regex Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::REGEX_REPLACE);
+ replace_text.setSearchRegex("a(b+)c");
+ replace_text.setReplacementValue("$1 costs $$2");
+
+ CHECK(replace_text.applyReplacements("") == "");
+ CHECK(replace_text.applyReplacements("abc") == "b costs $2");
+ CHECK(replace_text.applyReplacements("cba") == "cba");
+ CHECK(replace_text.applyReplacements("xxx abc yyy abbbc zzz") == "xxx b costs $2 yyy bbb costs $2 zzz");
+}
+
+TEST_CASE("Literal replace works correctly in ReplaceText", "[applyReplacements][Literal Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::LITERAL_REPLACE);
+ replace_text.setSearchValue("apple");
+ replace_text.setReplacementValue("orange");
+
+ CHECK(replace_text.applyReplacements("") == "");
+ CHECK(replace_text.applyReplacements("apple tree") == "orange tree");
+ CHECK(replace_text.applyReplacements("one apple, two apples") == "one orange, two oranges");
+}
+
+TEST_CASE("Always Replace works correctly in ReplaceText", "[applyReplacements][Always Replace]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE);
+ replace_text.setReplacementValue("orange");
+
+ CHECK(replace_text.applyReplacements("") == "orange");
+ CHECK(replace_text.applyReplacements("apple tree") == "orange");
+ CHECK(replace_text.applyReplacements("one apple, two apples") == "orange");
+}
+
+TEST_CASE("Substitute Variables works correctly in ReplaceText", "[applyReplacements][Substitute Variables]") {
+ minifi::processors::ReplaceTextTestAccessor replace_text;
+ replace_text.setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ replace_text.setReplacementStrategy(minifi::processors::ReplacementStrategyType::SUBSTITUTE_VARIABLES);
+
+ const auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+ flow_file->setAttribute("color", "green");
+ flow_file->setAttribute("food", "eggs and ham");
+
+ CHECK(replace_text.applyReplacements("", flow_file) == "");
+ CHECK(replace_text.applyReplacements("no placeholders", flow_file) == "no placeholders");
+ CHECK(replace_text.applyReplacements("${color}", flow_file) == "green");
+ CHECK(replace_text.applyReplacements("I like ${color} ${food}!", flow_file) == "I like green eggs and ham!");
+ CHECK(replace_text.applyReplacements("it was ${color}er than ${color}", flow_file) == "it was greener than green");
+ CHECK(replace_text.applyReplacements("an empty ${} is left alone", flow_file) == "an empty ${} is left alone");
+ CHECK(replace_text.applyReplacements("not ${found} is left alone", flow_file) == "not ${found} is left alone");
+ CHECK(replace_text.applyReplacements("this ${color} ${fruit} is sour", flow_file) == "this green ${fruit} is sour");
+}
+
+TEST_CASE("Regex Replace works correctly in ReplaceText in line by line mode", "[Line-by-Line][Regex Replace]") {
+ TestController testController;
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana\n");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+
+ std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), toString(minifi::processors::EvaluationModeType::LINE_BY_LINE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::REGEX_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "_");
+
+ std::string expected_output;
+ SECTION("Replacing all lines") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::ALL));
+ expected_output = "_ppl_\n p__r\n _r_ng_\n b_n_n_\n";
+ }
+ SECTION("Replacing the first line") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::FIRST_LINE));
+ expected_output = "_ppl_\n pear\n orange\n banana\n";
+ }
+ SECTION("Replacing the last line") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::LAST_LINE));
+ expected_output = "apple\n pear\n orange\n b_n_n_\n";
+ }
+ SECTION("Replacing all lines except the first") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::EXCEPT_FIRST_LINE));
+ expected_output = "apple\n p__r\n _r_ng_\n b_n_n_\n";
+ }
+ SECTION("Replacing all lines except the last") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::LineByLineEvaluationMode.getName(), toString(minifi::processors::LineByLineEvaluationModeType::EXCEPT_LAST_LINE));
+ expected_output = "_ppl_\n p__r\n _r_ng_\n banana\n";
+ }
+ SECTION("The output has fewer characters than the input") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "");
+ expected_output = "ppl\n pr\n rng\n bnn\n";
+ }
+ SECTION("The output has more characters than the input") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "$&v$&");
+ expected_output = "avappleve\n peveavar\n ovoravangeve\n bavanavanava\n";
+ }
+ SECTION("The start of line anchor works correctly") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "^( ?)[aeiou]");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "$1_");
+ expected_output = "_pple\n pear\n _range\n banana\n";
+ }
+ SECTION("The end of line anchor works correctly") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]$");
+ expected_output = "appl_\n pear\n orang_\n banan_";
+ }
+ SECTION("The end of line anchor works correctly with Windows line endings, too") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\r\n pear\r\n orange\r\n banana");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "[aeiou]$");
+ expected_output = "appl_\r\n pear\r\n orang_\r\n banan_";
+ }
+ SECTION("Prepend works correctly in line by line mode") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\npear\norange\nbanana\n");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::PREPEND));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "- ");
+ expected_output = "- apple\n- pear\n- orange\n- banana\n";
+ }
+ SECTION("Append works correctly in line by line mode") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::APPEND));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), " tree");
+ expected_output = "apple tree\n pear tree\n orange tree\n banana tree\n";
+ }
+ SECTION("Literal Replace works correctly in line by line mode") {
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::LITERAL_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), "a");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "*");
+ expected_output = "*pple\n pe*r\n or*nge\n b*n*n*\n";
+ }
+ SECTION("Always Replace works correctly in line by line mode - without newline") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit");
+ expected_output = "fruit\nfruit\nfruit\nfruit";
+ }
+ SECTION("Always Replace works correctly in line by line mode - with newline") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit\n");
+ expected_output = "fruit\nfruit\nfruit\nfruit";
+ }
+ SECTION("Always Replace works correctly in line by line mode - with Windows line endings") {
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\r\n pear\r\n orange\r\n banana");
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::ALWAYS_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "fruit");
+ expected_output = "fruit\r\nfruit\r\nfruit\r\nfruit";
+ }
+
+ std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+ plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+ testController.runSession(plan);
+
+ CHECK(LogTestController::getInstance().contains(expected_output));
+ LogTestController::getInstance().reset();
+}
+
+class HandleEmptyIncomingFlowFile {
+ public:
+ void setEvaluationMode(minifi::processors::EvaluationModeType evaluation_mode) { evaluation_mode_ = evaluation_mode; }
+ void setReplacementStrategy(minifi::processors::ReplacementStrategyType replacement_strategy) { replacement_strategy_ = replacement_strategy; }
+ void setExpectedOutput(const std::string& expected_output) { expected_output_ = expected_output; }
+
+ void run() {
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::FileSize.getName(), "0 B");
+
+ std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), evaluation_mode_.toString());
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), replacement_strategy_.toString());
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), "hippopotamus");
+
+ std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+ plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+ testController.runSession(plan);
+
+ CHECK(LogTestController::getInstance().contains(expected_output_));
+ LogTestController::getInstance().reset();
+ }
+
+ private:
+ TestController testController;
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ minifi::processors::EvaluationModeType evaluation_mode_;
+ minifi::processors::ReplacementStrategyType replacement_strategy_;
+ std::string expected_output_;
+};
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can prepend to an empty flow file in Entire text mode", "[Entire text][Prepend]") {
+ setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ setReplacementStrategy(minifi::processors::ReplacementStrategyType::PREPEND);
+ setExpectedOutput("Payload:\nhippopotamus\n");
+ run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can append to an empty flow file in Entire text mode", "[Entire text][Append]") {
+ setEvaluationMode(minifi::processors::EvaluationModeType::ENTIRE_TEXT);
+ setReplacementStrategy(minifi::processors::ReplacementStrategyType::APPEND);
+ setExpectedOutput("Payload:\nhippopotamus\n");
+ run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can prepend to an empty flow file in Line-by-line mode", "[Line-by-Line][Prepend]") {
+ setEvaluationMode(minifi::processors::EvaluationModeType::LINE_BY_LINE);
+ setReplacementStrategy(minifi::processors::ReplacementStrategyType::PREPEND);
+ setExpectedOutput("Size:0 Offset:0");
+ run();
+}
+
+TEST_CASE_METHOD(HandleEmptyIncomingFlowFile, "ReplaceText can append to an empty flow file in Line-by-line mode", "[Line-by-Line][Append]") {
+ setEvaluationMode(minifi::processors::EvaluationModeType::LINE_BY_LINE);
+ setReplacementStrategy(minifi::processors::ReplacementStrategyType::APPEND);
+ setExpectedOutput("Size:0 Offset:0");
+ run();
+}
+
+class UseExpressionLanguage {
+ public:
+ void setSearchValue(const std::string& search_value) { search_value_ = search_value; }
+ void setReplacementValue(const std::string& replacement_value) { replacement_value_ = replacement_value; }
+ void setExpectedOutput(const std::string& expected_output) { expected_output_ = expected_output; }
+
+ void run() {
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<core::Processor> generate_flow_file = plan->addProcessor("GenerateFlowFile", "generate_flow_file");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::CustomText.getName(), "apple\n pear\n orange\n banana\n");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::DataFormat.getName(), "Text");
+ plan->setProperty(generate_flow_file, minifi::processors::GenerateFlowFile::UniqueFlowFiles.getName(), "false");
+
+ std::shared_ptr<core::Processor> update_attribute = plan->addProcessor("UpdateAttribute", "update_attribute", minifi::processors::GenerateFlowFile::Success, true);
+ plan->setProperty(update_attribute, "substring", "an", true);
+ plan->setProperty(update_attribute, "color", "blue", true);
+
+ std::shared_ptr<core::Processor> replace_text = plan->addProcessor("ReplaceText", "replace_text", minifi::processors::GenerateFlowFile::Success, true);
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::EvaluationMode.getName(), toString(minifi::processors::EvaluationModeType::ENTIRE_TEXT));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementStrategy.getName(), toString(minifi::processors::ReplacementStrategyType::LITERAL_REPLACE));
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::SearchValue.getName(), search_value_);
+ plan->setProperty(replace_text, minifi::processors::ReplaceText::ReplacementValue.getName(), replacement_value_);
+
+ std::shared_ptr<core::Processor> log_attribute = plan->addProcessor("LogAttribute", "log_attribute", minifi::processors::ReplaceText::Success, true);
+ plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload.getName(), "true");
+
+ testController.runSession(plan);
+
+ CHECK(LogTestController::getInstance().contains(expected_output_));
+ LogTestController::getInstance().reset();
+ }
+
+ private:
+ TestController testController;
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::string search_value_;
+ std::string replacement_value_;
+ std::string expected_output_;
+};
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in the Search Value", "[expression language][Search Value]") {
+ setSearchValue("${substring}");
+ setReplacementValue("*");
+ setExpectedOutput("Payload:\napple\n pear\n or*ge\n b**a\n");
+ run();
+}
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in the Replacement Value", "[expression language][Replacement Value]") {
+ setSearchValue("orange");
+ setReplacementValue("${color}berry");
+ setExpectedOutput("Payload:\napple\n pear\n blueberry\n banana\n");
+ run();
+}
+
+TEST_CASE_METHOD(UseExpressionLanguage, "ReplaceText can use expression language in both the Search and Replacement Values", "[expression language][Search Value][Replacement Value]") {
+ setSearchValue("${substring}");
+ setReplacementValue("${literal(2):plus(3)}");
+ setExpectedOutput("Payload:\napple\n pear\n or5ge\n b55a\n");
+ run();
+}
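The Regex Replace tests above rely on four substitution rules: `$&` is the whole match, `$1` is the first capture group, a group that does not exist (such as `$2`) expands to nothing, and `$$` produces a literal `$`. These are the default ECMAScript format rules of `std::regex_replace`, so the expected strings can be reproduced with the standard library alone. This is a minimal sketch for checking those expectations; it does not claim that ReplaceText.cpp calls `std::regex_replace` in this way, and `regexReplace` is a hypothetical wrapper.

```cpp
#include <cassert>
#include <regex>
#include <string>

// Replaces every match of `pattern` in `input` using the default
// ECMAScript format rules of std::regex_replace:
//   $&  -> the whole match
//   $n  -> capture group n (empty if the pattern has no such group)
//   $$  -> a literal '$'
std::string regexReplace(const std::string& input, const std::string& pattern, const std::string& replacement) {
  return std::regex_replace(input, std::regex{pattern}, replacement);
}
```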
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 5511790..790f320 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -97,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
// Execute the given write callback against the content
void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+ // Read and write the flow file at the same time (e.g. for processing it line by line)
+ int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback);
// Replace content with buffer
void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
// Execute the given write/append callback against the content
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b92b993..0956f8d 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -42,6 +42,11 @@ class OutputStreamCallback {
virtual ~OutputStreamCallback() = default;
virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
};
+class InputOutputStreamCallback {
+ public:
+ virtual ~InputOutputStreamCallback() = default;
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};
namespace internal {
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
new file mode 100644
index 0000000..7cbdcc1
--- /dev/null
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "core/logging/Logger.h"
+#include "io/BaseStream.h"
+#include "io/StreamPipe.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class LineByLineInputOutputStreamCallback : public InputOutputStreamCallback {
+ public:
+ using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
+ explicit LineByLineInputOutputStreamCallback(CallbackType callback);
+ int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) override;
+
+ private:
+ int64_t readInput(io::InputStream& stream);
+ void readLine();
+ [[nodiscard]] bool isLastLine() const { return !next_line_.has_value(); }
+
+ CallbackType callback_;
+ std::vector<uint8_t> input_;
+ std::vector<uint8_t>::iterator current_pos_{};
+ std::optional<std::string> current_line_;
+ std::optional<std::string> next_line_;
+};
+
+} // namespace org::apache::nifi::minifi::utils
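The callback receives `is_first_line` and `is_last_line`, and these two flags are enough to express the Line-by-Line Evaluation Mode values exercised in ReplaceTextTests.cpp (All, First-Line, Last-Line, Except-First-Line, Except-Last-Line). Below is a minimal sketch that assumes a local enum mirroring `LineByLineEvaluationModeType`. `shouldProcessLine` is a hypothetical helper and is not taken from ReplaceText.cpp.

```cpp
#include <cassert>

// Hypothetical mirror of LineByLineEvaluationModeType.
enum class LineMode { ALL, FIRST_LINE, LAST_LINE, EXCEPT_FIRST_LINE, EXCEPT_LAST_LINE };

// Decides whether the replacement should be applied to the current line,
// given the flags that LineByLineInputOutputStreamCallback passes to its callback.
constexpr bool shouldProcessLine(LineMode mode, bool is_first_line, bool is_last_line) {
  switch (mode) {
    case LineMode::ALL: return true;
    case LineMode::FIRST_LINE: return is_first_line;
    case LineMode::LAST_LINE: return is_last_line;
    case LineMode::EXCEPT_FIRST_LINE: return !is_first_line;
    case LineMode::EXCEPT_LAST_LINE: return !is_last_line;
  }
  return false;  // not reached for valid enum values
}
```

In this sketch, a line for which the helper returns false would be passed through unchanged, which is what the First-Line and Last-Line test cases expect of the lines they do not touch.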
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 0df9541..5f0ebb0 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -76,7 +76,12 @@ class StringUtils {
static std::string toLower(std::string_view str);
- // Trim String utils
+ /**
+ * Strips the line ending (\n or \r\n) from the end of the input line.
+ * @param input_line the line to strip, possibly ending in \n or \r\n
+ * @return (stripped line, line ending) pair
+ */
+ static std::pair<std::string, std::string> chomp(const std::string& input_line);
/**
* Trims a string left to right
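`chomp()` returns the line with its trailing `\n` or `\r\n` removed, together with the ending that was removed. This lets a caller transform the content and then re-attach the original ending; the Windows line ending tests expect `\r\n` to survive a replacement. Only part of the implementation appears in this excerpt, so the following is a standalone sketch of the documented behaviour. It assumes that a line without a terminator comes back with an empty ending, and it uses a local `endsWith` as a stand-in for `StringUtils::endsWith`.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Local stand-in for StringUtils::endsWith.
bool endsWith(const std::string& str, const std::string& suffix) {
  return str.size() >= suffix.size() &&
         str.compare(str.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Splits a line into (content, line ending), where the ending is "\r\n", "\n",
// or "" (assumed) when the line has no terminator.
std::pair<std::string, std::string> chomp(const std::string& input_line) {
  if (endsWith(input_line, "\r\n")) {
    return {input_line.substr(0, input_line.size() - 2), "\r\n"};
  }
  if (endsWith(input_line, "\n")) {
    return {input_line.substr(0, input_line.size() - 1), "\n"};
  }
  return {input_line, ""};
}
```

Testing `\r\n` before `\n` matters: testing `\n` first would leave a stray `\r` at the end of the content.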
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dcb95f4..13cb07b 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -328,6 +328,53 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
}
}
+int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback) {
+ gsl_Expects(callback);
+
+ try {
+ if (flow->getResourceClaim() == nullptr) {
+ logger_->log_debug("For %s, no resource claim but size is %d", flow->getUUIDStr(), flow->getSize());
+ if (flow->getSize() == 0) {
+ return 0;
+ }
+ throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
+ }
+
+ std::shared_ptr<ResourceClaim> input_claim = flow->getResourceClaim();
+ std::shared_ptr<io::BaseStream> input_stream = content_session_->read(input_claim);
+ if (!input_stream) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
+ }
+ input_stream->seek(flow->getOffset());
+
+ std::shared_ptr<ResourceClaim> output_claim = content_session_->create();
+ std::shared_ptr<io::BaseStream> output_stream = content_session_->write(output_claim);
+ if (!output_stream) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
+ }
+
+ int64_t bytes_written = callback->process(input_stream, output_stream);
+ if (bytes_written < 0) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
+ }
+
+ input_stream->close();
+ output_stream->close();
+
+ flow->setSize(gsl::narrow<uint64_t>(bytes_written));
+ flow->setOffset(0);
+ flow->setResourceClaim(output_claim);
+
+ return bytes_written;
+ } catch (const std::exception& exception) {
+ logger_->log_debug("Caught exception %s during process session readWrite", exception.what());
+ throw;
+ } catch (...) {
+ logger_->log_debug("Caught unknown exception during process session readWrite");
+ throw;
+ }
+}
+
void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) {
importFrom(stream, flow);
}
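`ProcessSession::readWrite()` opens the existing content for reading and a fresh resource claim for writing, then hands both streams to the callback. Only after the callback succeeds does it point the flow file at the new claim, with offset 0 and the size the callback reported. A negative return value raises an exception before the flow file is modified. When the flow file has no resource claim and a size of 0, the function returns 0 without invoking the callback at all. Below is a toy model of that contract. It assumes `std::istream`/`std::ostream` in place of `io::BaseStream` and a plain struct in place of the flow file, and it omits the offset. None of these names are MiNiFi API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>

// Toy stand-in (hypothetical): a content "claim" is just a shared string buffer.
struct ToyFlowFile {
  std::shared_ptr<std::string> claim;  // nullptr means no content claim
  std::uint64_t size = 0;
};

using ToyCallback = std::function<std::int64_t(std::istream& input, std::ostream& output)>;

// Mirrors the shape of ProcessSession::readWrite: read the old content,
// write into a new claim, and swap the new claim in only on success.
std::int64_t toyReadWrite(ToyFlowFile& flow, const ToyCallback& callback) {
  if (!flow.claim) {
    if (flow.size == 0) { return 0; }  // empty flow file: the callback is not called
    throw std::runtime_error("No Content Claim existed for read");
  }
  std::istringstream input{*flow.claim};
  std::ostringstream output;  // plays the role of the freshly created claim
  const std::int64_t bytes_written = callback(input, output);
  if (bytes_written < 0) {
    throw std::runtime_error("Failed to process flowfile content");  // flow is left untouched
  }
  flow.claim = std::make_shared<std::string>(output.str());
  flow.size = static_cast<std::uint64_t>(bytes_written);
  return bytes_written;
}
```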
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
new file mode 100644
index 0000000..1e84247
--- /dev/null
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+#include <algorithm>
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(CallbackType callback)
+ : callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+ gsl_Expects(input);
+ gsl_Expects(output);
+
+ if (int64_t status = readInput(*input); status <= 0) {
+ return status;
+ }
+
+ std::size_t total_bytes_written = 0;
+ bool is_first_line = true;
+ readLine();  // prime next_line_ with the first line of the input
+ do {
+ readLine();  // shift next_line_ into current_line_ and read one line ahead
+ std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+ const auto bytes_written = output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+ if (io::isError(bytes_written)) { return -1; }
+ total_bytes_written += bytes_written;
+ is_first_line = false;
+ } while (!isLastLine());
+
+ return gsl::narrow<int64_t>(total_bytes_written);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) {
+ const auto status = stream.read(input_, stream.size());
+ if (io::isError(status)) { return -1; }
+ current_pos_ = input_.begin();
+ return gsl::narrow<int64_t>(input_.size());
+}
+
+void LineByLineInputOutputStreamCallback::readLine() {
+ if (current_pos_ == input_.end()) {
+ current_line_ = next_line_;
+ next_line_ = std::nullopt;
+ return;
+ }
+
+ auto end_of_line = std::find(current_pos_, input_.end(), '\n');
+ if (end_of_line != input_.end()) { ++end_of_line; }  // keep the '\n' (and any preceding '\r') as part of the line
+
+ current_line_ = next_line_;
+ next_line_ = std::string(current_pos_, end_of_line);
+ current_pos_ = end_of_line;
+}
+
+} // namespace org::apache::nifi::minifi::utils
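The `process()` loop above uses a one-line look-ahead: `readLine()` is called once before the loop to fill `next_line_`, and each iteration shifts `next_line_` into `current_line_`, so the callback can be told whether the current line is the last one. The standalone sketch below reproduces that technique on a `std::string` instead of MiNiFi streams. It is an illustration only: `processLineByLine` and `LineCallback` are hypothetical names, and it assumes `isLastLine()` (declared in the header, which is not part of this excerpt) behaves like `!next_line_`.

```cpp
#include <algorithm>
#include <functional>
#include <optional>
#include <string>

// Hypothetical stand-in for LineByLineInputOutputStreamCallback::CallbackType.
using LineCallback = std::function<std::string(const std::string& line, bool is_first_line, bool is_last_line)>;

// Splits `input` after each '\n' (line endings stay attached to their line) and calls
// `callback` once per line. The one-line look-ahead lets the last line be flagged even
// when the input does not end with a newline.
std::string processLineByLine(const std::string& input, const LineCallback& callback) {
  if (input.empty()) { return ""; }  // nothing to process, mirrors the early return for empty input

  auto pos = input.begin();
  std::optional<std::string> current_line;
  std::optional<std::string> next_line;

  // Shift next_line into current_line, then read the following line (if any) into next_line.
  auto read_line = [&]() {
    current_line = next_line;
    if (pos == input.end()) {
      next_line = std::nullopt;
      return;
    }
    auto end_of_line = std::find(pos, input.end(), '\n');
    if (end_of_line != input.end()) { ++end_of_line; }
    next_line = std::string(pos, end_of_line);
    pos = end_of_line;
  };

  std::string output;
  bool is_first_line = true;
  read_line();  // prime next_line with the first line
  do {
    read_line();
    const bool is_last_line = !next_line.has_value();  // assumed equivalent of isLastLine()
    output += callback(*current_line, is_first_line, is_last_line);
    is_first_line = false;
  } while (next_line.has_value());
  return output;
}
```

For example, calling it on `"One two\nThree four\nFive six"` with a callback that prepends `"[ "` to the first line and appends `" ]"` to the last one yields `"[ One two\nThree four\nFive six ]"`. Reading the whole input up front, as `readInput()` does, keeps the look-ahead trivial at the cost of holding the entire flow file in memory.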
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index 54ae0db..307eb02 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -48,6 +48,16 @@ std::string StringUtils::toLower(std::string_view str) {
return str | views::transform(tolower) | ranges::to<std::string>();
}
+std::pair<std::string, std::string> StringUtils::chomp(const std::string& input_line) {
+ if (endsWith(input_line, "\r\n")) {
+ return std::make_pair(input_line.substr(0, input_line.size() - 2), "\r\n");
+ } else if (endsWith(input_line, "\n")) {
+ return std::make_pair(input_line.substr(0, input_line.size() - 1), "\n");
+ } else {
+ return std::make_pair(input_line, "");
+ }
+}
+
std::string StringUtils::trim(const std::string& s) {
return trimRight(trimLeft(s));
}
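`chomp()` splits a line into its content and its terminator (`"\r\n"`, `"\n"` or `""`), which lets a per-line callback rewrite only the content while leaving Windows and Unix line endings untouched. A minimal standalone sketch, assuming a hypothetical `chompLine()` that re-implements `StringUtils::chomp()` so the example compiles without MiNiFi, plus a hypothetical `transformLineContent()` helper that is not part of this commit:

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical re-implementation of StringUtils::chomp() from the diff above:
// returns {line content, line ending}, where the ending is "\r\n", "\n" or "".
std::pair<std::string, std::string> chompLine(const std::string& line) {
  auto ends_with = [&line](const std::string& suffix) {
    return line.size() >= suffix.size() &&
        line.compare(line.size() - suffix.size(), suffix.size(), suffix) == 0;
  };
  if (ends_with("\r\n")) { return {line.substr(0, line.size() - 2), "\r\n"}; }
  if (ends_with("\n")) { return {line.substr(0, line.size() - 1), "\n"}; }
  return {line, ""};
}

// Hypothetical helper: applies `transform` to the content of a line only, then
// re-attaches the original line ending, so "\r\n" stays "\r\n".
std::string transformLineContent(const std::string& line,
                                 const std::function<std::string(const std::string&)>& transform) {
  auto [content, line_ending] = chompLine(line);
  return transform(content) + line_ending;
}
```

With a helper like this, the "enclose input in square brackets" test case below would place ` ]` before the final newline rather than after it.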
diff --git a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
new file mode 100644
index 0000000..ff9cd17
--- /dev/null
+++ b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "LineByLineInputOutputStreamCallback.h"
+
+#include "../TestBase.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "io/BufferStream.h"
+#include "spdlog/spdlog.h"
+
+using minifi::utils::LineByLineInputOutputStreamCallback;
+
+TEST_CASE("LineByLineInputOutputStreamCallback can process a stream line by line", "[process]") {
+ const auto input_data = "One two, buckle my shoe\n"
+ "Three four, knock at the door\n"
+ "Five six, picking up sticks\n";
+ const auto input_stream = std::make_shared<minifi::io::BufferStream>(input_data);
+ const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+
+ LineByLineInputOutputStreamCallback::CallbackType line_processor;
+ std::string expected_output;
+
+ SECTION("no changes") {
+ line_processor = [](const std::string& input_line, bool, bool) {
+ return input_line;
+ };
+ expected_output = input_data;
+ }
+ SECTION("prepend asterisk") {
+ line_processor = [](const std::string& input_line, bool, bool) {
+ return "* " + input_line;
+ };
+ expected_output = "* One two, buckle my shoe\n"
+ "* Three four, knock at the door\n"
+ "* Five six, picking up sticks\n";
+ }
+ SECTION("replace vowels with underscores") {
+ line_processor = [](const std::string& input_line, bool, bool) {
+ return std::regex_replace(input_line, std::regex{"[aeiou]", std::regex::icase}, "_");
+ };
+ expected_output = "_n_ tw_, b_ckl_ my sh__\n"
+ "Thr__ f__r, kn_ck _t th_ d__r\n"
+ "F_v_ s_x, p_ck_ng _p st_cks\n";
+ }
+ SECTION("enclose input in square brackets") {
+ line_processor = [](const std::string& input_line, bool is_first_line, bool is_last_line) {
+ if (!is_first_line && !is_last_line) { return input_line; }
+ auto output_line = input_line;
+ if (is_first_line) { output_line = "[ " + output_line; }
+ if (is_last_line) { output_line = output_line + " ]"; }
+ return output_line;
+ };
+ expected_output = "[ One two, buckle my shoe\n"
+ "Three four, knock at the door\n"
+ "Five six, picking up sticks\n ]";
+ }
+
+ LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+ line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+ std::string output_data(reinterpret_cast<const char*>(output_stream->getBuffer()), output_stream->size());
+ CHECK(output_data == expected_output);
+}
+
+TEST_CASE("LineByLineInputOutputStreamCallback can handle Windows line endings", "[process][Windows]") {
+ const auto input_data = "One two, buckle my shoe\r\n"
+ "Three four, knock at the door\r\n"
+ "Five six, picking up sticks\r\n";
+ const auto input_stream = std::make_shared<minifi::io::BufferStream>(input_data);
+ const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+
+ const auto line_processor = [](const std::string& input_line, bool, bool) {
+ static int line_number = 0;
+ return fmt::format("{0}: {1}", ++line_number, input_line);
+ };
+ const auto expected_output = "1: One two, buckle my shoe\r\n"
+ "2: Three four, knock at the door\r\n"
+ "3: Five six, picking up sticks\r\n";
+
+ LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+ line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+ std::string output_data(reinterpret_cast<const char*>(output_stream->getBuffer()), output_stream->size());
+ CHECK(output_data == expected_output);
+}
+
+TEST_CASE("LineByLineInputOutputStreamCallback can handle an empty input", "[process][empty]") {
+ const auto input_stream = std::make_shared<minifi::io::BufferStream>("");
+ const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+ const auto line_processor = [](const std::string& input_line, bool, bool) { return input_line; };
+ LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
+ line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+ CHECK(output_stream->size() == 0);
+}
diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp
index 303c21f..b2dd86a 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -31,6 +31,14 @@
using org::apache::nifi::minifi::utils::StringUtils;
+TEST_CASE("StringUtils::chomp works correctly", "[StringUtils][chomp]") {
+ using pair_of = std::pair<std::string, std::string>;
+ CHECK(StringUtils::chomp("foobar") == pair_of{"foobar", ""});
+ CHECK(StringUtils::chomp("foobar\n") == pair_of{"foobar", "\n"});
+ CHECK(StringUtils::chomp("foobar\r\n") == pair_of{"foobar", "\r\n"});
+ CHECK(StringUtils::chomp("foo\rbar\n") == pair_of{"foo\rbar", "\n"});
+}
+
TEST_CASE("TestStringUtils::split", "[test split no delimiter]") {
std::vector<std::string> expected = { "hello" };
REQUIRE(expected == StringUtils::split("hello", ","));