You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/28 12:32:54 UTC

[nifi-minifi-cpp] 02/02: MINIFICPP-1887 Add default connection size limits

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f27e349b508c502218ca702b78fad2ef931f4f83
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Feb 28 13:11:23 2023 +0100

    MINIFICPP-1887 Add default connection size limits
    
    Default backpressure limits per connection: 2000 flow files / 100 MB data size
    
    Closes #1501
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../tests/unit/FlowJsonTests.cpp                   |   4 +-
 .../tests/unit/ProcessorTests.cpp                  |  10 +-
 .../tests/unit/YamlConnectionParserTest.cpp        |   4 +-
 libminifi/include/Connection.h                     | 110 ++++++++-------------
 libminifi/include/core/state/ConnectionStore.h     |   4 +-
 .../include/core/state/nodes/FlowInformation.h     |   4 +-
 libminifi/include/core/state/nodes/QueueMetrics.h  |   4 +-
 libminifi/src/Connection.cpp                       |  19 ++--
 libminifi/src/core/ProcessSession.cpp              |   2 +-
 libminifi/src/core/Processor.cpp                   |   8 +-
 .../src/core/flow/StructuredConfiguration.cpp      |   4 +-
 .../src/core/flow/StructuredConnectionParser.cpp   |   4 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   4 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   2 +-
 libminifi/test/unit/ConnectionTests.cpp            |  41 ++++++++
 libminifi/test/unit/MetricsTests.cpp               |   4 +-
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    |   2 +-
 17 files changed, 119 insertions(+), 111 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
index b7d1a47f8..495965122 100644
--- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -150,8 +150,8 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
   REQUIRE(connection1->getSource() == proc);
   REQUIRE(connection1->getDestination() == funnel);
   REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}}));
-  REQUIRE(connection1->getMaxQueueSize() == 7);
-  REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB);
+  REQUIRE(connection1->getBackpressureThresholdCount() == 7);
+  REQUIRE(connection1->getBackpressureThresholdDataSize() == 11_KiB);
   REQUIRE(13s == connection1->getFlowExpirationDuration());
 
   auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008");
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index b7afc647c..c3b43856a 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -245,7 +245,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
 
   std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "GFF2Connection");
-  connection->setMaxQueueSize(5);
+  connection->setBackpressureThresholdCount(5);
   connection->addRelationship(core::Relationship("success", "description"));
 
 
@@ -270,8 +270,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
 
   auto session = std::make_shared<core::ProcessSession>(context);
 
-  REQUIRE(session->outgoingConnectionsFull("success") == false);
-  REQUIRE(connection->isFull() == false);
+  CHECK_FALSE(session->outgoingConnectionsFull("success"));
+  CHECK_FALSE(connection->backpressureThresholdReached());
 
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
@@ -279,8 +279,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") {
 
   session->commit();
 
-  REQUIRE(connection->isFull());
-  REQUIRE(session->outgoingConnectionsFull("success"));
+  CHECK(connection->backpressureThresholdReached());
+  CHECK(session->outgoingConnectionsFull("success"));
 }
 
 TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index 44d346ed4..0444c71e6 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -183,8 +183,8 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
             "drop empty: \n"});
         flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
         StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
-        CHECK(0 == yaml_connection_parser.getWorkQueueSize());
-        CHECK(0 == yaml_connection_parser.getWorkQueueDataSize());
+        CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT == yaml_connection_parser.getWorkQueueSize());
+        CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE == yaml_connection_parser.getWorkQueueDataSize());
         CHECK(0 == yaml_connection_parser.getSwapThreshold());
         CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
         CHECK(0 == yaml_connection_parser.getDropEmpty());
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 81e1a7a25..154958ba6 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -27,6 +27,7 @@
 #include <mutex>
 #include <atomic>
 #include <algorithm>
+#include <utility>
 #include "core/Core.h"
 #include "core/Connectable.h"
 #include "core/logging/Logger.h"
@@ -52,86 +53,81 @@ class Connection : public core::Connectable {
                       const utils::Identifier &srcUUID, const utils::Identifier &destUUID);
   explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager,
                       std::string name, const utils::Identifier& uuid);
-  // Destructor
   ~Connection() override = default;
 
-  // Set Source Processor UUID
+  Connection(const Connection &parent) = delete;
+  Connection &operator=(const Connection &parent) = delete;
+
+  static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000;
+  static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB;
+
   void setSourceUUID(const utils::Identifier &uuid) {
     src_uuid_ = uuid;
   }
-  // Set Destination Processor UUID
+
   void setDestinationUUID(const utils::Identifier &uuid) {
     dest_uuid_ = uuid;
   }
-  // Get Source Processor UUID
+
   utils::Identifier getSourceUUID() const {
     return src_uuid_;
   }
-  // Get Destination Processor UUID
+
   utils::Identifier getDestinationUUID() const {
     return dest_uuid_;
   }
 
-  // Set Connection Source Processor
   void setSource(core::Connectable* source) {
     source_connectable_ = source;
   }
-  // ! Get Connection Source Processor
+
   core::Connectable* getSource() const {
     return source_connectable_;
   }
-  // Set Connection Destination Processor
+
   void setDestination(core::Connectable* dest) {
     dest_connectable_ = dest;
   }
-  // ! Get Connection Destination Processor
-  core::Connectable* getDestination() {
-    return dest_connectable_;
-  }
 
-  /**
-   * Deprecated function
-   * Please use addRelationship.
-   */
-  void setRelationship(core::Relationship relationship) {
-    relationships_.insert(relationship);
+  core::Connectable* getDestination() const {
+    return dest_connectable_;
   }
 
-  // Set Connection relationship
   void addRelationship(core::Relationship relationship) {
-    relationships_.insert(relationship);
+    relationships_.insert(std::move(relationship));
   }
-  // ! Get Connection relationship
+
   const std::set<core::Relationship> &getRelationships() const {
     return relationships_;
   }
-  // Set Max Queue Size
-  void setMaxQueueSize(uint64_t size) {
-    max_queue_size_ = size;
+
+  void setBackpressureThresholdCount(uint64_t size) {
+    backpressure_threshold_count_ = size;
   }
-  // Get Max Queue Size
-  uint64_t getMaxQueueSize() {
-    return max_queue_size_;
+
+  uint64_t getBackpressureThresholdCount() const {
+    return backpressure_threshold_count_;
   }
-  // Set Max Queue Data Size
-  void setMaxQueueDataSize(uint64_t size) {
-    max_data_queue_size_ = size;
+
+  void setBackpressureThresholdDataSize(uint64_t size) {
+    backpressure_threshold_data_size_ = size;
+  }
+
+  uint64_t getBackpressureThresholdDataSize() const {
+    return backpressure_threshold_data_size_;
   }
+
   void setSwapThreshold(uint64_t size) {
     queue_.setTargetSize(size);
     queue_.setMinSize(size / 2);
     queue_.setMaxSize(size * 3 / 2);
   }
-  // Get Max Queue Data Size
-  uint64_t getMaxQueueDataSize() {
-    return max_data_queue_size_;
-  }
-  // Set Flow expiration duration in millisecond
+
   void setFlowExpirationDuration(std::chrono::milliseconds duration) {
     expired_duration_ = duration;
   }
-  // Get Flow expiration duration in millisecond
-  std::chrono::milliseconds getFlowExpirationDuration() {
+
+  std::chrono::milliseconds getFlowExpirationDuration() const {
     return expired_duration_;
   }
 
@@ -143,28 +139,25 @@ class Connection : public core::Connectable {
     return drop_empty_;
   }
 
-  // Check whether the queue is empty
   bool isEmpty() const;
-  // Check whether the queue is full to apply back pressure
-  bool isFull() const;
-  // Get queue size
-  uint64_t getQueueSize() {
+
+  bool backpressureThresholdReached() const;
+
+  uint64_t getQueueSize() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return queue_.size();
   }
-  // Get queue data size
+
   uint64_t getQueueDataSize() {
     return queued_data_size_;
   }
 
-  // Put the flow file into queue
   void put(const std::shared_ptr<core::FlowFile>& flow) override;
 
-  // Put multiple flowfiles into the queue
   void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows);
-  // Poll the flow file from queue, the expired flow file record also being returned
+
   std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
-  // Drain the flow records
+
   void drain(bool delete_permanently);
 
   void yield() override {}
@@ -179,41 +172,22 @@ class Connection : public core::Connectable {
   }
 
  protected:
-  // Source Processor UUID
   utils::Identifier src_uuid_;
-  // Destination Processor UUID
   utils::Identifier dest_uuid_;
-  // Relationship for this connection
   std::set<core::Relationship> relationships_;
-  // Source Processor (ProcessNode/Port)
   core::Connectable* source_connectable_ = nullptr;
-  // Destination Processor (ProcessNode/Port)
   core::Connectable* dest_connectable_ = nullptr;
-  // Max queue size to apply back pressure
-  std::atomic<uint64_t> max_queue_size_ = 0;
-  // Max queue data size to apply back pressure
-  std::atomic<uint64_t> max_data_queue_size_ = 0;
-  // Flow File Expiration Duration in= MilliSeconds
+  std::atomic<uint64_t> backpressure_threshold_count_ = DEFAULT_BACKPRESSURE_THRESHOLD_COUNT;
+  std::atomic<uint64_t> backpressure_threshold_data_size_ = DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE;
   std::atomic<std::chrono::milliseconds> expired_duration_ = std::chrono::milliseconds(0);
-  // flow file repository
   std::shared_ptr<core::Repository> flow_repository_;
-  // content repository reference.
   std::shared_ptr<core::ContentRepository> content_repo_;
 
  private:
   bool drop_empty_ = false;
-  // Mutex for protection
   mutable std::mutex mutex_;
-  // Queued data size
   std::atomic<uint64_t> queued_data_size_ = 0;
-  // Queue for the Flow File
   utils::FlowFileQueue queue_;
-  // flow repository
-  // Logger
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Connection>::getLogger();
-  // Prevent default copy constructor and assignment operation
-  // Only support pass by reference or pointer
-  Connection(const Connection &parent);
-  Connection &operator=(const Connection &parent);
 };
 }  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/state/ConnectionStore.h b/libminifi/include/core/state/ConnectionStore.h
index cd7ed2082..a5cfa4eff 100644
--- a/libminifi/include/core/state/ConnectionStore.h
+++ b/libminifi/include/core/state/ConnectionStore.h
@@ -40,11 +40,11 @@ class ConnectionStore {
     for (const auto& [_, connection] : connections_) {
       metrics.push_back({"queue_data_size", static_cast<double>(connection->getQueueDataSize()),
         {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
-      metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getMaxQueueDataSize()),
+      metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getBackpressureThresholdDataSize()),
         {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
       metrics.push_back({"queue_size", static_cast<double>(connection->getQueueSize()),
         {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
-      metrics.push_back({"queue_size_max", static_cast<double>(connection->getMaxQueueSize()),
+      metrics.push_back({"queue_size_max", static_cast<double>(connection->getBackpressureThresholdCount()),
         {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}});
     }
 
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index e08eb6684..894d938a0 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -198,7 +198,7 @@ class FlowInformation : public FlowMonitor {
 
         SerializedResponseNode queuesizemax;
         queuesizemax.name = "sizeMax";
-        queuesizemax.value = queue.second->getMaxQueueSize();
+        queuesizemax.value = queue.second->getBackpressureThresholdCount();
 
         SerializedResponseNode datasize;
         datasize.name = "dataSize";
@@ -206,7 +206,7 @@ class FlowInformation : public FlowMonitor {
         SerializedResponseNode datasizemax;
 
         datasizemax.name = "dataSizeMax";
-        datasizemax.value = queue.second->getMaxQueueDataSize();
+        datasizemax.value = queue.second->getBackpressureThresholdDataSize();
 
         repoNode.children.push_back(queuesize);
         repoNode.children.push_back(queuesizemax);
diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h
index f89d34da6..3b6a832d6 100644
--- a/libminifi/include/core/state/nodes/QueueMetrics.h
+++ b/libminifi/include/core/state/nodes/QueueMetrics.h
@@ -64,7 +64,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore {
 
       SerializedResponseNode datasizemax;
       datasizemax.name = "datasizemax";
-      datasizemax.value = std::to_string(connection->getMaxQueueDataSize());
+      datasizemax.value = std::to_string(connection->getBackpressureThresholdDataSize());
 
       SerializedResponseNode queuesize;
       queuesize.name = "queued";
@@ -72,7 +72,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore {
 
       SerializedResponseNode queuesizemax;
       queuesizemax.name = "queuedmax";
-      queuesizemax.value = std::to_string(connection->getMaxQueueSize());
+      queuesizemax.value = std::to_string(connection->getBackpressureThresholdCount());
 
       parent.children.push_back(datasize);
       parent.children.push_back(datasizemax);
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index cddd0463c..7c3145c39 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -18,20 +18,15 @@
  * limitations under the License.
  */
 #include "Connection.h"
-#include <ctime>
 #include <vector>
-#include <queue>
 #include <memory>
 #include <string>
-#include <map>
 #include <set>
 #include <chrono>
 #include <thread>
-#include <iostream>
 #include <list>
 #include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/logging/LoggerConfiguration.h"
+#include "core/Connectable.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -85,17 +80,15 @@ bool Connection::isEmpty() const {
   return queue_.empty();
 }
 
-bool Connection::isFull() const {
+bool Connection::backpressureThresholdReached() const {
   std::lock_guard<std::mutex> lock(mutex_);
+  auto backpressure_threshold_count = backpressure_threshold_count_.load();
+  auto backpressure_threshold_data_size = backpressure_threshold_data_size_.load();
 
-  if (max_queue_size_ <= 0 && max_data_queue_size_ <= 0)
-    // No back pressure setting
-    return false;
-
-  if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_)
+  if (backpressure_threshold_count != 0 && queue_.size() >= backpressure_threshold_count)
     return true;
 
-  if (max_data_queue_size_ > 0 && queued_data_size_ >= max_data_queue_size_)
+  if (backpressure_threshold_data_size != 0 && queued_data_size_ >= backpressure_threshold_data_size)
     return true;
 
   return false;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1d06b8da7..8bf30dc1c 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -1130,7 +1130,7 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
   Connection * connection = nullptr;
   for (const auto conn : connections) {
     connection = dynamic_cast<Connection*>(conn);
-    if (connection && connection->isFull()) {
+    if (connection && connection->backpressureThresholdReached()) {
       return true;
     }
   }
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ffc186a96..fdb15f4a3 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -172,7 +172,7 @@ bool Processor::flowFilesOutGoingFull() const {
     std::set<Connectable*> existedConnection = connection_pair.second;
     const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const Connectable* conn) {
       auto connection = dynamic_cast<const Connection*>(conn);
-      return connection && connection->isFull();
+      return connection && connection->backpressureThresholdReached();
     });
     if (has_full_connection) { return true; }
   }
@@ -312,12 +312,12 @@ bool Processor::isThrottledByBackpressure() const {
   bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) {
     return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) {
       auto connection = dynamic_cast<Connection*>(connectable);
-      return connection && connection->isFull();
+      return connection && connection->backpressureThresholdReached();
     });
   });
   bool isForcedByIncomingCycle = ranges::any_of(incoming_connections_, [](auto& connectable) {
     auto connection = dynamic_cast<Connection*>(connectable);
-    return connection && partOfCycle(connection) && connection->isFull();
+    return connection && partOfCycle(connection) && connection->backpressureThresholdReached();
   });
   return isThrottledByOutgoing && !isForcedByIncomingCycle;
 }
@@ -333,7 +333,7 @@ Connectable* Processor::pickIncomingConnection() {
     if (!connection) {
       continue;
     }
-    if (partOfCycle(connection) && connection->isFull()) {
+    if (partOfCycle(connection) && connection->backpressureThresholdReached()) {
       return inConn;
     }
   } while (incoming_connections_Iter != beginIt);
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 02624a4dc..847258399 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -554,8 +554,8 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
     logger_->log_debug("Created connection with UUID %s and name %s", id, name);
     const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_);
     connectionParser.configureConnectionSourceRelationships(*connection);
-    connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
-    connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
+    connection->setBackpressureThresholdCount(connectionParser.getWorkQueueSize());
+    connection->setBackpressureThresholdDataSize(connectionParser.getWorkQueueDataSize());
     connection->setSwapThreshold(connectionParser.getSwapThreshold());
     connection->setSourceUUID(connectionParser.getSourceUUID());
     connection->setDestinationUUID(connectionParser.getDestinationUUID());
diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
index a6521884b..9ac23ac89 100644
--- a/libminifi/src/core/flow/StructuredConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -76,7 +76,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const {
     }
     logger_->log_error("Invalid max queue size value: %s.", max_work_queue_str);
   }
-  return 0;
+  return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT;
 }
 
 uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
@@ -90,7 +90,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
     }
     logger_->log_error("Invalid max queue data size value: %s.", max_work_queue_str);
   }
-  return 0;
+  return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE;
 }
 
 uint64_t StructuredConnectionParser::getSwapThreshold() const {
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index e417ae417..748775fcd 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -77,7 +77,7 @@ struct TestFlow{
     auto input = std::make_unique<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
     {
       input_ = input.get();
-      input->setRelationship({"input", "d"});
+      input->addRelationship({"input", "d"});
       input->setDestinationUUID(mainProcUUID());
       input->setSourceUUID(inputProcUUID());
       inputProcessor->addConnection(input.get());
@@ -87,7 +87,7 @@ struct TestFlow{
     auto output = std::make_unique<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
     {
       output_ = output.get();
-      output->setRelationship(relationshipToOutput);
+      output->addRelationship(relationshipToOutput);
       output->setSourceUUID(mainProcUUID());
     }
 
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index ef4d073bf..7a646b545 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -274,7 +274,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   core::Relationship inputRel{"Input", "dummy"};
   auto input = std::make_unique<minifi::Connection>(ff_repository, content_repo, "Input");
-  input->setRelationship(inputRel);
+  input->addRelationship(inputRel);
 
   auto root = std::make_unique<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
   auto inputPtr = input.get();
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
index f018ce1f6..9a82a893f 100644
--- a/libminifi/test/unit/ConnectionTests.cpp
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -84,3 +84,44 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
     REQUIRE(nullptr == connection->poll(expired_flow_files));
   }
 }
+
+TEST_CASE("Connection backpressure tests", "[Connection]") {
+  const auto flow_repo = std::make_shared<TestRepository>();
+  const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const auto id_generator = utils::IdGenerator::getIdGenerator();
+  const auto connection = std::make_shared<minifi::Connection>(flow_repo, content_repo, "test_connection", id_generator->generate(), id_generator->generate(), id_generator->generate());
+
+  CHECK(connection->getBackpressureThresholdDataSize() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE);
+  CHECK(connection->getBackpressureThresholdCount() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT);
+
+  SECTION("The number of flowfiles can be limited") {
+    connection->setBackpressureThresholdCount(2);
+    CHECK_FALSE(connection->backpressureThresholdReached());
+    connection->put(std::make_shared<core::FlowFile>());
+    CHECK_FALSE(connection->backpressureThresholdReached());
+    connection->put(std::make_shared<core::FlowFile>());
+    CHECK(connection->backpressureThresholdReached());
+    connection->setBackpressureThresholdCount(0);
+    CHECK_FALSE(connection->backpressureThresholdReached());
+  }
+  SECTION("The size of the data can be limited") {
+    connection->setBackpressureThresholdDataSize(3_KB);
+    CHECK_FALSE(connection->backpressureThresholdReached());
+    {
+      auto flow_file = std::make_shared<core::FlowFile>();
+      flow_file->setSize(2_KB);
+      connection->put(flow_file);
+    }
+    CHECK_FALSE(connection->backpressureThresholdReached());
+    {
+      auto flow_file = std::make_shared<core::FlowFile>();
+      flow_file->setSize(2_KB);
+      connection->put(flow_file);
+    }
+    CHECK(connection->backpressureThresholdReached());
+    connection->setBackpressureThresholdDataSize(0);
+    CHECK_FALSE(connection->backpressureThresholdReached());
+  }
+}
diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp
index a73cecae5..972365fd4 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -53,8 +53,8 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
 
   auto connection = std::make_unique<minifi::Connection>(repo, content_repo, "testconnection");
 
-  connection->setMaxQueueDataSize(1024);
-  connection->setMaxQueueSize(1024);
+  connection->setBackpressureThresholdDataSize(1024);
+  connection->setBackpressureThresholdCount(1024);
 
   metrics.updateConnection(connection.get());
 
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
index 222346764..52f9238f1 100644
--- a/libminifi/test/unit/ResponseNodeLoaderTests.cpp
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -58,7 +58,7 @@ class ResponseNodeLoaderTestFixture {
 
   void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
     auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
-    connection->setRelationship({relationship_name, "d"});
+    connection->addRelationship({relationship_name, "d"});
     connection->setDestinationUUID(src_uuid);
     connection->setSourceUUID(dst_uuid);
     root_->addConnection(std::move(connection));