Posted to commits@nifi.apache.org by fg...@apache.org on 2022/11/15 18:38:21 UTC

[nifi-minifi-cpp] 03/04: MINIFICPP-1978 - Flush MergeContent bundles when their size would grow beyond the max group size

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7888ca4312ba7cb63163b12b0e07673c29ef4a56
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Nov 9 11:45:54 2022 +0100

    MINIFICPP-1978 - Flush MergeContent bundles when their size would grow beyond the max group size
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1449
---
 extensions/libarchive/BinFiles.h                |  7 ++-
 libminifi/test/archive-tests/MergeFileTests.cpp | 66 +++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index 71cece114..dcc55974f 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -63,7 +63,7 @@ class Bin {
   }
   // check whether the bin meets the min required size and entries so that it can be processed for merge
   [[nodiscard]] bool isReadyForMerge() const {
-    return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_);
+    return closed_ || isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_);
   }
   // check whether the bin is older than the time specified in msec
   [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const {
@@ -87,8 +87,10 @@ class Bin {
       }
     }
 
-    if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_)
+    if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) {
+      closed_ = true;
       return false;
+    }
 
     queue_.push_back(flow);
     queued_data_size_ += flow->getSize();
@@ -119,6 +121,7 @@ class Bin {
   size_t minEntries_;
   // Queued data size
   uint64_t queued_data_size_;
+  bool closed_{false};
   // Queue for the Flow File
   std::deque<std::shared_ptr<core::FlowFile>> queue_;
   std::chrono::system_clock::time_point creation_dated_;
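
For readers skimming the patch: the gist of the change is that a Bin which rejects an incoming flow file (because accepting it would exceed MaxSize or MaxEntries) now marks itself closed, and a closed bin reports isReadyForMerge() even if it never reached MinSize/MinEntries, so the partially filled bundle is flushed instead of lingering. Below is a minimal standalone sketch of that logic; it is not the actual Bin class from BinFiles.h, the names are simplified, and the 65-byte / 3-entry limits are borrowed from the test further down.

    // Minimal sketch of a bin that closes itself once an offered item would
    // exceed its limits, making it eligible for merging even below the minimums.
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct SketchBin {
      uint64_t min_size = 65, max_size = 65;   // values mirror the test below
      size_t min_entries = 3, max_entries = 3;
      uint64_t queued_size = 0;
      std::vector<uint64_t> items;
      bool closed = false;  // set when an offer is rejected for exceeding the limits

      bool offer(uint64_t item_size) {
        if (queued_size + item_size > max_size || items.size() + 1 > max_entries) {
          closed = true;   // this bin can never grow further, so flush it as-is
          return false;    // the caller must start a new bin for the item
        }
        items.push_back(item_size);
        queued_size += item_size;
        return true;
      }

      bool isFull() const {
        return queued_size >= max_size || items.size() >= max_entries;
      }

      bool isReadyForMerge() const {
        // 'closed' bypasses the min-size / min-entries check, as in the change above
        return closed || isFull() || (queued_size >= min_size && items.size() >= min_entries);
      }
    };

    int main() {
      SketchBin bin;
      bin.offer(32);                               // 32 bytes queued
      bin.offer(32);                               // 64 bytes queued
      std::cout << bin.isReadyForMerge() << '\n';  // 0: below min size and min entries
      bin.offer(32);                               // would reach 96 > 65 bytes: rejected, bin closes
      std::cout << bin.isReadyForMerge() << '\n';  // 1: flushed despite missing the minimums
    }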
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index d3bb0c1c0..4152dc8de 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -822,3 +822,69 @@ TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]")
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
+
+TEST_CASE_METHOD(MergeTestController, "Maximum Group Size is respected", "[testMergeFileMaximumGroupSize]") {
+  // each flowfile content is 32 bytes
+  for (auto& ff : flowFileContents_) {
+    REQUIRE(ff.length() == 32);
+  }
+  std::string expected[2]{
+      flowFileContents_[0] + flowFileContents_[1],
+      flowFileContents_[2] + flowFileContents_[3]
+  };
+
+  context_->setProperty(minifi::processors::MergeContent::MergeFormat, minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context_->setProperty(minifi::processors::MergeContent::MergeStrategy, minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context_->setProperty(minifi::processors::MergeContent::DelimiterStrategy, minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context_->setProperty(minifi::processors::BinFiles::BatchSize, "1000");
+
+  // min/max size is a bit more than two 32-byte flowfiles
+  context_->setProperty(minifi::processors::BinFiles::MinSize, "65");
+  context_->setProperty(minifi::processors::BinFiles::MaxSize, "65");
+
+  context_->setProperty(minifi::processors::BinFiles::MinEntries, "3");
+  context_->setProperty(minifi::processors::BinFiles::MaxEntries, "3");
+
+  core::ProcessSession sessionGenFlowFile(context_);
+  // enqueue 6 (six) flowFiles
+  for (const int i : {0, 1, 2, 3, 4, 5}) {
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.writeBuffer(flow, flowFileContents_[i]);
+    sessionGenFlowFile.flushContent();
+    input_->put(flow);
+  }
+
+  REQUIRE(merge_content_processor_->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context_);
+  merge_content_processor_->onSchedule(context_, factory);
+  // a single trigger is enough to process all six flowFiles
+  {
+    auto session = std::make_shared<core::ProcessSession>(context_);
+    merge_content_processor_->onTrigger(context_, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output_->poll(expiredFlowRecords);
+  REQUIRE(expiredFlowRecords.empty());
+  REQUIRE(flow1);
+  {
+    FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
+    sessionGenFlowFile.read(flow1, std::ref(callback));
+    REQUIRE(callback.to_string() == expected[0]);
+  }
+
+  std::shared_ptr<core::FlowFile> flow2 = output_->poll(expiredFlowRecords);
+  REQUIRE(expiredFlowRecords.empty());
+  REQUIRE(flow2);
+  {
+    FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
+    sessionGenFlowFile.read(flow2, std::ref(callback));
+    REQUIRE(callback.to_string() == expected[1]);
+  }
+
+  // no more flowfiles
+  std::shared_ptr<core::FlowFile> flow3 = output_->poll(expiredFlowRecords);
+  REQUIRE(expiredFlowRecords.empty());
+  REQUIRE_FALSE(flow3);
+}
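
For reference, the expected output in this test follows directly from the configured limits: each flowfile is 32 bytes, so flowfiles 0 and 1 fill a bin to 64 bytes; offering flowfile 2 would push it to 96 bytes, over the 65-byte MaxSize, so that bin is closed and flushed as expected[0] even though MinSize (65) and MinEntries (3) were never reached. Flowfiles 2 and 3 form the second flushed bin (expected[1]), while flowfiles 4 and 5 remain in a bin that is neither full nor closed, which is why the third poll returns no flowfile.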