You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/28 12:32:54 UTC
[nifi-minifi-cpp] 02/02: MINIFICPP-1887 Add default connection size limits
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f27e349b508c502218ca702b78fad2ef931f4f83
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Feb 28 13:11:23 2023 +0100
MINIFICPP-1887 Add default connection size limits
default limits: 2000 flow files / 100MB data size
Closes #1501
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../tests/unit/FlowJsonTests.cpp | 4 +-
.../tests/unit/ProcessorTests.cpp | 10 +-
.../tests/unit/YamlConnectionParserTest.cpp | 4 +-
libminifi/include/Connection.h | 110 ++++++++-------------
libminifi/include/core/state/ConnectionStore.h | 4 +-
.../include/core/state/nodes/FlowInformation.h | 4 +-
libminifi/include/core/state/nodes/QueueMetrics.h | 4 +-
libminifi/src/Connection.cpp | 19 ++--
libminifi/src/core/ProcessSession.cpp | 2 +-
libminifi/src/core/Processor.cpp | 8 +-
.../src/core/flow/StructuredConfiguration.cpp | 4 +-
.../src/core/flow/StructuredConnectionParser.cpp | 4 +-
.../test/persistence-tests/PersistenceTests.cpp | 4 +-
libminifi/test/rocksdb-tests/RepoTests.cpp | 2 +-
libminifi/test/unit/ConnectionTests.cpp | 41 ++++++++
libminifi/test/unit/MetricsTests.cpp | 4 +-
libminifi/test/unit/ResponseNodeLoaderTests.cpp | 2 +-
17 files changed, 119 insertions(+), 111 deletions(-)
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
index b7d1a47f8..495965122 100644
--- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -150,8 +150,8 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
REQUIRE(connection1->getSource() == proc);
REQUIRE(connection1->getDestination() == funnel);
REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}}));
- REQUIRE(connection1->getMaxQueueSize() == 7);
- REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB);
+ REQUIRE(connection1->getBackpressureThresholdCount() == 7);
+ REQUIRE(connection1->getBackpressureThresholdDataSize() == 11_KiB);
REQUIRE(13s == connection1->getFlowExpirationDuration());
auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008");
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index b7afc647c..c3b43856a 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -245,7 +245,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "GFF2Connection");
- connection->setMaxQueueSize(5);
+ connection->setBackpressureThresholdCount(5);
connection->addRelationship(core::Relationship("success", "description"));
@@ -270,8 +270,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
auto session = std::make_shared<core::ProcessSession>(context);
- REQUIRE(session->outgoingConnectionsFull("success") == false);
- REQUIRE(connection->isFull() == false);
+ CHECK_FALSE(session->outgoingConnectionsFull("success"));
+ CHECK_FALSE(connection->backpressureThresholdReached());
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
@@ -279,8 +279,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
session->commit();
- REQUIRE(connection->isFull());
- REQUIRE(session->outgoingConnectionsFull("success"));
+ CHECK(connection->backpressureThresholdReached());
+ CHECK(session->outgoingConnectionsFull("success"));
}
TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index 44d346ed4..0444c71e6 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -183,8 +183,8 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
"drop empty: \n"});
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK(0 == yaml_connection_parser.getWorkQueueSize());
- CHECK(0 == yaml_connection_parser.getWorkQueueDataSize());
+ CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT == yaml_connection_parser.getWorkQueueSize());
+ CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE == yaml_connection_parser.getWorkQueueDataSize());
CHECK(0 == yaml_connection_parser.getSwapThreshold());
CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
CHECK(0 == yaml_connection_parser.getDropEmpty());
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 81e1a7a25..154958ba6 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -27,6 +27,7 @@
#include <mutex>
#include <atomic>
#include <algorithm>
+#include <utility>
#include "core/Core.h"
#include "core/Connectable.h"
#include "core/logging/Logger.h"
@@ -52,86 +53,81 @@ class Connection : public core::Connectable {
const utils::Identifier &srcUUID, const utils::Identifier &destUUID);
explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager,
std::string name, const utils::Identifier& uuid);
- // Destructor
~Connection() override = default;
- // Set Source Processor UUID
+ Connection(const Connection &parent) = delete;
+ Connection &operator=(const Connection &parent) = delete;
+
+ static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000;
+ static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB;
+
void setSourceUUID(const utils::Identifier &uuid) {
src_uuid_ = uuid;
}
- // Set Destination Processor UUID
+
void setDestinationUUID(const utils::Identifier &uuid) {
dest_uuid_ = uuid;
}
- // Get Source Processor UUID
+
utils::Identifier getSourceUUID() const {
return src_uuid_;
}
- // Get Destination Processor UUID
+
utils::Identifier getDestinationUUID() const {
return dest_uuid_;
}
- // Set Connection Source Processor
void setSource(core::Connectable* source) {
source_connectable_ = source;
}
- // ! Get Connection Source Processor
+
core::Connectable* getSource() const {
return source_connectable_;
}
- // Set Connection Destination Processor
+
void setDestination(core::Connectable* dest) {
dest_connectable_ = dest;
}
- // ! Get Connection Destination Processor
- core::Connectable* getDestination() {
- return dest_connectable_;
- }
- /**
- * Deprecated function
- * Please use addRelationship.
- */
- void setRelationship(core::Relationship relationship) {
- relationships_.insert(relationship);
+ core::Connectable* getDestination() const {
+ return dest_connectable_;
}
- // Set Connection relationship
void addRelationship(core::Relationship relationship) {
- relationships_.insert(relationship);
+ relationships_.insert(std::move(relationship));
}
- // ! Get Connection relationship
+
const std::set<core::Relationship> &getRelationships() const {
return relationships_;
}
- // Set Max Queue Size
- void setMaxQueueSize(uint64_t size) {
- max_queue_size_ = size;
+
+ void setBackpressureThresholdCount(uint64_t size) {
+ backpressure_threshold_count_ = size;
}
- // Get Max Queue Size
- uint64_t getMaxQueueSize() {
- return max_queue_size_;
+
+ uint64_t getBackpressureThresholdCount() const {
+ return backpressure_threshold_count_;
}
- // Set Max Queue Data Size
- void setMaxQueueDataSize(uint64_t size) {
- max_data_queue_size_ = size;
+
+ void setBackpressureThresholdDataSize(uint64_t size) {
+ backpressure_threshold_data_size_ = size;
+ }
+
+ uint64_t getBackpressureThresholdDataSize() const {
+ return backpressure_threshold_data_size_;
}
+
void setSwapThreshold(uint64_t size) {
queue_.setTargetSize(size);
queue_.setMinSize(size / 2);
queue_.setMaxSize(size * 3 / 2);
}
- // Get Max Queue Data Size
- uint64_t getMaxQueueDataSize() {
- return max_data_queue_size_;
- }
- // Set Flow expiration duration in millisecond
+
void setFlowExpirationDuration(std::chrono::milliseconds duration) {
expired_duration_ = duration;
}
- // Get Flow expiration duration in millisecond
- std::chrono::milliseconds getFlowExpirationDuration() {
+
+ std::chrono::milliseconds getFlowExpirationDuration() const {
return expired_duration_;
}
@@ -143,28 +139,25 @@ class Connection : public core::Connectable {
return drop_empty_;
}
- // Check whether the queue is empty
bool isEmpty() const;
- // Check whether the queue is full to apply back pressure
- bool isFull() const;
- // Get queue size
- uint64_t getQueueSize() {
+
+ bool backpressureThresholdReached() const;
+
+ uint64_t getQueueSize() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
- // Get queue data size
+
uint64_t getQueueDataSize() {
return queued_data_size_;
}
- // Put the flow file into queue
void put(const std::shared_ptr<core::FlowFile>& flow) override;
- // Put multiple flowfiles into the queue
void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows);
- // Poll the flow file from queue, the expired flow file record also being returned
+
std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
- // Drain the flow records
+
void drain(bool delete_permanently);
void yield() override {}
@@ -179,41 +172,22 @@ class Connection : public core::Connectable {
}
protected:
- // Source Processor UUID
utils::Identifier src_uuid_;
- // Destination Processor UUID
utils::Identifier dest_uuid_;
- // Relationship for this connection
std::set<core::Relationship> relationships_;
- // Source Processor (ProcessNode/Port)
core::Connectable* source_connectable_ = nullptr;
- // Destination Processor (ProcessNode/Port)
core::Connectable* dest_connectable_ = nullptr;
- // Max queue size to apply back pressure
- std::atomic<uint64_t> max_queue_size_ = 0;
- // Max queue data size to apply back pressure
- std::atomic<uint64_t> max_data_queue_size_ = 0;
- // Flow File Expiration Duration in= MilliSeconds
+ std::atomic<uint64_t> backpressure_threshold_count_ = DEFAULT_BACKPRESSURE_THRESHOLD_COUNT;
+ std::atomic<uint64_t> backpressure_threshold_data_size_ = DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE;
std::atomic<std::chrono::milliseconds> expired_duration_ = std::chrono::milliseconds(0);
- // flow file repository
std::shared_ptr<core::Repository> flow_repository_;
- // content repository reference.
std::shared_ptr<core::ContentRepository> content_repo_;
private:
bool drop_empty_ = false;
- // Mutex for protection
mutable std::mutex mutex_;
- // Queued data size
std::atomic<uint64_t> queued_data_size_ = 0;
- // Queue for the Flow File
utils::FlowFileQueue queue_;
- // flow repository
- // Logger
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Connection>::getLogger();
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Connection(const Connection &parent);
- Connection &operator=(const Connection &parent);
};
} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/state/ConnectionStore.h b/libminifi/include/core/state/ConnectionStore.h
index cd7ed2082..a5cfa4eff 100644
--- a/libminifi/include/core/state/ConnectionStore.h
+++ b/libminifi/include/core/state/ConnectionStore.h
@@ -40,11 +40,11 @@ class ConnectionStore {
for (const auto& [_, connection] : connections_) {
metrics.push_back({"queue_data_size", static_cast<double>(connection->getQueueDataSize()),
{{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
- metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getMaxQueueDataSize()),
+ metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getBackpressureThresholdDataSize()),
{{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
metrics.push_back({"queue_size", static_cast<double>(connection->getQueueSize()),
{{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
- metrics.push_back({"queue_size_max", static_cast<double>(connection->getMaxQueueSize()),
+ metrics.push_back({"queue_size_max", static_cast<double>(connection->getBackpressureThresholdCount()),
{{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
}
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index e08eb6684..894d938a0 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -198,7 +198,7 @@ class FlowInformation : public FlowMonitor {
SerializedResponseNode queuesizemax;
queuesizemax.name = "sizeMax";
- queuesizemax.value = queue.second->getMaxQueueSize();
+ queuesizemax.value = queue.second->getBackpressureThresholdCount();
SerializedResponseNode datasize;
datasize.name = "dataSize";
@@ -206,7 +206,7 @@ class FlowInformation : public FlowMonitor {
SerializedResponseNode datasizemax;
datasizemax.name = "dataSizeMax";
- datasizemax.value = queue.second->getMaxQueueDataSize();
+ datasizemax.value = queue.second->getBackpressureThresholdDataSize();
repoNode.children.push_back(queuesize);
repoNode.children.push_back(queuesizemax);
diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h
index f89d34da6..3b6a832d6 100644
--- a/libminifi/include/core/state/nodes/QueueMetrics.h
+++ b/libminifi/include/core/state/nodes/QueueMetrics.h
@@ -64,7 +64,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore {
SerializedResponseNode datasizemax;
datasizemax.name = "datasizemax";
- datasizemax.value = std::to_string(connection->getMaxQueueDataSize());
+ datasizemax.value = std::to_string(connection->getBackpressureThresholdDataSize());
SerializedResponseNode queuesize;
queuesize.name = "queued";
@@ -72,7 +72,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore {
SerializedResponseNode queuesizemax;
queuesizemax.name = "queuedmax";
- queuesizemax.value = std::to_string(connection->getMaxQueueSize());
+ queuesizemax.value = std::to_string(connection->getBackpressureThresholdCount());
parent.children.push_back(datasize);
parent.children.push_back(datasizemax);
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index cddd0463c..7c3145c39 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -18,20 +18,15 @@
* limitations under the License.
*/
#include "Connection.h"
-#include <ctime>
#include <vector>
-#include <queue>
#include <memory>
#include <string>
-#include <map>
#include <set>
#include <chrono>
#include <thread>
-#include <iostream>
#include <list>
#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/Connectable.h"
using namespace std::literals::chrono_literals;
@@ -85,17 +80,15 @@ bool Connection::isEmpty() const {
return queue_.empty();
}
-bool Connection::isFull() const {
+bool Connection::backpressureThresholdReached() const {
std::lock_guard<std::mutex> lock(mutex_);
+ auto backpressure_threshold_count = backpressure_threshold_count_.load();
+ auto backpressure_threshold_data_size = backpressure_threshold_data_size_.load();
- if (max_queue_size_ <= 0 && max_data_queue_size_ <= 0)
- // No back pressure setting
- return false;
-
- if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_)
+ if (backpressure_threshold_count != 0 && queue_.size() >= backpressure_threshold_count)
return true;
- if (max_data_queue_size_ > 0 && queued_data_size_ >= max_data_queue_size_)
+ if (backpressure_threshold_data_size != 0 && queued_data_size_ >= backpressure_threshold_data_size)
return true;
return false;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1d06b8da7..8bf30dc1c 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -1130,7 +1130,7 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
Connection * connection = nullptr;
for (const auto conn : connections) {
connection = dynamic_cast<Connection*>(conn);
- if (connection && connection->isFull()) {
+ if (connection && connection->backpressureThresholdReached()) {
return true;
}
}
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ffc186a96..fdb15f4a3 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -172,7 +172,7 @@ bool Processor::flowFilesOutGoingFull() const {
std::set<Connectable*> existedConnection = connection_pair.second;
const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const Connectable* conn) {
auto connection = dynamic_cast<const Connection*>(conn);
- return connection && connection->isFull();
+ return connection && connection->backpressureThresholdReached();
});
if (has_full_connection) { return true; }
}
@@ -312,12 +312,12 @@ bool Processor::isThrottledByBackpressure() const {
bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) {
return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) {
auto connection = dynamic_cast<Connection*>(connectable);
- return connection && connection->isFull();
+ return connection && connection->backpressureThresholdReached();
});
});
bool isForcedByIncomingCycle = ranges::any_of(incoming_connections_, [](auto& connectable) {
auto connection = dynamic_cast<Connection*>(connectable);
- return connection && partOfCycle(connection) && connection->isFull();
+ return connection && partOfCycle(connection) && connection->backpressureThresholdReached();
});
return isThrottledByOutgoing && !isForcedByIncomingCycle;
}
@@ -333,7 +333,7 @@ Connectable* Processor::pickIncomingConnection() {
if (!connection) {
continue;
}
- if (partOfCycle(connection) && connection->isFull()) {
+ if (partOfCycle(connection) && connection->backpressureThresholdReached()) {
return inConn;
}
} while (incoming_connections_Iter != beginIt);
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 02624a4dc..847258399 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -554,8 +554,8 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
logger_->log_debug("Created connection with UUID %s and name %s", id, name);
const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_);
connectionParser.configureConnectionSourceRelationships(*connection);
- connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
- connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
+ connection->setBackpressureThresholdCount(connectionParser.getWorkQueueSize());
+ connection->setBackpressureThresholdDataSize(connectionParser.getWorkQueueDataSize());
connection->setSwapThreshold(connectionParser.getSwapThreshold());
connection->setSourceUUID(connectionParser.getSourceUUID());
connection->setDestinationUUID(connectionParser.getDestinationUUID());
diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
index a6521884b..9ac23ac89 100644
--- a/libminifi/src/core/flow/StructuredConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -76,7 +76,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const {
}
logger_->log_error("Invalid max queue size value: %s.", max_work_queue_str);
}
- return 0;
+ return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT;
}
uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
@@ -90,7 +90,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
}
logger_->log_error("Invalid max queue data size value: %s.", max_work_queue_str);
}
- return 0;
+ return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE;
}
uint64_t StructuredConnectionParser::getSwapThreshold() const {
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index e417ae417..748775fcd 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -77,7 +77,7 @@ struct TestFlow{
auto input = std::make_unique<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
{
input_ = input.get();
- input->setRelationship({"input", "d"});
+ input->addRelationship({"input", "d"});
input->setDestinationUUID(mainProcUUID());
input->setSourceUUID(inputProcUUID());
inputProcessor->addConnection(input.get());
@@ -87,7 +87,7 @@ struct TestFlow{
auto output = std::make_unique<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
{
output_ = output.get();
- output->setRelationship(relationshipToOutput);
+ output->addRelationship(relationshipToOutput);
output->setSourceUUID(mainProcUUID());
}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index ef4d073bf..7a646b545 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -274,7 +274,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
core::Relationship inputRel{"Input", "dummy"};
auto input = std::make_unique<minifi::Connection>(ff_repository, content_repo, "Input");
- input->setRelationship(inputRel);
+ input->addRelationship(inputRel);
auto root = std::make_unique<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
auto inputPtr = input.get();
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
index f018ce1f6..9a82a893f 100644
--- a/libminifi/test/unit/ConnectionTests.cpp
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -84,3 +84,44 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
REQUIRE(nullptr == connection->poll(expired_flow_files));
}
}
+
+TEST_CASE("Connection backpressure tests", "[Connection]") {  // exercises Connection::backpressureThresholdReached() for the count and data-size limits
+ const auto flow_repo = std::make_shared<TestRepository>();
+ const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
+
+ const auto id_generator = utils::IdGenerator::getIdGenerator();
+ const auto connection = std::make_shared<minifi::Connection>(flow_repo, content_repo, "test_connection", id_generator->generate(), id_generator->generate(), id_generator->generate());
+
+ CHECK(connection->getBackpressureThresholdDataSize() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE);  // a connection with no explicit setter call reports the default data-size threshold
+ CHECK(connection->getBackpressureThresholdCount() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT);  // ...and the default flow-file-count threshold
+
+ SECTION("The number of flowfiles can be limited") {
+ connection->setBackpressureThresholdCount(2);
+ CHECK_FALSE(connection->backpressureThresholdReached());  // nothing has been put on the connection yet
+ connection->put(std::make_shared<core::FlowFile>());
+ CHECK_FALSE(connection->backpressureThresholdReached());  // one queued flow file is still below the threshold of 2
+ connection->put(std::make_shared<core::FlowFile>());
+ CHECK(connection->backpressureThresholdReached());  // the count check uses >=, so exactly 2 queued flow files already counts as reached
+ connection->setBackpressureThresholdCount(0);  // a threshold of 0 switches the count check off
+ CHECK_FALSE(connection->backpressureThresholdReached());
+ }
+ SECTION("The size of the data can be limited") {
+ connection->setBackpressureThresholdDataSize(3_KB);
+ CHECK_FALSE(connection->backpressureThresholdReached());  // nothing has been put on the connection yet
+ {
+ auto flow_file = std::make_shared<core::FlowFile>();
+ flow_file->setSize(2_KB);
+ connection->put(flow_file);
+ }
+ CHECK_FALSE(connection->backpressureThresholdReached());  // presumably 2 KB is queued at this point, below the 3 KB threshold (put() accounting is not visible here)
+ {
+ auto flow_file = std::make_shared<core::FlowFile>();
+ flow_file->setSize(2_KB);
+ connection->put(flow_file);
+ }
+ CHECK(connection->backpressureThresholdReached());  // the second 2 KB flow file is expected to push the queued data past the 3 KB threshold
+ connection->setBackpressureThresholdDataSize(0);  // a threshold of 0 switches the data-size check off
+ CHECK_FALSE(connection->backpressureThresholdReached());
+ }
+}
diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
index a73cecae5..972365fd4 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -53,8 +53,8 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
auto connection = std::make_unique<minifi::Connection>(repo, content_repo, "testconnection");
- connection->setMaxQueueDataSize(1024);
- connection->setMaxQueueSize(1024);
+ connection->setBackpressureThresholdDataSize(1024);
+ connection->setBackpressureThresholdCount(1024);
metrics.updateConnection(connection.get());
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
index 222346764..52f9238f1 100644
--- a/libminifi/test/unit/ResponseNodeLoaderTests.cpp
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -58,7 +58,7 @@ class ResponseNodeLoaderTestFixture {
void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
- connection->setRelationship({relationship_name, "d"});
+ connection->addRelationship({relationship_name, "d"});
connection->setDestinationUUID(src_uuid);
connection->setSourceUUID(dst_uuid);
root_->addConnection(std::move(connection));