Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/21 11:49:34 UTC

[GitHub] [nifi-minifi-cpp] martinzink opened a new pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

martinzink opened a new pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248


   





[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791648848



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;
+}
+
+namespace {
+template <typename T, typename... Rest>
+void hash_combine(size_t& seed, const T& v, Rest... rest) {
+  std::hash<T> hasher;
+  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (hash_combine(seed, rest), ...);
+}
+}

Review comment:
       The linter should complain here that anonymous namespaces should be terminated with a `  // namespace` comment. Is it not running for some reason?
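A minimal sketch of the layout the comment asks for, assuming the `hash_combine` helper from the diff is kept: the anonymous namespace ends with `}  // namespace`. `FragmentId` is a hypothetical stand-in for `DefragmentText::FragmentSource::Id`, so the sketch compiles without `core::FlowFile`. Two further changes are assumptions: the remaining arguments are taken by const reference instead of by value, and `operator==` is written out instead of defaulted so the sketch also builds as C++17.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <string>

namespace {
// Mixes the hash of every argument into `seed` (the boost-style combine step).
template <typename T, typename... Rest>
void hash_combine(std::size_t& seed, const T& v, const Rest&... rest) {
  std::hash<T> hasher;
  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  (hash_combine(seed, rest), ...);  // recurse once per remaining argument
}
}  // namespace

// Hypothetical stand-in for DefragmentText::FragmentSource::Id.
struct FragmentId {
  std::optional<std::string> base_name_attribute;
  std::optional<std::string> post_name_attribute;

  bool operator==(const FragmentId& rhs) const {
    return base_name_attribute == rhs.base_name_attribute &&
           post_name_attribute == rhs.post_name_attribute;
  }

  // Hash functor so FragmentId can key an unordered container.
  struct hash {
    std::size_t operator()(const FragmentId& id) const {
      std::size_t seed = 0;
      hash_combine(seed, id.base_name_attribute, id.post_name_attribute);
      return seed;
    }
  };
};
```

One possible reason cpplint stays quiet here, assuming the bundled `cpplint.py` matches upstream: its `readability/namespace` check only reports a missing end-of-namespace comment when the namespace spans at least 10 lines, and this anonymous namespace is shorter than that.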







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797504465



##########
File path: PROCESSORS.md
##########
@@ -313,7 +313,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ### Description
 
-DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them
+DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them, it can handle multiple inputs differentiated by the <em>absolute.path</em> flow file attribute.

Review comment:
       Nitpicking, but I would split this into two sentences:
   ```suggestion
   DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them. It can handle multiple inputs differentiated by the <em>absolute.path</em> flow file attribute.
   ```







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r796366608



##########
File path: extensions/standard-processors/processors/DefragmentText.h
##########
@@ -64,32 +65,48 @@ class DefragmentText : public core::Processor {
  protected:
   class Buffer {
    public:
-    bool isCompatible(const core::FlowFile& fragment) const;
     void append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append);
-    bool maxSizeReached() const;
-    bool maxAgeReached() const;
-    void setMaxAge(std::chrono::milliseconds max_age);
-    void setMaxSize(size_t max_size);
+    bool maxSizeReached(const std::optional<size_t> max_size) const;
+    bool maxAgeReached(const std::optional<std::chrono::milliseconds> max_age) const;
     void flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
                          const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
 
     bool empty() const { return buffered_flow_file_ == nullptr; }
+    std::optional<size_t> getNextFragmentOffset() const;
 
    private:
     void store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
 
     std::shared_ptr<core::FlowFile> buffered_flow_file_;
     std::chrono::steady_clock::time_point creation_time_;
-    std::optional<std::chrono::milliseconds> max_age_;
-    std::optional<size_t> max_size_;
   };
 
+  class FragmentSource {
+   public:
+    class Id {
+     public:
+      explicit Id(const core::FlowFile& flow_file);
+      struct hash {
+        size_t operator()(const Id& fragment_id) const;
+      };
+      bool operator==(const Id& rhs) const = default;
+     protected:
+      std::optional<std::string> base_name_attribute_;
+      std::optional<std::string> post_name_attribute_;
+    };
+
+    Buffer buffer_;

Review comment:
       FragmentSource could be a struct, as all of its members are public, and the buffer member doesn't need a `_` suffix for the same reason.
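A minimal sketch of the suggested shape, using simplified, hypothetical stand-ins: the real `Buffer` wraps a `core::FlowFile`, and `Id` here is keyed on the `absolute.path` value, as in the later revision of this change. `FragmentSource` becomes a `struct`, its member is plain `buffer`, and a `FragmentSources` map (also a hypothetical name) keeps one source per input file.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Simplified stand-in for DefragmentText::Buffer; it only accumulates text.
class Buffer {
 public:
  void append(const std::string& data) { content_ += data; }
  bool empty() const { return content_.empty(); }
  const std::string& content() const { return content_; }

 private:
  std::string content_;  // private, so the trailing `_` is kept here
};

// Every member is public, so a struct fits and `buffer` drops the `_` suffix.
struct FragmentSource {
  class Id {
   public:
    explicit Id(std::optional<std::string> absolute_path)
        : absolute_path_(std::move(absolute_path)) {}

    bool operator==(const Id& rhs) const { return absolute_path_ == rhs.absolute_path_; }

    // Nested class, so it may read the private absolute_path_.
    struct hash {
      std::size_t operator()(const Id& id) const {
        return std::hash<std::optional<std::string>>{}(id.absolute_path_);
      }
    };

   private:
    std::optional<std::string> absolute_path_;
  };

  Buffer buffer;
};

// One FragmentSource per input file, looked up by its Id.
using FragmentSources =
    std::unordered_map<FragmentSource::Id, FragmentSource, FragmentSource::Id::hash>;
```

Members that stay private, such as `Buffer::content_` and `Id::absolute_path_`, keep the trailing underscore, which is the distinction the comment draws.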







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797578093



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,21 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto absolute_path = flow_file.getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH))
+    absolute_path_ = *absolute_path;
+}
+
+size_t DefragmentText::FragmentSource::Id::hash::operator() (const Id& fragment_id) const {
+  return std::hash<std::optional<std::string>>{}(fragment_id.absolute_path_);
 }
 
 REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them");

Review comment:
       Sorry, just one more thing: the text of the description should be updated here, too, so the new text is available in the manifest and C2.
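The `getNextFragmentOffset` change in the diff above computes the offset at which the next fragment from the same source is expected to start: the buffered flow file's offset attribute plus its size, or no value when the buffer is empty or the attribute is missing. The sketch below models that rule with a hypothetical `BufferedFragment` (an attribute map plus a size) and a hypothetical `nextFragmentOffset` function; the attribute key `TextFragmentAttribute.offset` is taken from the attribute dump elsewhere in this thread. It parses with `std::stoull` instead of `std::stoi`, assuming offsets in large log files can exceed the `int` range that `std::stoi` returns.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>

// Hypothetical model of a buffered flow file: attributes plus content size.
struct BufferedFragment {
  std::map<std::string, std::string> attributes;
  std::size_t size = 0;
};

// Offset where the next contiguous fragment is expected to begin.
std::optional<std::size_t> nextFragmentOffset(const std::optional<BufferedFragment>& buffered) {
  if (!buffered)
    return std::nullopt;  // empty buffer: no expectation yet
  auto it = buffered->attributes.find("TextFragmentAttribute.offset");
  if (it == buffered->attributes.end())
    return std::nullopt;  // fragment carries no offset information
  // std::stoull avoids the int range limit of std::stoi (assumed improvement).
  return static_cast<std::size_t>(std::stoull(it->second)) + buffered->size;
}
```

For example, a buffered fragment with offset `3431278` and size 79 yields `3431357`.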







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r799641338



##########
File path: extensions/standard-processors/processors/DefragmentText.h
##########
@@ -64,32 +65,48 @@ class DefragmentText : public core::Processor {
  protected:
   class Buffer {
    public:
-    bool isCompatible(const core::FlowFile& fragment) const;
     void append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append);
-    bool maxSizeReached() const;
-    bool maxAgeReached() const;
-    void setMaxAge(std::chrono::milliseconds max_age);
-    void setMaxSize(size_t max_size);
+    bool maxSizeReached(const std::optional<size_t> max_size) const;
+    bool maxAgeReached(const std::optional<std::chrono::milliseconds> max_age) const;
     void flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
                          const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
 
     bool empty() const { return buffered_flow_file_ == nullptr; }
+    std::optional<size_t> getNextFragmentOffset() const;
 
    private:
     void store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
 
     std::shared_ptr<core::FlowFile> buffered_flow_file_;
     std::chrono::steady_clock::time_point creation_time_;
-    std::optional<std::chrono::milliseconds> max_age_;
-    std::optional<size_t> max_size_;
   };
 
+  class FragmentSource {
+   public:
+    class Id {
+     public:
+      explicit Id(const core::FlowFile& flow_file);
+      struct hash {
+        size_t operator()(const Id& fragment_id) const;
+      };
+      bool operator==(const Id& rhs) const = default;
+     protected:
+      std::optional<std::string> base_name_attribute_;
+      std::optional<std::string> post_name_attribute_;
+    };
+
+    Buffer buffer_;

Review comment:
       Good idea: https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/5fa9ae6f2c1b5f2be1bbcb4a745ba51251ae300c







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;
+}
+
+namespace {
+template <typename T, typename... Rest>
+void hash_combine(size_t& seed, const T& v, Rest... rest) {
+  std::hash<T> hasher;
+  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (hash_combine(seed, rest), ...);
+}
+}

Review comment:
       You are right, and I even found invalid indents in the file (e.g. line 325) which should also set off the linter.
   So I checked, and it is indeed running: if I remove the comment in the last line `}  // namespace org::apache::nifi::minifi::processors`, it does complain...
   
   I also ran the linter directly on this file, and still no errors:
   `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp`
   I even tried it with the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation.







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r792455005



##########
File path: docker/test/integration/features/defragtextflowfiles.feature
##########
@@ -2,7 +2,34 @@ Feature: DefragmentText can defragment fragmented data from TailFile
   Background:
     Given the content of "/tmp/output" is monitored
 
-  Scenario Outline: DefragmentText merges split messages from TailFile
+  Scenario Outline: DefragmentText correctly merges split messages from multiple TailFile
+    Given a TailFile processor with the name "TailOne" and the "File to Tail" property set to "/tmp/input/test_file_one.log"
+    And the "Initial Start Position" property of the TailOne processor is set to "Beginning of File"
+    And the "Input Delimiter" property of the TailOne processor is set to "%"
+    And a TailFile processor with the name "TailTwo" and the "File to Tail" property set to "/tmp/input/test_file_two.log"
+    And the "Initial Start Position" property of the TailTwo processor is set to "Beginning of File"
+    And the "Input Delimiter" property of the TailTwo processor is set to "%"
+    And "TailTwo" processor is a start node

Review comment:
       Instead of (or in addition to) this test, a more typical use case would be a single TailFile processor with `tail-mode` = `Multiple file`, `tail-base-directory` = `/tmp/input` and `File to Tail` = `test_file_.*\.log`.

##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;

Review comment:
       This won't work in the Kubernetes use case, where all log files are called `0.log` (or `1.log`, etc., after restarts), and the pod name and other identifying details are contained in the path. For example:
   ```
   FlowFile Attributes Map Content
   key:TextFragmentAttribute.base_name value:0
   key:TextFragmentAttribute.offset value:3431278
   key:TextFragmentAttribute.post_name value:log
   key:absolute.path value:/var/log/pods/default_counter_dd5befc8-5573-40c3-a136-8daf6eb77b01/count/0.log
   key:filename value:0.3431278-3431357.log
   key:flow.id value:cbd22e73-f01b-43ee-aa73-a28963dc1d56
   key:path value:/var/log/pods/default_counter_dd5befc8-5573-40c3-a136-8daf6eb77b01/count
   ```
   I think `absolute.path` would be a good choice.
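A hypothetical sketch of why the key matters: two pods that both write `0.log` produce identical `TextFragmentAttribute.base_name` and `TextFragmentAttribute.post_name` values, so a key built from those collides, while a key built from `absolute.path` keeps them apart. `Attributes`, `getAttribute`, `keyFromNames` and `keyFromAbsolutePath` are illustrative names, not MiNiFi APIs.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Illustrative attribute map; a real flow file exposes attributes differently.
using Attributes = std::map<std::string, std::string>;

std::optional<std::string> getAttribute(const Attributes& attributes, const std::string& key) {
  auto it = attributes.find(key);
  if (it == attributes.end())
    return std::nullopt;
  return it->second;
}

// Key used by the first revision: file base name and extension.
std::pair<std::optional<std::string>, std::optional<std::string>> keyFromNames(const Attributes& attributes) {
  return {getAttribute(attributes, "TextFragmentAttribute.base_name"),
          getAttribute(attributes, "TextFragmentAttribute.post_name")};
}

// Key suggested in the review: the full path of the tailed file.
std::optional<std::string> keyFromAbsolutePath(const Attributes& attributes) {
  return getAttribute(attributes, "absolute.path");
}
```

For instance, the `absolute.path` shown above and a second, hypothetical pod path such as `/var/log/pods/other_pod/count/0.log` give equal `keyFromNames` results but different `keyFromAbsolutePath` results.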







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;
+}
+
+namespace {
+template <typename T, typename... Rest>
+void hash_combine(size_t& seed, const T& v, Rest... rest) {
+  std::hash<T> hasher;
+  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (hash_combine(seed, rest), ...);
+}
+}

Review comment:
       You are right, and I even found invalid indentation in the file (e.g. line 325), which should also have set off the linter (although I'm not sure whether the linter checks that).
   So I checked whether it is running at all, and it is: if I remove the comment on the last line, `}  // namespace org::apache::nifi::minifi::processors`, it does complain.
   
   I also ran the linter directly on this file and still got no errors:
   `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp`
   I even tried the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation.
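The hunk under discussion closes its anonymous namespace with a bare `}`. A plausible explanation for the silence, based on a reading of cpplint's source rather than anything confirmed in this thread, is that its `readability/namespace` check only demands a closing `// namespace` comment once a namespace spans roughly ten lines or more. The anonymous namespace here is shorter, while the outer `org::apache::nifi::minifi::processors` namespace is long, which would explain why removing that comment does trigger a warning. The sketch below restates the diff's `hash_combine` helper with the conventional closing comment; it takes the rest of the pack by `const&` instead of by value and is an illustration, not the committed code.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

namespace {
// Boost-style hash mixing: folds the hash of every argument into `seed`,
// left to right. Each recursive call receives a single argument, so its own
// pack is empty and the comma fold there expands to void().
template <typename T, typename... Rest>
void hash_combine(std::size_t& seed, const T& v, const Rest&... rest) {
  seed ^= std::hash<T>{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  (hash_combine(seed, rest), ...);
}
}  // namespace
```

Arguments should be passed as `std::string` rather than string literals: a literal deduces `T` as a `char` array, and `std::hash` has no specialization for arrays, so such a call would not compile.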







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797474209



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;

Review comment:
       I didn't think of that, but you are right: `absolute.path` is a good way to differentiate between inputs.
   I changed this in https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/098620837edc75bba8ca2494346b889d0c881837
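A minimal sketch of the idea agreed on here: keep one buffer per input, keyed by the optional `absolute.path` attribute, so that fragments from two tailed files cannot be merged into each other. `SourceId`, `Buffers` and `bufferFragment` are hypothetical names, and the buffer is simplified to a plain `std::string`, whereas the processor buffers flow files. A missing attribute is modelled as `std::nullopt`, which acts as a key of its own.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for DefragmentText::FragmentSource::Id: an input is
// identified by its absolute.path attribute, which may be absent.
struct SourceId {
  std::optional<std::string> absolute_path;

  bool operator==(const SourceId& other) const { return absolute_path == other.absolute_path; }

  // The standard library provides std::hash<std::optional<T>> (C++17)
  // whenever std::hash<T> is enabled, so no manual hash mixing is needed.
  struct Hash {
    std::size_t operator()(const SourceId& id) const {
      return std::hash<std::optional<std::string>>{}(id.absolute_path);
    }
  };
};

// Hypothetical per-source buffers; the real processor buffers flow files.
using Buffers = std::unordered_map<SourceId, std::string, SourceId::Hash>;

// Appends a fragment's content to the buffer of the input it came from.
void bufferFragment(Buffers& buffers, const std::optional<std::string>& absolute_path,
                    const std::string& content) {
  buffers[SourceId{absolute_path}] += content;
}
```

With a single optional field as the key, the standard `std::hash` specialization does all the work, which is also the shape the final `FragmentSource::Id::hash` in this PR takes.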







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r791826948



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,34 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto base_name_attribute = flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE))
+    base_name_attribute_ = *base_name_attribute;
+  if (auto post_name_attribute = flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE))
+    post_name_attribute_ = *post_name_attribute;
+}
+
+namespace {
+template <typename T, typename... Rest>
+void hash_combine(size_t& seed, const T& v, Rest... rest) {
+  std::hash<T> hasher;
+  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (hash_combine(seed, rest), ...);
+}
+}

Review comment:
       You are right, and I even found invalid indentation in the file (e.g. line 325), which should also have set off the linter (although I'm not sure whether the linter checks that).
   So I checked whether it is running at all, and it is: if I remove the comment on the last line, `}  // namespace org::apache::nifi::minifi::processors`, it does complain.
   
   I also ran the linter directly on this file and still got no errors:
   `python ../thirdparty/google-styleguide/cpplint.py --linelength=200 ../extensions/standard-processors/processors/DefragmentText.cpp`
   I even tried the up-to-date version from https://github.com/cpplint/cpplint/blob/develop/cpplint.py, but it still doesn't catch this style violation.







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797473068



##########
File path: docker/test/integration/features/defragtextflowfiles.feature
##########
@@ -2,7 +2,34 @@ Feature: DefragmentText can defragment fragmented data from TailFile
   Background:
     Given the content of "/tmp/output" is monitored
 
-  Scenario Outline: DefragmentText merges split messages from TailFile
+  Scenario Outline: DefragmentText correctly merges split messages from multiple TailFile
+    Given a TailFile processor with the name "TailOne" and the "File to Tail" property set to "/tmp/input/test_file_one.log"
+    And the "Initial Start Position" property of the TailOne processor is set to "Beginning of File"
+    And the "Input Delimiter" property of the TailOne processor is set to "%"
+    And a TailFile processor with the name "TailTwo" and the "File to Tail" property set to "/tmp/input/test_file_two.log"
+    And the "Initial Start Position" property of the TailTwo processor is set to "Beginning of File"
+    And the "Input Delimiter" property of the TailTwo processor is set to "%"
+    And "TailTwo" processor is a start node

Review comment:
       Good idea, I've added that case as well in https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/098620837edc75bba8ca2494346b889d0c881837







[GitHub] [nifi-minifi-cpp] martinzink commented on a change in pull request #1248: MINIFICPP-1702: DefragmentText multiinput improvement

Posted by GitBox <gi...@apache.org>.
martinzink commented on a change in pull request #1248:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1248#discussion_r797743675



##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -297,29 +297,21 @@ void DefragmentText::Buffer::store(core::ProcessSession* session, const std::sha
   }
 }
 
-bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
+std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
   if (empty())
-    return true;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
-      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
-    return false;
-  }
-  std::string current_offset_str, append_offset_str;
-  if (buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str)
-      != fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str)) {
-    return false;
-  }
-  if (!current_offset_str.empty() && !append_offset_str.empty()) {
-    size_t current_offset = std::stoi(current_offset_str);
-    size_t append_offset = std::stoi(append_offset_str);
-    if (current_offset + buffered_flow_file_->getSize() != append_offset)
-      return false;
-  }
-  return true;
+    return std::nullopt;
+  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE))
+    return std::stoi(*offset_attribute) + buffered_flow_file_->getSize();
+  return std::nullopt;
+}
+
+DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
+  if (auto absolute_path = flow_file.getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH))
+    absolute_path_ = *absolute_path;
+}
+
+size_t DefragmentText::FragmentSource::Id::hash::operator() (const Id& fragment_id) const {
+  return std::hash<std::optional<std::string>>{}(fragment_id.absolute_path_);
 }
 
 REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them");

Review comment:
       https://github.com/apache/nifi-minifi-cpp/pull/1248/commits/beaab09f917fa18929887fd88252fc72d453f62e
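The diff replaces the boolean `isCompatible()` check with `getNextFragmentOffset()`, which reports where the next fragment from the same source should start: the buffered flow file's offset attribute plus its size, or nothing if the buffer is empty or carries no offset. The sketch below restates that arithmetic outside the processor. `BufferedFragment`, `nextFragmentOffset` and `continuesBuffer` are hypothetical names, and how the caller compares the result against an incoming fragment is an assumption: `continuesBuffer` simply reproduces the offset rule of the removed `isCompatible()` for a non-empty buffer. The sketch also parses with `std::stoull` instead of the diff's `std::stoi`, because `std::stoi` returns an `int` and throws `std::out_of_range` for offsets above `INT_MAX` (files larger than about 2 GiB).

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical stand-in for the buffered flow file: its offset attribute
// (absent if the producer did not set one) and its content size in bytes.
struct BufferedFragment {
  std::optional<std::string> offset_attribute;
  std::size_t size = 0;
};

// Same arithmetic as getNextFragmentOffset() in the diff: an empty buffer
// (std::nullopt here) or a missing offset attribute yields no expectation;
// otherwise the next fragment should start where the buffered content ends.
std::optional<std::size_t> nextFragmentOffset(const std::optional<BufferedFragment>& buffered) {
  if (!buffered || !buffered->offset_attribute)
    return std::nullopt;
  // std::stoull instead of std::stoi, so offsets above INT_MAX do not throw.
  return static_cast<std::size_t>(std::stoull(*buffered->offset_attribute)) + buffered->size;
}

// For a non-empty buffer this reproduces the offset rule of the removed
// isCompatible(): both offsets absent -> compatible, only one present ->
// incompatible, both present -> compatible only if they line up.
bool continuesBuffer(std::optional<std::size_t> expected, std::optional<std::size_t> fragment_offset) {
  return expected == fragment_offset;
}
```

Comparing two `std::optional` values with `==` covers all three cases of the old presence-and-value check in a single expression.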



