You are viewing a plain text version of this content. The canonical link for it is provided in the original HTML version of this page.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/11/15 18:38:21 UTC
[nifi-minifi-cpp] 03/04: MINIFICPP-1978 - Flush MergeContent bundles when its size would grow beyond max group size
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7888ca4312ba7cb63163b12b0e07673c29ef4a56
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Nov 9 11:45:54 2022 +0100
MINIFICPP-1978 - Flush MergeContent bundles when its size would grow beyond max group size
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1449
---
extensions/libarchive/BinFiles.h | 7 ++-
libminifi/test/archive-tests/MergeFileTests.cpp | 66 +++++++++++++++++++++++++
2 files changed, 71 insertions(+), 2 deletions(-)
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index 71cece114..dcc55974f 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -63,7 +63,7 @@ class Bin {
}
// check whether the bin meet the min required size and entries so that it can be processed for merge
[[nodiscard]] bool isReadyForMerge() const {
- return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_);
+ return closed_ || isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_);
}
// check whether the bin is older than the time specified in msec
[[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const {
@@ -87,8 +87,10 @@ class Bin {
}
}
- if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_)
+ if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) {
+ closed_ = true;
return false;
+ }
queue_.push_back(flow);
queued_data_size_ += flow->getSize();
@@ -119,6 +121,7 @@ class Bin {
size_t minEntries_;
// Queued data size
uint64_t queued_data_size_;
+ bool closed_{false};
// Queue for the Flow File
std::deque<std::shared_ptr<core::FlowFile>> queue_;
std::chrono::system_clock::time_point creation_dated_;
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index d3bb0c1c0..4152dc8de 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -822,3 +822,69 @@ TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]")
REQUIRE(callback.to_string() == expected[1]);
}
}
+
+TEST_CASE_METHOD(MergeTestController, "Maximum Group Size is respected", "[testMergeFileMaximumGroupSize]") {
+ // each flowfile content is 32 bytes
+ for (auto& ff : flowFileContents_) {
+ REQUIRE(ff.length() == 32);
+ }
+ std::string expected[2]{
+ flowFileContents_[0] + flowFileContents_[1],
+ flowFileContents_[2] + flowFileContents_[3]
+ };
+
+ context_->setProperty(minifi::processors::MergeContent::MergeFormat, minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context_->setProperty(minifi::processors::MergeContent::MergeStrategy, minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context_->setProperty(minifi::processors::MergeContent::DelimiterStrategy, minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context_->setProperty(minifi::processors::BinFiles::BatchSize, "1000");
+
+ // we want a bit more than 2 flowfiles
+ context_->setProperty(minifi::processors::BinFiles::MinSize, "65");
+ context_->setProperty(minifi::processors::BinFiles::MaxSize, "65");
+
+ context_->setProperty(minifi::processors::BinFiles::MinEntries, "3");
+ context_->setProperty(minifi::processors::BinFiles::MaxEntries, "3");
+
+ core::ProcessSession sessionGenFlowFile(context_);
+ // enqueue 6 (six) flowFiles
+ for (const int i : {0, 1, 2, 3, 4, 5}) {
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.writeBuffer(flow, flowFileContents_[i]);
+ sessionGenFlowFile.flushContent();
+ input_->put(flow);
+ }
+
+ REQUIRE(merge_content_processor_->getName() == "mergecontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context_);
+ merge_content_processor_->onSchedule(context_, factory);
+ // a single trigger is enough to pull in all six flowFiles and flush the two full bins
+ {
+ auto session = std::make_shared<core::ProcessSession>(context_);
+ merge_content_processor_->onTrigger(context_, session);
+ session->commit();
+ }
+ // validate the merge content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output_->poll(expiredFlowRecords);
+ REQUIRE(expiredFlowRecords.empty());
+ REQUIRE(flow1);
+ {
+ FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
+ sessionGenFlowFile.read(flow1, std::ref(callback));
+ REQUIRE(callback.to_string() == expected[0]);
+ }
+
+ std::shared_ptr<core::FlowFile> flow2 = output_->poll(expiredFlowRecords);
+ REQUIRE(expiredFlowRecords.empty());
+ REQUIRE(flow2);
+ {
+ FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
+ sessionGenFlowFile.read(flow2, std::ref(callback));
+ REQUIRE(callback.to_string() == expected[1]);
+ }
+
+ // no more flowfiles
+ std::shared_ptr<core::FlowFile> flow3 = output_->poll(expiredFlowRecords);
+ REQUIRE(expiredFlowRecords.empty());
+ REQUIRE_FALSE(flow3);
+}