You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/09/13 10:03:32 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1168: MINIFICPP-1632 - Implement RouteText processor

fgerlits commented on a change in pull request #1168:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1168#discussion_r706265468



##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * TailFile class declaration

Review comment:
       This is why I don't include these lines in new files :)
   
   They're not useful, they're easy to forget to update (this one still says "TailFile class declaration"), and the Apache docs don't include them either: https://www.apache.org/legal/src-headers.html#headers

##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RouteText.h"
+
+#include <map>
+#include <vector>
+#include <utility>
+#include <algorithm>
+#include <set>
+
+#ifdef __APPLE__
+#include <experimental/functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>;
+#else
+#include <functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>;
+#endif
+
+#include "logging/LoggerConfiguration.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/tail.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property RouteText::RoutingStrategy(  // Required; defaults to Routing::DYNAMIC, allowed values come from the Routing enum.
+    core::PropertyBuilder::createProperty("Routing Strategy")
+    ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments "
+                      "of incoming text against the 'Matching Strategy' and user-defined properties. "
+                      "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). "
+                      "'Route On All' routes to 'matched' iff all dynamic relationships match. "
+                      "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Routing::DYNAMIC))
+    ->withAllowableValues<std::string>(Routing::values())
+    ->build());
+// Required, and (unlike the other required properties here) has no default value. NOTE(review): confirm a missing value is rejected in onSchedule().
+const core::Property RouteText::MatchingStrategy(
+    core::PropertyBuilder::createProperty("Matching Strategy")
+    ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties.")
+    ->isRequired(true)
+    ->withAllowableValues<std::string>(Matching::values())
+    ->build());
+// Defaults to true; onSchedule() reads it into trim_.
+const core::Property RouteText::TrimWhitespace(
+    core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace")
+    ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(true)
+    ->build());
+// Defaults to false; onSchedule() maps it to CasePolicy::IGNORE_CASE or CasePolicy::CASE_SENSITIVE.
+const core::Property RouteText::IgnoreCase(
+    core::PropertyBuilder::createProperty("Ignore Case")
+    ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+                      "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
+// Optional, no default; when set, onSchedule() compiles it with to_regex().
+const core::Property RouteText::GroupingRegex(
+    core::PropertyBuilder::createProperty("Grouping Regular Expression")
+    ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. "
+                      "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups "
+                      "exist in the Regular Expression, the values from all Capturing Groups will be concatenated together. Two segments will not be "  // NOTE(review): per the review, multiple groups are joined with ", " rather than plainly concatenated; confirm and reword
+                      "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). "
+                      "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). "
+                      "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.")
+    ->build());
+// Optional, defaults to ""; per its description, used as the group when GroupingRegex is set but does not match.
+const core::Property RouteText::GroupingFallbackValue(
+    core::PropertyBuilder::createProperty("Grouping Fallback Value")
+    ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.")
+    ->withDefaultValue<std::string>("")
+    ->build());
+// Required; defaults to Segmentation::PER_LINE.
+const core::Property RouteText::SegmentationStrategy(
+    core::PropertyBuilder::createProperty("Segmentation Strategy")
+    ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE))
+    ->withAllowableValues<std::string>(Segmentation::values())
+    ->build());
+// Fixed relationships, registered in initialize().
+const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination");
+
+const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship");
+
+const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship");
+
+RouteText::RouteText(const std::string& name, const utils::Identifier& uuid)  // Only sets up the logger; configuration is read in onSchedule().
+    : core::Processor(name, uuid), logger_(logging::LoggerFactory<RouteText>::getLogger()) {}
+
+void RouteText::initialize() {  // Declares the static properties and the fixed relationships (original, unmatched, matched).
+  setSupportedProperties({
+     RoutingStrategy,
+     MatchingStrategy,
+     TrimWhitespace,
+     IgnoreCase,
+     GroupingRegex,
+     GroupingFallbackValue,
+     SegmentationStrategy
+  });
+  setSupportedRelationships({Original, Unmatched, Matched});
+}
+
+static std::regex to_regex(const std::string& str) {  // Default flags (ECMAScript, case-sensitive); throws std::regex_error on an invalid pattern.
+  return std::regex(str);  // NOTE(review): IgnoreCase is not applied here, unlike MatchingContext::getRegexProperty(); confirm this is intended
+}
+
+void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) {  // Caches the static property values in members; onTrigger() uses routing_, matching_, case_policy_ and segmentation_.
+  routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy);
+  matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy);
+  context->getProperty(TrimWhitespace.getName(), trim_);
+  case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? CasePolicy::IGNORE_CASE : CasePolicy::CASE_SENSITIVE;  // an unset value means case-sensitive matching
+  group_regex_ = context->getProperty(GroupingRegex) | utils::map(to_regex);  // presumably stays empty when GroupingRegex is unset; an invalid pattern makes to_regex throw
+  segmentation_ = utils::parseEnumProperty<Segmentation>(*context, SegmentationStrategy);
+  context->getProperty(GroupingFallbackValue.getName(), group_fallback_);
+}
+
+class RouteText::ReadCallback : public InputStreamCallback {  // Reads the whole flow file content, splits it according to segmentation_ and calls fn_ for each segment.
+  using Fn = std::function<void(Segment)>;
+  // fn_ is called synchronously from process(), once per segment, in content order.
+ public:
+  explicit ReadCallback(Segmentation segmentation, Fn&& fn) : segmentation_(segmentation), fn_(std::move(fn)) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {  // Returns content.length() (bytes actually read), -1 on a read error; throws on an unhandled segmentation.
+    std::vector<uint8_t> buffer;  // owns the bytes only when the stream offers no direct buffer access
+    std::string_view content;  // views either the stream's own buffer or `buffer`
+    if (auto opt_content = stream->tryGetBuffer()) {
+      content = std::string_view{reinterpret_cast<const char*>(opt_content.value()), stream->size()};
+    } else {
+      // no O(1) content access, read it into our local buffer
+      size_t total_read = 0;
+      size_t remaining = stream->size();
+      buffer.resize(remaining);
+      while (remaining != 0) {  // stops after stream->size() bytes or when a read returns 0
+        size_t ret = stream->read(buffer.data() + total_read, remaining);
+        if (io::isError(ret)) return -1;
+        if (ret == 0) break;
+        remaining -= ret;
+        total_read += ret;
+      }
+      buffer.resize(total_read);  // the stream may deliver fewer bytes than size()
+      content = std::string_view{reinterpret_cast<const char*>(buffer.data()), buffer.size()};
+    }
+    switch (segmentation_.value()) {
+      case Segmentation::FULL_TEXT: {
+        fn_({content, 0});  // the whole content is a single segment with index 0
+        return content.length();
+      }
+      case Segmentation::PER_LINE: {
+        // 1-based index as in nifi
+        size_t segment_idx = 1;
+        // line terminators are kept at the end of each segment (not stripped)
+        // before invoking fn_, to be in line with the nifi semantics
+        // terminators: "\n", "\r\n", or a lone '\r' that is not the last char
+        std::string_view::size_type curr = 0;
+        while (curr < content.length()) {
+          // find beginning of next line
+          std::string_view::size_type next_line = std::string_view::npos;
+          if (auto next_marker = content.find_first_of("\r\n", curr); next_marker != std::string_view::npos) {
+            if (content[next_marker] == '\n') {
+              next_line = next_marker + 1;
+            } else if (next_marker + 1 < content.size()) {  // a '\r' as the very last char is not a terminator: next_line stays npos
+              if (content[next_marker + 1] == '\n') {
+                next_line = next_marker + 2;
+              } else {
+                next_line = next_marker + 1;
+              }
+            }
+          }
+          // npos means this is the last segment: it extends to the end of the content
+          if (next_line == std::string_view::npos) {
+            fn_({content.substr(curr), segment_idx});
+          } else {
+            fn_({content.substr(curr, next_line - curr), segment_idx});
+          }
+          curr = next_line;  // npos is greater than content.length(), which ends the loop
+          ++segment_idx;
+        }
+        return content.length();
+      }
+    }
+    throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy");  // only reached if segmentation_ holds an unhandled value
+  }
+
+ private:
+  Segmentation segmentation_;
+  Fn fn_;
+};
+
+class RouteText::MatchingContext {  // Caches dynamic property values evaluated against one flow file (strings, regexes, searchers), keyed by property name.
+  struct CaseAwareHash {  // Character hash consistent with CaseAwareEq.
+    explicit CaseAwareHash(CasePolicy policy): policy_(policy) {}
+    size_t operator()(char ch) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return static_cast<size_t>(ch);
+      }
+      return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch)));  // unsigned char cast: std::tolower is undefined for negative values other than EOF
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+
+  struct CaseAwareEq {  // Character equality; case-insensitive under CasePolicy::IGNORE_CASE.
+    explicit CaseAwareEq(CasePolicy policy): policy_(policy) {}
+    bool operator()(char a, char b) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return a == b;
+      }
+      return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b));
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+  using Searcher = boyer_moore_searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq>;  // the pattern string must outlive the searcher (see OwningSearcher)
+
+ public:
+  MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, CasePolicy case_policy)  // flow_file is passed to getDynamicProperty when evaluating properties
+    : process_context_(process_context),
+      flow_file_(std::move(flow_file)),
+      case_policy_(case_policy) {}
+
+  const std::regex& getRegexProperty(const core::Property& prop) {  // Compiled on first use and cached; throws if the property is missing (std::regex_error on a bad pattern).
+    auto it = regex_values_.find(prop.getName());
+    if (it != regex_values_.end()) {
+      return it->second;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
+    }
+    std::regex::flag_type flags = std::regex::ECMAScript;
+    if (case_policy_ == CasePolicy::IGNORE_CASE) {
+      flags |= std::regex::icase;
+    }
+    return (regex_values_[prop.getName()] = std::regex(value, flags));  // std::map references stay valid after later insertions
+  }
+
+  const std::string& getStringProperty(const core::Property& prop) {  // Evaluated on first use and cached; throws if the property is missing.
+    auto it = string_values_.find(prop.getName());
+    if (it != string_values_.end()) {
+      return it->second;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
+    }
+    return (string_values_[prop.getName()] = value);
+  }
+
+  const Searcher& getSearcher(const core::Property& prop) {  // Built on first use and cached; throws if the property is missing.
+    auto it = searcher_values_.find(prop.getName());
+    if (it != searcher_values_.end()) {
+      return it->second.searcher_;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
+    }
+
+    return searcher_values_.emplace(  // constructed in place: OwningSearcher can be neither copied nor moved
+        std::piecewise_construct, std::forward_as_tuple(prop.getName()),
+        std::forward_as_tuple(value, case_policy_)).first->second.searcher_;
+  }
+
+  core::ProcessContext& process_context_;  // NOTE(review): these data members are public; consider making them private
+  std::shared_ptr<core::FlowFile> flow_file_;
+  CasePolicy case_policy_;
+
+  std::map<std::string, std::string> string_values_;
+  std::map<std::string, std::regex> regex_values_;
+
+  struct OwningSearcher {  // Keeps the pattern string alive next to the searcher built from its iterators.
+    explicit OwningSearcher(std::string str, CasePolicy case_policy)
+      : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {}
+    OwningSearcher(const OwningSearcher&) = delete;  // copying/moving would leave searcher_ pointing into the source object's str_
+    OwningSearcher(OwningSearcher&&) = delete;
+    OwningSearcher& operator=(const OwningSearcher&) = delete;
+    OwningSearcher& operator=(OwningSearcher&&) = delete;
+
+    std::string str_;  // declared before searcher_ so it is initialized first
+    Searcher searcher_;
+  };
+
+  std::map<std::string, OwningSearcher> searcher_values_;  // std::map never relocates its elements, so each searcher's iterators stay valid
+};
+
+void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  using GroupName = std::string;
+  std::map<std::pair<core::Relationship, std::optional<GroupName>>, std::string> flow_file_contents;
+
+  MatchingContext matching_context(*context, flow_file, case_policy_);
+
+  ReadCallback callback(segmentation_, [&] (Segment segment) {
+    std::string_view original_value = segment.value_;
+    std::string_view preprocessed_value = preprocess(segment.value_);
+
+    if (matching_ != Matching::EXPRESSION) {
+      // an Expression has access to the raw segment like in nifi
+      // all others use the preprocessed_value
+      segment.value_ = preprocessed_value;
+    }
+
+    // group extraction always uses the preprocessed
+    auto group = getGroup(preprocessed_value);
+    switch (routing_.value()) {
+      case Routing::ALL: {
+        for (const auto& prop : dynamic_properties_) {
+          if (!matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{Unmatched, group}] += original_value;
+            return;
+          }
+        }
+        flow_file_contents[{Matched, group}] += original_value;
+        return;

Review comment:
       I'm not sure whether the ranges version of `all_of` is available yet, but even without it, I think
   ```suggestion
           if (std::all_of(dynamic_properties_.begin(), dynamic_properties_.end(), [&](const auto& prop) {
               return matchSegment(matching_context, segment, prop.second);
             })) {
             flow_file_contents[{Matched, group}] += original_value;
           } else {
             flow_file_contents[{Unmatched, group}] += original_value;
           }
           return;
   ```
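   would be nicer, since it states the "all properties must match" intent directly instead of expressing it through an early-return loop.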

##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RouteText.h"
+
+#include <map>
+#include <vector>
+#include <utility>
+#include <algorithm>
+#include <set>
+
+#ifdef __APPLE__
+#include <experimental/functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>;
+#else
+#include <functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>;
+#endif
+
+#include "logging/LoggerConfiguration.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/tail.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property RouteText::RoutingStrategy(  // Required; defaults to Routing::DYNAMIC, allowed values come from the Routing enum.
+    core::PropertyBuilder::createProperty("Routing Strategy")
+    ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments "
+                      "of incoming text against the 'Matching Strategy' and user-defined properties. "
+                      "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). "
+                      "'Route On All' routes to 'matched' iff all dynamic relationships match. "
+                      "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Routing::DYNAMIC))
+    ->withAllowableValues<std::string>(Routing::values())
+    ->build());
+// Required, and (unlike the other required properties here) has no default value. NOTE(review): confirm a missing value is rejected in onSchedule().
+const core::Property RouteText::MatchingStrategy(
+    core::PropertyBuilder::createProperty("Matching Strategy")
+    ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties.")
+    ->isRequired(true)
+    ->withAllowableValues<std::string>(Matching::values())
+    ->build());
+// Defaults to true; onSchedule() reads it into trim_.
+const core::Property RouteText::TrimWhitespace(
+    core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace")
+    ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(true)
+    ->build());
+// Defaults to false; onSchedule() maps it to CasePolicy::IGNORE_CASE or CasePolicy::CASE_SENSITIVE.
+const core::Property RouteText::IgnoreCase(
+    core::PropertyBuilder::createProperty("Ignore Case")
+    ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+                      "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
+// Optional, no default; when set, onSchedule() compiles it with to_regex().
+const core::Property RouteText::GroupingRegex(
+    core::PropertyBuilder::createProperty("Grouping Regular Expression")
+    ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. "
+                      "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups "
+                      "exist in the Regular Expression, the values from all Capturing Groups will be concatenated together. Two segments will not be "  // NOTE(review): per the review, multiple groups are joined with ", " rather than plainly concatenated; confirm and reword
+                      "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). "
+                      "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). "
+                      "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.")
+    ->build());
+// Optional, defaults to ""; per its description, used as the group when GroupingRegex is set but does not match.
+const core::Property RouteText::GroupingFallbackValue(
+    core::PropertyBuilder::createProperty("Grouping Fallback Value")
+    ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.")
+    ->withDefaultValue<std::string>("")
+    ->build());
+// Required; defaults to Segmentation::PER_LINE.
+const core::Property RouteText::SegmentationStrategy(
+    core::PropertyBuilder::createProperty("Segmentation Strategy")
+    ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE))
+    ->withAllowableValues<std::string>(Segmentation::values())
+    ->build());
+// Fixed relationships, registered in initialize().
+const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination");
+
+const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship");
+
+const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship");
+
+RouteText::RouteText(const std::string& name, const utils::Identifier& uuid)  // Only sets up the logger; configuration is read in onSchedule().
+    : core::Processor(name, uuid), logger_(logging::LoggerFactory<RouteText>::getLogger()) {}
+
+void RouteText::initialize() {  // Declares the static properties and the fixed relationships (original, unmatched, matched).
+  setSupportedProperties({
+     RoutingStrategy,
+     MatchingStrategy,
+     TrimWhitespace,
+     IgnoreCase,
+     GroupingRegex,
+     GroupingFallbackValue,
+     SegmentationStrategy
+  });
+  setSupportedRelationships({Original, Unmatched, Matched});
+}
+
+static std::regex to_regex(const std::string& str) {  // Default flags (ECMAScript, case-sensitive); throws std::regex_error on an invalid pattern.
+  return std::regex(str);  // NOTE(review): IgnoreCase is not applied here, unlike MatchingContext::getRegexProperty(); confirm this is intended
+}
+
+void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) {  // Caches the static property values in members; onTrigger() uses routing_, matching_, case_policy_ and segmentation_.
+  routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy);
+  matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy);
+  context->getProperty(TrimWhitespace.getName(), trim_);
+  case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? CasePolicy::IGNORE_CASE : CasePolicy::CASE_SENSITIVE;  // an unset value means case-sensitive matching
+  group_regex_ = context->getProperty(GroupingRegex) | utils::map(to_regex);  // presumably stays empty when GroupingRegex is unset; an invalid pattern makes to_regex throw
+  segmentation_ = utils::parseEnumProperty<Segmentation>(*context, SegmentationStrategy);
+  context->getProperty(GroupingFallbackValue.getName(), group_fallback_);
+}
+
+class RouteText::ReadCallback : public InputStreamCallback {
+  using Fn = std::function<void(Segment)>;
+
+ public:
+  explicit ReadCallback(Segmentation segmentation, Fn&& fn) : segmentation_(segmentation), fn_(std::move(fn)) {}

Review comment:
       Two-parameter constructors (this one, and also the one in `OwningSearcher`) don't need to be `explicit`.

##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RouteText.h"
+
+#include <map>
+#include <vector>
+#include <utility>
+#include <algorithm>
+#include <set>
+
+#ifdef __APPLE__
+#include <experimental/functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>;
+#else
+#include <functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>;
+#endif
+
+#include "logging/LoggerConfiguration.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/tail.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property RouteText::RoutingStrategy(  // Required; defaults to Routing::DYNAMIC, allowed values come from the Routing enum.
+    core::PropertyBuilder::createProperty("Routing Strategy")
+    ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments "
+                      "of incoming text against the 'Matching Strategy' and user-defined properties. "
+                      "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). "
+                      "'Route On All' routes to 'matched' iff all dynamic relationships match. "
+                      "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Routing::DYNAMIC))
+    ->withAllowableValues<std::string>(Routing::values())
+    ->build());
+// Required, and (unlike the other required properties here) has no default value. NOTE(review): confirm a missing value is rejected in onSchedule().
+const core::Property RouteText::MatchingStrategy(
+    core::PropertyBuilder::createProperty("Matching Strategy")
+    ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties.")
+    ->isRequired(true)
+    ->withAllowableValues<std::string>(Matching::values())
+    ->build());
+// Defaults to true; onSchedule() reads it into trim_.
+const core::Property RouteText::TrimWhitespace(
+    core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace")
+    ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(true)
+    ->build());
+// Defaults to false; onSchedule() maps it to CasePolicy::IGNORE_CASE or CasePolicy::CASE_SENSITIVE.
+const core::Property RouteText::IgnoreCase(
+    core::PropertyBuilder::createProperty("Ignore Case")
+    ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+                      "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+const core::Property RouteText::GroupingRegex(
+    core::PropertyBuilder::createProperty("Grouping Regular Expression")
+    ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. "
+                      "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups "
+                      "exist in the Regular Expression, the values from all Capturing Groups will be concatenated together. Two segments will not be "

Review comment:
       I'm not sure whether we care, since it doesn't affect the logic, but this description is not quite accurate. It should say: '... exist in the Regular Expression, then the Group will be the values from all Capturing Groups joined together with ", ". ...'

##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RouteText.h"
+
+#include <map>
+#include <vector>
+#include <utility>
+#include <algorithm>
+#include <set>
+
+// Platform-independent alias for the Boyer-Moore searcher used by RouteText::MatchingContext.
+#ifdef __APPLE__
+// NOTE(review): presumably the Apple toolchain only ships this searcher under
+// <experimental/functional>; confirm against the supported Xcode versions.
+#include <experimental/functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>;
+#else
+#include <functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>;
+#endif
+
+#include "logging/LoggerConfiguration.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/tail.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Required; defaults to Routing::DYNAMIC. Parsed into routing_ by onSchedule.
+const core::Property RouteText::RoutingStrategy(
+    core::PropertyBuilder::createProperty("Routing Strategy")
+    ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments "
+                      "of incoming text against the 'Matching Strategy' and user-defined properties. "
+                      "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). "
+                      "'Route On All' routes to 'matched' iff all dynamic relationships match. "
+                      "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Routing::DYNAMIC))
+    ->withAllowableValues<std::string>(Routing::values())
+    ->build());
+
+// Required and, unlike the other enum properties, has no default value: it must be configured explicitly.
+// Parsed into matching_ by onSchedule.
+const core::Property RouteText::MatchingStrategy(
+    core::PropertyBuilder::createProperty("Matching Strategy")
+    ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties.")
+    ->isRequired(true)
+    ->withAllowableValues<std::string>(Matching::values())
+    ->build());
+
+// Boolean, defaults to true; read into trim_ by onSchedule.
+const core::Property RouteText::TrimWhitespace(
+    core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace")
+    ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(true)
+    ->build());
+
+// Boolean, defaults to false; onSchedule maps it to case_policy_ (IGNORE_CASE / CASE_SENSITIVE).
+const core::Property RouteText::IgnoreCase(
+    core::PropertyBuilder::createProperty("Ignore Case")
+    ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+                      "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)
+    ->build());
+
+// Optional, no default; when set, onSchedule compiles it into group_regex_.
+// The description states that multiple capture groups are joined with ", " (not plainly concatenated),
+// matching how the group value is actually built.
+const core::Property RouteText::GroupingRegex(
+    core::PropertyBuilder::createProperty("Grouping Regular Expression")
+    ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. "
+                      "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups "
+                      "exist in the Regular Expression, then the Group will be the values from all Capturing Groups joined together with \", \". "
+                      "Two segments will not be "
+                      "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). "
+                      "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). "
+                      "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.")
+    ->build());
+
+// Defaults to the empty string; read into group_fallback_ by onSchedule.
+const core::Property RouteText::GroupingFallbackValue(
+    core::PropertyBuilder::createProperty("Grouping Fallback Value")
+    ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.")
+    ->withDefaultValue<std::string>("")
+    ->build());
+
+// Required; defaults to Segmentation::PER_LINE. Parsed into segmentation_ by onSchedule.
+const core::Property RouteText::SegmentationStrategy(
+    core::PropertyBuilder::createProperty("Segmentation Strategy")
+    ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE))
+    ->withAllowableValues<std::string>(Segmentation::values())
+    ->build());
+
+// Static relationships; dynamic relationships (one per dynamic property) are used by 'Dynamic Routing'.
+const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination");
+
+const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship");
+
+const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship");
+
+// Constructs the processor and attaches a RouteText-specific logger.
+RouteText::RouteText(const std::string& name, const utils::Identifier& uuid)
+    : core::Processor(name, uuid),
+      logger_(logging::LoggerFactory<RouteText>::getLogger()) {
+}
+
+// Registers the static properties (in declaration order) and the static relationships.
+void RouteText::initialize() {
+  setSupportedProperties({RoutingStrategy, MatchingStrategy, TrimWhitespace, IgnoreCase,
+                          GroupingRegex, GroupingFallbackValue, SegmentationStrategy});
+  setSupportedRelationships({Original,
+                             Unmatched,
+                             Matched});
+}
+
+// Compiles `str` with the default std::regex grammar (ECMAScript, case-sensitive).
+static std::regex to_regex(const std::string& str) {
+  std::regex compiled(str);
+  return compiled;
+}
+
+// Reads the processor configuration once per scheduling into member fields.
+// The reads stay in their original order because the enum parsers may throw on invalid values.
+void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) {
+  routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy);
+  matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy);
+  context->getProperty(TrimWhitespace.getName(), trim_);
+
+  const bool ignore_case = context->getProperty<bool>(IgnoreCase).value_or(false);
+  if (ignore_case) {
+    case_policy_ = CasePolicy::IGNORE_CASE;
+  } else {
+    case_policy_ = CasePolicy::CASE_SENSITIVE;
+  }
+
+  // compiled here once per schedule instead of once per segment
+  group_regex_ = context->getProperty(GroupingRegex) | utils::map(to_regex);
+  segmentation_ = utils::parseEnumProperty<Segmentation>(*context, SegmentationStrategy);
+  context->getProperty(GroupingFallbackValue.getName(), group_fallback_);
+}
+
+/**
+ * Reads the whole flow file content and invokes the callback once per segment,
+ * according to the configured segmentation strategy.
+ */
+class RouteText::ReadCallback : public InputStreamCallback {
+  using Fn = std::function<void(Segment)>;
+
+ public:
+  explicit ReadCallback(Segmentation segmentation, Fn&& fn) : segmentation_(segmentation), fn_(std::move(fn)) {}
+
+  /**
+   * @return -1 if reading the stream fails, otherwise the length of the content that was segmented
+   *         (may be less than stream->size() if a read returns 0 early)
+   * @throws Exception (PROCESSOR_EXCEPTION) for an unhandled segmentation strategy
+   */
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    std::vector<uint8_t> buffer;
+    std::string_view content;
+    if (auto opt_content = stream->tryGetBuffer()) {
+      // direct view of the stream's buffer, no copy
+      content = std::string_view{reinterpret_cast<const char*>(opt_content.value()), stream->size()};
+    } else {
+      // no O(1) content access, read it into our local buffer
+      size_t total_read = 0;
+      size_t remaining = stream->size();
+      buffer.resize(remaining);
+      while (remaining != 0) {
+        size_t ret = stream->read(buffer.data() + total_read, remaining);
+        if (io::isError(ret)) return -1;
+        if (ret == 0) break;
+        remaining -= ret;
+        total_read += ret;
+      }
+      // shrink to what was actually read in case the stream ended early
+      buffer.resize(total_read);
+      content = std::string_view{reinterpret_cast<const char*>(buffer.data()), buffer.size()};
+    }
+    switch (segmentation_.value()) {
+      case Segmentation::FULL_TEXT: {
+        // the whole content is a single segment with index 0
+        fn_({content, 0});
+        return content.length();
+      }
+      case Segmentation::PER_LINE: {
+        // 1-based index as in nifi
+        size_t segment_idx = 1;
+        // do not strip \n\r characters before invocation to be
+        // in-line with the nifi semantics
+        // a line ends after '\n', after "\r\n" (one terminator), or after a lone '\r'
+        // followed by some other char; a '\r' as the very last char just ends the final segment
+        std::string_view::size_type curr = 0;
+        while (curr < content.length()) {
+          // find beginning of next line
+          std::string_view::size_type next_line = std::string_view::npos;
+          if (auto next_marker = content.find_first_of("\r\n", curr); next_marker != std::string_view::npos) {
+            if (content[next_marker] == '\n') {
+              next_line = next_marker + 1;
+            } else if (next_marker + 1 < content.size()) {
+              if (content[next_marker + 1] == '\n') {
+                next_line = next_marker + 2;
+              } else {
+                next_line = next_marker + 1;
+              }
+            }
+          }
+
+          // npos means this is the last segment: it runs to the end of the content
+          if (next_line == std::string_view::npos) {
+            fn_({content.substr(curr), segment_idx});
+          } else {
+            fn_({content.substr(curr, next_line - curr), segment_idx});
+          }
+          // npos also terminates the loop
+          curr = next_line;
+          ++segment_idx;
+        }
+        return content.length();
+      }
+    }
+    throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy");
+  }
+
+ private:
+  Segmentation segmentation_;
+  Fn fn_;
+};
+
+/**
+ * Per-flow-file cache of evaluated dynamic properties.
+ *
+ * Dynamic property values are evaluated against the flow file given to the constructor
+ * and memoized by property name in three forms: the raw string, a compiled std::regex,
+ * and a Boyer-Moore searcher. Under CasePolicy::IGNORE_CASE both the regex (std::regex::icase)
+ * and the searcher (CaseAwareHash / CaseAwareEq) compare case-insensitively.
+ */
+class RouteText::MatchingContext {
+  // Character hash for the Boyer-Moore searcher; must stay consistent with CaseAwareEq.
+  struct CaseAwareHash {
+    explicit CaseAwareHash(CasePolicy policy): policy_(policy) {}
+    size_t operator()(char ch) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return static_cast<size_t>(ch);
+      }
+      // cast to unsigned char first: std::tolower is undefined for negative values other than EOF
+      return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch)));
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+
+  // Character equality for the Boyer-Moore searcher, honouring the case policy.
+  struct CaseAwareEq {
+    explicit CaseAwareEq(CasePolicy policy): policy_(policy) {}
+    bool operator()(char a, char b) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return a == b;
+      }
+      return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b));
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+  using Searcher = boyer_moore_searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq>;
+
+ public:
+  MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, CasePolicy case_policy)
+    : process_context_(process_context),
+      flow_file_(std::move(flow_file)),
+      case_policy_(case_policy) {}
+
+  /// @brief Returns dynamic property `prop` compiled as an ECMAScript regex (with icase under IGNORE_CASE); cached per property name.
+  /// @throws Exception (PROCESSOR_EXCEPTION) if getDynamicProperty yields no value; std::regex_error if the value is not a valid regex
+  const std::regex& getRegexProperty(const core::Property& prop) {
+    if (auto it = regex_values_.find(prop.getName()); it != regex_values_.end()) {
+      return it->second;
+    }
+    const std::string value = evaluateDynamicProperty(prop);
+    std::regex::flag_type flags = std::regex::ECMAScript;
+    if (case_policy_ == CasePolicy::IGNORE_CASE) {
+      flags |= std::regex::icase;
+    }
+    // construct the regex in place instead of default-constructing and assigning
+    return regex_values_.try_emplace(prop.getName(), value, flags).first->second;
+  }
+
+  /// @brief Returns the evaluated string value of dynamic property `prop`; cached per property name.
+  /// @throws Exception (PROCESSOR_EXCEPTION) if getDynamicProperty yields no value
+  const std::string& getStringProperty(const core::Property& prop) {
+    if (auto it = string_values_.find(prop.getName()); it != string_values_.end()) {
+      return it->second;
+    }
+    return string_values_.try_emplace(prop.getName(), evaluateDynamicProperty(prop)).first->second;
+  }
+
+  /// @brief Returns a case-policy-aware Boyer-Moore searcher for the value of dynamic property `prop`; cached per property name.
+  /// @throws Exception (PROCESSOR_EXCEPTION) if getDynamicProperty yields no value
+  const Searcher& getSearcher(const core::Property& prop) {
+    if (auto it = searcher_values_.find(prop.getName()); it != searcher_values_.end()) {
+      return it->second.searcher_;
+    }
+    // OwningSearcher is neither copyable nor movable, so it is constructed in place inside the map node
+    return searcher_values_.try_emplace(prop.getName(), evaluateDynamicProperty(prop), case_policy_).first->second.searcher_;
+  }
+
+  // NOTE(review): these members are public; presumably other RouteText matching code reads them — confirm before narrowing access.
+  core::ProcessContext& process_context_;
+  std::shared_ptr<core::FlowFile> flow_file_;
+  CasePolicy case_policy_;
+
+  std::map<std::string, std::string> string_values_;
+  std::map<std::string, std::regex> regex_values_;
+
+  // Owns the pattern string that the searcher's iterators point into. It is pinned
+  // (no copy/move) so those iterators can never dangle.
+  struct OwningSearcher {
+    explicit OwningSearcher(std::string str, CasePolicy case_policy)
+      : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {}
+    OwningSearcher(const OwningSearcher&) = delete;
+    OwningSearcher(OwningSearcher&&) = delete;
+    OwningSearcher& operator=(const OwningSearcher&) = delete;
+    OwningSearcher& operator=(OwningSearcher&&) = delete;
+
+    std::string str_;
+    Searcher searcher_;
+  };
+
+  std::map<std::string, OwningSearcher> searcher_values_;
+
+ private:
+  // Evaluates dynamic property `prop` against flow_file_; throws if no value is available.
+  // Shared by all three getters so the lookup and error message live in one place.
+  std::string evaluateDynamicProperty(const core::Property& prop) {
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'");
+    }
+    return value;
+  }
+};
+
+void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  using GroupName = std::string;
+  std::map<std::pair<core::Relationship, std::optional<GroupName>>, std::string> flow_file_contents;
+
+  MatchingContext matching_context(*context, flow_file, case_policy_);
+
+  ReadCallback callback(segmentation_, [&] (Segment segment) {
+    std::string_view original_value = segment.value_;
+    std::string_view preprocessed_value = preprocess(segment.value_);
+
+    if (matching_ != Matching::EXPRESSION) {
+      // an Expression has access to the raw segment like in nifi
+      // all others use the preprocessed_value
+      segment.value_ = preprocessed_value;
+    }
+
+    // group extraction always uses the preprocessed
+    auto group = getGroup(preprocessed_value);
+    switch (routing_.value()) {
+      case Routing::ALL: {
+        for (const auto& prop : dynamic_properties_) {
+          if (!matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{Unmatched, group}] += original_value;
+            return;
+          }
+        }
+        flow_file_contents[{Matched, group}] += original_value;
+        return;
+      }
+      case Routing::ANY: {
+        for (const auto& prop : dynamic_properties_) {
+          if (matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{Matched, group}] += original_value;
+            return;
+          }
+        }
+        flow_file_contents[{Unmatched, group}] += original_value;
+        return;
+      }
+      case Routing::DYNAMIC: {
+        bool routed = false;
+        for (const auto& prop : dynamic_properties_) {
+          if (matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{dynamic_relationships_[prop.first], group}] += original_value;
+            routed = true;
+          }
+        }
+        if (!routed) {
+          flow_file_contents[{Unmatched, group}] += original_value;
+        }
+        return;
+      }
+    }
+    throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
+  });
+  session->read(flow_file, &callback);
+
+  for (const auto& flow_file_content : flow_file_contents) {

Review comment:
       this could be written this way, to avoid things like `first.first` and `first.second`:
   ```c++
     for (const auto& [key, content] : flow_file_contents) {
       const auto& [relationship, group] = key;
       ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org