You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/09/08 14:50:41 UTC

[GitHub] [nifi-minifi-cpp] fgerlits opened a new pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

fgerlits opened a new pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170


   Processor based on https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.14.0/org.apache.nifi.processors.standard.ReplaceText/index.html
   
   ---
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [x] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       unfortunately it seems nifi has a different concept of "line", it  uses the `LineDemarcator` which
   
   1. considers all `\n` as end-of-line
   2. considers all `\r` followed by a non-`\n` as end-of-line
   3. considers a non-empty (!) tail after the last end-of-line as a line (or beginning of string)
   
   (it's quite confusing) 
   
   this was implemented in RouteText to be in-line with nifi, but of course we could decide to change them to use a different segmentation logic




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r717317315



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       I think pretty patterns are useful, too, as they improve readability.  Also, I suspect it won't take long before cpplint and clang-tidy disagree about something and then we'll be stuck.  But okay: https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4b1476c18104f4fa9de3994c9b366c284177e779.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705139497



##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -104,7 +104,7 @@ def generate_input_port_for_remote_process_group(remote_process_group, name):
         return input_port_node
 
     def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
-        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
+        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.replace('\\n', '\n').encode('utf-8'))

Review comment:
       note: I have also touched this part for the same reason (use escaped `\t`, `\n`, ... chars) in [1168](https://github.com/apache/nifi-minifi-cpp/pull/1168) (same with the file content validation)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705172435



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       Just speculating, but that may have been done in NiFi to deal with the old MacOS line endings of bare `\r`.  Since these line endings are not used by any computer in use today, I think using `\n`, which covers Windows, Linux and MacOS X+, is fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705159596



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size());

Review comment:
       it seems we might process longer than `buffer size` lines, as we might append the end of the previous buffer fill




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r717382168



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       I don't want to force my opinion on you, so feel free to revert the change and close this thread if I didn't convince you. I think minor issues like this are well within the author's discretion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714759611



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }

Review comment:
       That would complicate the code considerably, as we would need to read the Search Value either in `onSchedule` or in `onTrigger`, depending on whether we are using Regex Replace or not.
   
   I have run this in Google Benchmark:
   ```c++
   #include <benchmark/benchmark.h>
   #include <regex>
   #include <string>
   
   static void SimpleRegex(benchmark::State& state) {
     std::string input = ".*foo?";
     for (auto _ : state) {
       std::regex my_regex{input};
       benchmark::DoNotOptimize(my_regex);
     }
   }
   BENCHMARK(SimpleRegex);
   
   static void ComplicatedRegex(benchmark::State& state) {
     std::string input = "(.)[ompli]{5}\\1[ae]t[ae]d";
     for (auto _ : state) {
       std::regex my_regex{input};
       benchmark::DoNotOptimize(my_regex);
     }
   }
   BENCHMARK(ComplicatedRegex);
   
   BENCHMARK_MAIN();
   ```
   and the simple regex took 0.6 microseconds to compile, the complicated one took 77 microseconds (on gcc 11.1 with `-O3`).  I think that is an acceptable price for simpler code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716525894



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(CallbackType callback)
+  : callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  if (int64_t status = readInput(*input); status <= 0) {
+    return status;
+  }
+
+  std::size_t total_bytes_written_ = 0;
+  bool is_first_line = true;
+  readLine();
+  do {
+    readLine();
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());

Review comment:
       should we check the return value of `write` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r717317315



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       I think pretty patterns are useful, too, as they improve readability.  Also, I suspect it won't take long before cpplint and clang-tidy disagree about something and then we'll be stuck.  But okay: https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4b1476c18104f4fa9de3994c9b366c284177e779.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716645858



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       `applyLiteralReplace` could be `static`, or a free utility function in an anonymous namespace.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705980315



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), search_value_);

Review comment:
       won't writing members (`search_value_`, `search_regex_`) interfere with other threads of this same processor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r706076725



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      search_regex_ = std::regex{search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+}
+
+void ReplaceText::readReplacementValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  if (flow_file->getSize() > maximum_buffer_size_) {
+    logger_->log_error("Flow file size %" PRIu64 " is larger than %s = %" PRIu64 " so transferring it to Failure",
+                       flow_file->getSize(), MaximumBufferSize.getName(), maximum_buffer_size_);
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{maximum_buffer_size_, logger_,
+        [this, &flow_file](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return input + replacement_value_;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return applyRegexReplace(input);
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(input);
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return replacement_value_;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(input, flow_file);
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyRegexReplace(const std::string& input) const {
+  const auto [chomped_line, line_ending] = utils::StringUtils::chomp(input);
+  std::string output = std::regex_replace(chomped_line, search_regex_, replacement_value_);
+  return output + line_ending;
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input) const {
+  std::vector<char> output;
+  output.reserve(input.size());
+
+  auto it = input.begin();
+  do {
+    auto found = std::search(it, input.end(), search_value_.begin(), search_value_.end());
+    if (found != input.end()) {
+      std::copy(it, found, std::back_inserter(output));
+      std::copy(replacement_value_.begin(), replacement_value_.end(), std::back_inserter(output));
+      it = found;
+      std::advance(it, search_value_.size());
+    } else {
+      std::copy(it, input.end(), std::back_inserter(output));
+      it = input.end();
+    }
+  } while (it != input.end());
+
+  return std::string{output.begin(), output.end()};
+}
+
+std::string ReplaceText::applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  static const std::regex PLACEHOLDER{R"(\$\{([^}]+)\})"};

Review comment:
       That was my original plan, too, but the expression language substitution logic is hidden inside the expression-language extension, and I didn't think exposing it should be part of this PR.  Also, full expression language support was not a requirement.  We could do this later in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r712776682



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;

Review comment:
       yup, makes sense
   
   done in https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/fead017bb2845854247d04c64d51882b273aba5a




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705156210



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);

Review comment:
       note: the way it is implemented in RouteText is to check if the input stream supports access to its underlying buffer (BufferStream and the default RocksDbStream do) and only read into a buffer as a fallback




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705173760



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size());

Review comment:
       I guess I was just making an observation aloud :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716532658



##########
File path: libminifi/include/utils/StringUtils.h
##########
@@ -75,7 +75,12 @@ class StringUtils {
 
   static std::string toLower(std::string_view str);
 
-  // Trim String utils
+  /**
+   * Strips the line ending (\n or \r\n) from the end of the input line.
+   * @param input_line
+   * @return (stripped line, line ending) pair
+   */
+  static std::pair<std::string, std::string> chomp(const std::string& input_line);

Review comment:
       we seem to migrate our StringUtils to operate on `std::string_view` wherever possible, we might benefit from writing the new ones on the same note




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705200372



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       (3) is handled correctly (the `\n`-less last line is treated as a line, is processed and output without a `\n`), **except** in the edge case where the last line doesn't end in a `\n` **and** it straddles a buffer boundary, which causes an error (flow file routed to failure).  Thanks for pointing this out, I'll fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r712829713



##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};

Review comment:
       Instead of following the same flawed pattern of the past, I prefer doing things the C++ way, which in this case would be passing function objects directly (potentially type-erased using std::function) instead of making them have to inherit *Callback.
   LineByLineInputOutputStreamCallback could still exist, but `process` would need to become `operator()` and it wouldn't need to be a subclass.

##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    const auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = chomped_value;

Review comment:
       Consider moving this string to avoid an extra allocation.
   ```suggestion
       auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
       parameters.replacement_value_ = std::move(chomped_value);
   ```

##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }

Review comment:
       Can we do anything to avoid compiling a regex on each flow file? I didn't measure but I feel like it might be a significant performance bottleneck.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705166242



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = start_of_line + std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  logger_->log_error("Found a line longer than the max buffer size (%zu bytes)", buffer_.size());

Review comment:
       That's correct, lines up to 2 times the buffer size may be accepted, but only lines <= the buffer size are guaranteed to be accepted.  Do you think that is a problem?  Or do you think we need to make the error message more verbose?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714761876



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }

Review comment:
       That's fair, I expected them to be in the magnitude of tens of milliseconds. Thanks for doing the measurement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r709315815



##########
File path: extensions/standard-processors/processors/ReplaceText.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+  static const core::Property EvaluationMode;
+  static const core::Property LineByLineEvaluationMode;
+  static const core::Property ReplacementStrategy;
+  static const core::Property MaximumBufferSize;
+  static const core::Property SearchValue;
+  static const core::Property ReplacementValue;
+
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {});
+  core::annotation::Input getInputRequirement() const override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+  friend struct ReplaceTextTestAccessor;
+
+  void readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+  void readReplacementValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  void replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+  void replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+
+  std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string applyRegexReplace(const std::string& input) const;
+  std::string applyLiteralReplace(const std::string& input) const;
+  std::string applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const;
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::string search_value_;
+  std::regex search_regex_;
+  std::string replacement_value_;
+  uint64_t maximum_buffer_size_ = 0;
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE;
+};
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies.");

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705279874



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       the bug in (3) is fixed by https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4df70bdec67d7f91182e10b01e6df646acf5e85a




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705156210



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);

Review comment:
       the way it is implemented in RouteText is to check if the input stream supports access to its underlying buffer (BufferStream and the default RocksDbStream do) and only read into a buffer as a fallback




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r712748416



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;

Review comment:
       all other `getInputRequirement` overrides are implemented in the headers, which I kind of agree with, as its implementation (the return value) is something inherent to the processor class (like the relationships and properties)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705990164



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      search_regex_ = std::regex{search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+}
+
+void ReplaceText::readReplacementValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  if (flow_file->getSize() > maximum_buffer_size_) {
+    logger_->log_error("Flow file size %" PRIu64 " is larger than %s = %" PRIu64 " so transferring it to Failure",
+                       flow_file->getSize(), MaximumBufferSize.getName(), maximum_buffer_size_);
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{maximum_buffer_size_, logger_,
+        [this, &flow_file](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return input + replacement_value_;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return applyRegexReplace(input);
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(input);
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return replacement_value_;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(input, flow_file);
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyRegexReplace(const std::string& input) const {
+  const auto [chomped_line, line_ending] = utils::StringUtils::chomp(input);
+  std::string output = std::regex_replace(chomped_line, search_regex_, replacement_value_);
+  return output + line_ending;
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input) const {
+  std::vector<char> output;
+  output.reserve(input.size());
+
+  auto it = input.begin();
+  do {
+    auto found = std::search(it, input.end(), search_value_.begin(), search_value_.end());
+    if (found != input.end()) {
+      std::copy(it, found, std::back_inserter(output));
+      std::copy(replacement_value_.begin(), replacement_value_.end(), std::back_inserter(output));
+      it = found;
+      std::advance(it, search_value_.size());
+    } else {
+      std::copy(it, input.end(), std::back_inserter(output));
+      it = input.end();
+    }
+  } while (it != input.end());
+
+  return std::string{output.begin(), output.end()};
+}
+
+std::string ReplaceText::applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  static const std::regex PLACEHOLDER{R"(\$\{([^}]+)\})"};

Review comment:
       I see that nifi also uses a hand-rolled variable substitution here, but the attribute substitution already happened when we queried the property and the ExpressionLanguage is enabled, right? it seems to me that standard processors have the EL extension as dependency, so we might be able to rely on that here as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716532658



##########
File path: libminifi/include/utils/StringUtils.h
##########
@@ -75,7 +75,12 @@ class StringUtils {
 
   static std::string toLower(std::string_view str);
 
-  // Trim String utils
+  /**
+   * Strips the line ending (\n or \r\n) from the end of the input line.
+   * @param input_line
+   * @return (stripped line, line ending) pair
+   */
+  static std::pair<std::string, std::string> chomp(const std::string& input_line);

Review comment:
       we seem to (slowly) migrate our StringUtils to operate on `std::string_view` wherever possible, we might benefit from writing the new ones on the same note




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716782182



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       Yes it could, but should it?  We would be marking this function as different from those around it, when the difference is only "this function doesn't currently log anything", which is a minor implementation detail.  The only advantage I can see of adding "static" would be to get rid of the clang-tidy warning, but I don't think we should let clang-tidy push us around like that. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r739078055



##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -104,7 +104,7 @@ def generate_input_port_for_remote_process_group(remote_process_group, name):
         return input_port_node
 
     def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
-        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
+        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.replace('\\n', '\n').encode('utf-8'))

Review comment:
       I have separated this change to the first commit (1b5937003b4592c119e58ed3d73931b0692517b9), so to merge this PR after #1168, whoever is merging should just need to skip 1b5937003b4592c119e58ed3d73931b0692517b9 when cherry-picking commits.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       unfortunately it seems nifi has a different concept of "line", it  uses the `LineDemarcator` which
   
   1. considers all `\n` as end-of-line
   2. considers all `\r` followed by a non-`\n` as end-of-line
   3. considers a non-empty (!) tail after the last end-of-line as a line (or beginning of line)
   
   (it's quite confusing) 
   
   this was implemented in RouteText to be in-line with nifi, but of course we could decide to change them to use a different segmentation logic




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163626



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);

Review comment:
       thanks, I'll take a look




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714758131



##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};

Review comment:
       Yes, I meant that, except that I would pass the std::function by value. It would be better because in C++, we usually pass functions as simple callable objects, as opposed to the java way of implementing an interface with only one public method. The *StreamCallback classes are useless in C++ and only serve the purpose of imitating the Java way of passing functions in C++. We should be able to pass a lambda or any other normal function object directly to functions that expect another function as parameter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714799871



##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};

Review comment:
       As discussed, let's keep it as it is for now, and possibly change all three (read, write, readWrite) functions together in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r717382168



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       I don't want to force my opinion on you, so feel free to revert the change and close this thread if I didn't convince you. I think minor issues like this are well within the author's discretion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716619048



##########
File path: libminifi/include/utils/StringUtils.h
##########
@@ -75,7 +75,12 @@ class StringUtils {
 
   static std::string toLower(std::string_view str);
 
-  // Trim String utils
+  /**
+   * Strips the line ending (\n or \r\n) from the end of the input line.
+   * @param input_line
+   * @return (stripped line, line ending) pair
+   */
+  static std::pair<std::string, std::string> chomp(const std::string& input_line);

Review comment:
       That would require modifying `endsWith()` (and, for consistency, `endsWithIgnoreCase()` and `startsWith()`), which would introduce more conflicts with #1168.  If #1168 is merged before this, and it includes the change to `endsWith()`, then I will change `chomp()` similarly.
   
   EDIT: I'm not sure we want to do this.  As you pointed out elsewhere, we'd still need to keep the version taking a `std::string` argument to deal with temporary strings.  So we'll end up with 3 overloads (to remove the ambiguity when called with a `const char*`), and in most cases I'll need to convert the result to a string after calling `chomp()` anyway.  So this would mean quite a bit of extra code, for not much benefit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714751281



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    const auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = chomped_value;

Review comment:
       done in https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/ace5e48f1697ad97efb54693b2536bf06664917a




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r714750961



##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};

Review comment:
       Is this what you mean: https://github.com/fgerlits/nifi-minifi-cpp/commit/cdfbca12e6e2a49f93f286f411cecd91a0bce1bf?  I will do it if you explain why it would be better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r712829713



##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
   virtual ~OutputStreamCallback() = default;
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
 };
+class InputOutputStreamCallback {
+ public:
+  virtual ~InputOutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
+};

Review comment:
       Instead of following the same flawed pattern of the past, I prefer doing things the C++ way, which in this case would be passing function objects directly (potentially type-erased using std::function) instead of making them have to inherit *Callback.
   `LineByLineInputOutputStreamCallback` could still exist, but `process` would need to become `operator()` and it wouldn't need to be a subclass.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705141850



##########
File path: extensions/standard-processors/processors/ReplaceText.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+  static const core::Property EvaluationMode;
+  static const core::Property LineByLineEvaluationMode;
+  static const core::Property ReplacementStrategy;
+  static const core::Property MaximumBufferSize;
+  static const core::Property SearchValue;
+  static const core::Property ReplacementValue;
+
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {});
+  core::annotation::Input getInputRequirement() const override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+  friend struct ReplaceTextTestAccessor;
+
+  void readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+  void readReplacementValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  void replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+  void replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+
+  std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string applyRegexReplace(const std::string& input) const;
+  std::string applyLiteralReplace(const std::string& input) const;
+  std::string applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const;
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::string search_value_;
+  std::regex search_regex_;
+  std::string replacement_value_;
+  uint64_t maximum_buffer_size_ = 0;
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE;
+};
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies.");

Review comment:
       note: after the dynamic PR, resource registration should happen in cpp files




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716616708



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(CallbackType callback)
+  : callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  if (int64_t status = readInput(*input); status <= 0) {
+    return status;
+  }
+
+  std::size_t total_bytes_written_ = 0;
+  bool is_first_line = true;
+  readLine();
+  do {
+    readLine();
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());

Review comment:
       Good catch, thanks!  Fixed in https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/9a5b33d91cca7316a42b4b4c26498de622833742




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716619048



##########
File path: libminifi/include/utils/StringUtils.h
##########
@@ -75,7 +75,12 @@ class StringUtils {
 
   static std::string toLower(std::string_view str);
 
-  // Trim String utils
+  /**
+   * Strips the line ending (\n or \r\n) from the end of the input line.
+   * @param input_line
+   * @return (stripped line, line ending) pair
+   */
+  static std::pair<std::string, std::string> chomp(const std::string& input_line);

Review comment:
       That would require modifying `endsWith()` (and, for consistency, `endsWithIgnoreCase()` and `startsWith()`), which would introduce more conflicts with #1168.  If #1168 is merged before this, and it includes the change to `endsWith()`, then I will change `chomp()` similarly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r716790066



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "against the whole input treated as a single string (Entire Text).")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      EvaluationMode,
+      LineByLineEvaluationMode,
+      ReplacementStrategy,
+      SearchValue,
+      ReplacementValue
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    yield();
+    return;
+  }
+
+  Parameters parameters = readParameters(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session, parameters);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session, parameters);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  Parameters parameters;
+
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      parameters.search_regex_ = std::regex{parameters.search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")};
+  }
+
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())};
+  }
+
+  if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+    auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_);
+    parameters.replacement_value_ = std::move(chomped_value);
+  }
+
+  return parameters;
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file, parameters);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, &parameters](const std::string& input_line, bool is_first_line, bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const {
+  const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input);
+
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return parameters.replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return chomped_input + parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending;
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(chomped_input, parameters) + line_ending;
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return parameters.replacement_value_ + line_ending;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(chomped_input, flow_file) + line_ending;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const {

Review comment:
       I see this as an unnecessary dependency just for subjective aesthetics. I also don't want to block the PR because of this minor detail if you disagree.
   
   I like following clang-tidy suggestions unless I have a good argument against it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r707235058



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);

Review comment:
       Does this mean that if we use RocksDB (ie practically always), then the whole flow file is always loaded in memory?  I wanted to avoid that in line-by-line mode, but I may have been worrying about a non-existent case.
   
   Do you think there is value in keeping this implementation which supports large flow files being read one (potentially small) chunk at a time?  If not, we should probably remove the whole max buffer size property, which would make things simpler.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705169032



##########
File path: libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(uint64_t buffer_size, std::shared_ptr<core::logging::Logger> logger, CallbackType callback)
+  : buffer_(buffer_size),
+    logger_(std::move(logger)),
+    callback_(std::move(callback)) {
+}
+
+int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+  gsl_Expects(input);
+  gsl_Expects(output);
+
+  input_size_ = input->size();
+
+  if (int64_t status = readLine(*input); status <= 0) {
+    return status;
+  }
+
+  bool is_first_line = true;
+  do {
+    if (int64_t status = readLine(*input); status < 0) {
+      return status;
+    }
+    std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
+    output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
+    is_first_line = false;
+  } while (!isLastLine());
+
+  return gsl::narrow<int64_t>(total_bytes_read_);
+}
+
+int64_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream) {
+  std::string input_line;
+  const size_t current_read = readLine(stream, input_line);
+  if (io::isError(current_read)) {
+    return -1;
+  }
+  if (current_read == 0) {
+    current_line_ = next_line_;
+    next_line_ = std::nullopt;
+    return 0;
+  }
+  current_line_ = next_line_;
+  next_line_ = input_line;
+  total_bytes_read_ += current_read;
+  return gsl::narrow<int64_t>(current_read);
+}
+
+size_t LineByLineInputOutputStreamCallback::readLine(io::InputStream& stream, std::string& line) {
+  if (current_pos_ == end_pos_) {
+    const auto current_bytes_read = fillBuffer(stream);
+    if (io::isError(current_bytes_read) || current_bytes_read == 0) {
+      return current_bytes_read;
+    }
+  }
+
+  int64_t pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {
+    ++pos;
+  }
+  if (pos < end_pos_ && buffer_[pos] == '\n') {
+    ++pos;
+    line = std::string{buffer_.begin() + current_pos_, buffer_.begin() + pos};
+    current_pos_ = pos;
+    return line.size();
+  }
+
+  std::string start_of_line{buffer_.begin() + current_pos_, buffer_.begin() + end_pos_};
+
+  const auto current_bytes_read = fillBuffer(stream);
+  if (io::isError(current_bytes_read)) {
+    return io::STREAM_ERROR;
+  } else if (current_bytes_read == 0) {
+    line = start_of_line;
+    return line.size();
+  }
+
+  pos = current_pos_;
+  while (pos < end_pos_ && buffer_[pos] != '\n') {

Review comment:
       unfortunately it seems nifi has a different concept of "line", it  uses the `LineDemarcator` which
   
   1. considers all `\n` as end-of-line
   2. considers all `\r` followed by a non-`\n` as end-of-line
   3. considers a non-empty (!) tail after the last end-of-line as a line
   
   (it's quite confusing) 
   
   this was implemented in RouteText to be in-line with nifi, but of course we could decide to change them to use a different segmentation logic




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r706074135



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), search_value_);

Review comment:
       Good point, `onTrigger()` was not thread-safe.  Fixed in https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/86eb3f776005301a6507ca7b27e5d28736d5923a.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705163375



##########
File path: extensions/standard-processors/processors/ReplaceText.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "core/Annotation.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+SMART_ENUM(EvaluationModeType,
+  (LINE_BY_LINE, "Line-by-Line"),
+  (ENTIRE_TEXT, "Entire text")
+)
+
+SMART_ENUM(LineByLineEvaluationModeType,
+  (ALL, "All"),
+  (FIRST_LINE, "First-Line"),
+  (LAST_LINE, "Last-Line"),
+  (EXCEPT_FIRST_LINE, "Except-First-Line"),
+  (EXCEPT_LAST_LINE, "Except-Last-Line")
+)
+
+SMART_ENUM(ReplacementStrategyType,
+  (PREPEND, "Prepend"),
+  (APPEND, "Append"),
+  (REGEX_REPLACE, "Regex Replace"),
+  (LITERAL_REPLACE, "Literal Replace"),
+  (ALWAYS_REPLACE, "Always Replace"),
+  (SUBSTITUTE_VARIABLES, "Substitute Variables")
+)
+
+class ReplaceText : public core::Processor {
+ public:
+  static const core::Property EvaluationMode;
+  static const core::Property LineByLineEvaluationMode;
+  static const core::Property ReplacementStrategy;
+  static const core::Property MaximumBufferSize;
+  static const core::Property SearchValue;
+  static const core::Property ReplacementValue;
+
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
+
+  explicit ReplaceText(const std::string& name, const utils::Identifier& uuid = {});
+  core::annotation::Input getInputRequirement() const override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+  friend struct ReplaceTextTestAccessor;
+
+  void readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+  void readReplacementValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file);
+
+  void replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+  void replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const;
+
+  std::string applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string applyRegexReplace(const std::string& input) const;
+  std::string applyLiteralReplace(const std::string& input) const;
+  std::string applySubstituteVariables(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file) const;
+  std::string getAttributeValue(const std::shared_ptr<core::FlowFile>& flow_file, const std::smatch& match) const;
+
+  std::shared_ptr<logging::Logger> logger_;
+  std::string search_value_;
+  std::regex search_regex_;
+  std::string replacement_value_;
+  uint64_t maximum_buffer_size_ = 0;
+  EvaluationModeType evaluation_mode_ = EvaluationModeType::LINE_BY_LINE;
+  LineByLineEvaluationModeType line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::ALL;
+  ReplacementStrategyType replacement_strategy_ = ReplacementStrategyType::REGEX_REPLACE;
+};
+
+REGISTER_RESOURCE(ReplaceText, "Updates the content of a FlowFile by replacing parts of it using various replacement strategies.");

Review comment:
       OK, I'll move it after #1138 is merged.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705162395



##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -104,7 +104,7 @@ def generate_input_port_for_remote_process_group(remote_process_group, name):
         return input_port_node
 
     def add_test_data(self, path, test_data, file_name=str(uuid.uuid4())):
-        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
+        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.replace('\\n', '\n').encode('utf-8'))

Review comment:
       OK, I'll revert these bits if your PR is merged first.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1170: MINIFICPP-1618 Create the ReplaceText processor

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705980315



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. "
+                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
+                                                         "This includes both FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode);
+  evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), search_value_);

Review comment:
       won't writing members (`search_value_`, `search_regex_`) interferes with other threads of this same processor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org