You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/03/08 14:54:27 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new c8c4482  MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only
c8c4482 is described below

commit c8c448240690e2398534c99b101e7d536a367cb7
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Thu Feb 18 16:28:11 2021 +0100

    MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only
    
    * make mutex_ mutable and mark const methods const
    * remove C-style (void) from function declarations
    * remove pointless comments
    * add a missing virtual-override pair on isYield()
    * add a missing 'explicit'
    * remove the declaration of unimplemented method getNextIncomingConnection()
    
    * Test that the processor detects correctly whether it has any work to do
    * Document the current behavior of Connection::poll(), to be fixed later
    
    * Use a custom queue which is aware of penalty expirations
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1013
---
 .../tests/unit/ProcessorTests.cpp                  |  63 ++++++++
 libminifi/include/Connection.h                     |   6 +-
 libminifi/include/core/Processor.h                 |  51 +++----
 libminifi/include/utils/FlowFileQueue.h            |  58 ++++++++
 libminifi/src/Connection.cpp                       |  22 +--
 libminifi/src/SchedulingAgent.cpp                  |   2 +-
 libminifi/src/core/Processor.cpp                   |  19 +--
 libminifi/src/utils/FlowFileQueue.cpp              | 103 +++++++++++++
 libminifi/test/unit/ConnectionTests.cpp            |  88 +++++++++++
 libminifi/test/unit/FlowFileQueueTests.cpp         | 161 +++++++++++++++++++++
 libminifi/test/unit/MockClasses.h                  |   2 +-
 11 files changed, 500 insertions(+), 75 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 5d34024..1ebcd39 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -614,3 +614,66 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
 TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
+
+TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {  // checks Processor::isWorkAvailable() against an incoming connection holding penalized and/or non-penalized flow files
+  LogTestController::getInstance().setDebug<core::Processor>();
+
+  const auto repo = std::make_shared<TestRepository>();
+  const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
+  const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+  incoming_connection->addRelationship(core::Relationship{"success", ""});
+  incoming_connection->setDestinationUUID(processor->getUUID());  // the connection's destination is the processor under test
+  processor->addConnection(incoming_connection);
+  processor->initialize();
+
+  const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+  const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
+  const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
+  const auto session = session_factory->createSession();  // only used to create and penalize flow files below
+
+  SECTION("Initially, the queue is empty, so there is no work available") {
+    REQUIRE_FALSE(processor->isWorkAvailable());
+  }
+
+  SECTION("When a non-penalized flow file is queued, there is work available") {
+    const auto flow_file = session->create();
+    incoming_connection->put(flow_file);  // the flow file is put on the connection directly
+
+    REQUIRE(processor->isWorkAvailable());
+  }
+
+  SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
+    const auto flow_file = session->create();
+    session->penalize(flow_file);  // penalize before put(): the queue sorts flow files by their penalty state at push time
+    incoming_connection->put(flow_file);
+
+    REQUIRE_FALSE(processor->isWorkAvailable());
+  }
+
+  SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
+    const auto normal_flow_file = session->create();
+    incoming_connection->put(normal_flow_file);
+
+    const auto penalized_flow_file = session->create();
+    session->penalize(penalized_flow_file);
+    incoming_connection->put(penalized_flow_file);
+
+    REQUIRE(processor->isWorkAvailable());
+  }
+
+  SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
+    processor->setPenalizationPeriodMsec(10);  // short penalty, so that it can expire within the 1 second wait below
+
+    const auto flow_file = session->create();
+    session->penalize(flow_file);
+    incoming_connection->put(flow_file);
+
+    REQUIRE_FALSE(processor->isWorkAvailable());
+    const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };  // captures a copy of the shared_ptr
+    REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));  // presumably re-checks the predicate every 10 ms for up to 1 s — confirm in IntegrationTestUtils.h
+    REQUIRE(processor->isWorkAvailable());
+  }
+}
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 3084d2e..2c988dc 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -24,7 +24,6 @@
 #include <set>
 #include <string>
 #include <vector>
-#include <queue>
 #include <map>
 #include <mutex>
 #include <atomic>
@@ -35,6 +34,7 @@
 #include "core/Relationship.h"
 #include "core/FlowFile.h"
 #include "core/Repository.h"
+#include "utils/FlowFileQueue.h"
 
 namespace org {
 namespace apache {
@@ -167,7 +167,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   void yield() override {}
 
   bool isWorkAvailable() override {
-    return !isEmpty();
+    return queue_.canBePopped();
   }
 
   bool isRunning() override {
@@ -203,7 +203,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   // Queued data size
   std::atomic<uint64_t> queued_data_size_;
   // Queue for the Flow File
-  std::queue<std::shared_ptr<core::FlowFile>> queue_;
+  utils::FlowFileQueue queue_;
   // flow repository
   // Logger
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index b1fb982..3059b12 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -64,15 +64,8 @@ namespace core {
 // Processor Class
 class Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-
   Processor(const std::string& name, const utils::Identifier &uuid);
-
-  Processor(const std::string& name); // NOLINT
-  // Destructor
+  explicit Processor(const std::string& name);
   virtual ~Processor() {
     notifyStop();
   }
@@ -81,7 +74,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   // Set Processor Scheduled State
   void setScheduledState(ScheduledState state);
   // Get Processor Scheduled State
-  ScheduledState getScheduledState(void) {
+  ScheduledState getScheduledState() const {
     return state_;
   }
   // Set Processor Scheduling Strategy
@@ -89,7 +82,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     strategy_ = strategy;
   }
   // Get Processor Scheduling Strategy
-  SchedulingStrategy getSchedulingStrategy(void) {
+  SchedulingStrategy getSchedulingStrategy() const {
     return strategy_;
   }
   // Set Processor Loss Tolerant
@@ -97,7 +90,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     loss_tolerant_ = lossTolerant;
   }
   // Get Processor Loss Tolerant
-  bool getlossTolerant(void) {
+  bool getlossTolerant() const {
     return loss_tolerant_;
   }
   // Set Processor Scheduling Period in Nano Second
@@ -108,11 +101,10 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   scheduling_period_nano_ = period > minPeriod ? period : minPeriod;
   }
   // Get Processor Scheduling Period in Nano Second
-  uint64_t getSchedulingPeriodNano(void) {
+  uint64_t getSchedulingPeriodNano() const {
     return scheduling_period_nano_;
   }
 
-
   /**
    * Sets the cron period
    * @param period cron period.
@@ -129,13 +121,12 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     return cron_period_;
   }
 
-
   // Set Processor Run Duration in Nano Second
   void setRunDurationNano(uint64_t period) {
     run_duration_nano_ = period;
   }
   // Get Processor Run Duration in Nano Second
-  uint64_t getRunDurationNano(void) {
+  uint64_t getRunDurationNano() const {
     return (run_duration_nano_);
   }
   // Set Processor yield period in MilliSecond
@@ -143,7 +134,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     yield_period_msec_ = period;
   }
   // Get Processor yield period in MilliSecond
-  uint64_t getYieldPeriodMsec(void) {
+  uint64_t getYieldPeriodMsec() const {
     return (yield_period_msec_);
   }
   // Set Processor penalization period in MilliSecond
@@ -156,7 +147,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     max_concurrent_tasks_ = tasks;
   }
   // Get Processor Maximum Concurrent Tasks
-  uint8_t getMaxConcurrentTasks(void) {
+  uint8_t getMaxConcurrentTasks() const {
     return (max_concurrent_tasks_);
   }
   // Set Trigger when empty
@@ -164,23 +155,23 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     _triggerWhenEmpty = value;
   }
   // Get Trigger when empty
-  bool getTriggerWhenEmpty(void) {
+  bool getTriggerWhenEmpty() const {
     return (_triggerWhenEmpty);
   }
   // Get Active Task Counts
-  uint8_t getActiveTasks(void) {
+  uint8_t getActiveTasks() const {
     return (active_tasks_);
   }
   // Increment Active Task Counts
-  void incrementActiveTasks(void) {
+  void incrementActiveTasks() {
     active_tasks_++;
   }
   // decrement Active Task Counts
-  void decrementActiveTask(void) {
+  void decrementActiveTask() {
     if (active_tasks_ > 0)
       active_tasks_--;
   }
-  void clearActiveTask(void) {
+  void clearActiveTask() {
     active_tasks_ = 0;
   }
   // Yield based on the yield period
@@ -192,7 +183,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
   }
   // whether need be to yield
-  bool isYield() {
+  virtual bool isYield() {
     if (yield_expiration_ > 0)
       return (yield_expiration_ >= utils::timeutils::getTimeMillis());
     else
@@ -203,28 +194,20 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
     yield_expiration_ = 0;
   }
   // get yield time
-  uint64_t getYieldTime() {
+  uint64_t getYieldTime() const {
     uint64_t curTime = utils::timeutils::getTimeMillis();
     if (yield_expiration_ > curTime)
       return (yield_expiration_ - curTime);
     else
       return 0;
   }
-  // Whether flow file queued in incoming connection
-  bool flowFilesQueued();
   // Whether flow file queue full in any of the outgoin connection
-  bool flowFilesOutGoingFull();
+  bool flowFilesOutGoingFull() const;
 
-  // Add connection
   bool addConnection(std::shared_ptr<Connectable> connection);
-  // Remove connection
   void removeConnection(std::shared_ptr<Connectable> connection);
-  // Get the Next RoundRobin incoming connection
-  std::shared_ptr<Connection> getNextIncomingConnection();
-  // On Trigger
 
   virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory);
-
   void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory);
 
   bool canEdit() override {
@@ -295,7 +278,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
 
  private:
   // Mutex for protection
-  std::mutex mutex_;
+  mutable std::mutex mutex_;
   // Yield Expiration
   std::atomic<uint64_t> yield_expiration_;
 
diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h
new file mode 100644
index 0000000..4ec01b2
--- /dev/null
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+class FlowFileQueue {  // queue of flow files which keeps the flow files that were penalized when pushed apart from the rest
+ public:
+  using value_type = std::shared_ptr<core::FlowFile>;
+
+  value_type pop();  // returns a flow file whose penalty has expired if there is one, otherwise the oldest flow file pushed without a penalty; throws std::logic_error if there is neither
+  value_type forcePop();  // returns any flow file, even one with an unexpired penalty; throws std::logic_error if the queue is empty
+  void push(const value_type& element);  // stores the flow file in priority_queue_ if element->isPenalized(), otherwise in fifo_queue_
+  void push(value_type&& element);  // same as the overload above, but moves the pointer into the queue
+  bool canBePopped() const;  // true if pop() has something to return
+  bool empty() const;  // true if the queue holds no flow files at all, penalized or not
+  size_t size() const;  // total number of flow files held, penalized or not
+
+ private:
+  bool existsFlowFileWithExpiredPenalty() const;  // true if the top of priority_queue_ is no longer penalized
+
+  struct FlowFilePenaltyExpirationComparator {
+    bool operator()(const value_type& left, const value_type& right);  // NOTE(review): not const-qualified; comparators are conventionally const — consider adding const here and in the .cpp
+  };
+
+  std::queue<value_type> fifo_queue_;  // flow files which were not penalized when pushed, in push order
+  std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> priority_queue_;  // flow files which were penalized when pushed; top() has the earliest penalty expiration
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index ccae760..d237896 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -181,9 +181,8 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
+  while (queue_.canBePopped()) {
+    std::shared_ptr<core::FlowFile> item = queue_.pop();
     queued_data_size_ -= item->getSize();
 
     if (expired_duration_ > 0) {
@@ -193,26 +192,12 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
         expiredFlowRecords.insert(item);
         logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
       } else {
-        // Flow record not expired
-        if (item->isPenalized()) {
-          // Flow record was penalized
-          queue_.push(item);
-          queued_data_size_ += item->getSize();
-          break;
-        }
         std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
         item->setConnection(connectable);
         logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
         return item;
       }
     } else {
-      // Flow record not expired
-      if (item->isPenalized()) {
-        // Flow record was penalized
-        queue_.push(item);
-        queued_data_size_ += item->getSize();
-        break;
-      }
       std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
       item->setConnection(connectable);
       logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
@@ -227,8 +212,7 @@ void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);
 
   while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.front();
-    queue_.pop();
+    std::shared_ptr<core::FlowFile> item = queue_.forcePop();
     logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
       if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 107aa10..a6ef4b0 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -33,7 +33,7 @@ namespace minifi {
 
 bool SchedulingAgent::hasWorkToDo(const std::shared_ptr<core::Processor>& processor) {
   // Whether it has work to do
-  if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->flowFilesQueued())
+  if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->isWorkAvailable())
     return true;
   else
     return false;
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 6294a2f..56eb82a 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -208,22 +208,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
   }
 }
 
-bool Processor::flowFilesQueued() {
-  std::lock_guard<std::mutex> lock(mutex_);
-
-  if (_incomingConnections.size() == 0)
-    return false;
-
-  for (auto &&conn : _incomingConnections) {
-    std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
-    if (connection->getQueueSize() > 0)
-      return true;
-  }
-
-  return false;
-}
-
-bool Processor::flowFilesOutGoingFull() {
+bool Processor::flowFilesOutGoingFull() const {
   std::lock_guard<std::mutex> lock(mutex_);
 
   for (const auto& connection_pair : out_going_connections_) {
@@ -285,7 +270,7 @@ bool Processor::isWorkAvailable() {
   try {
     for (const auto &conn : _incomingConnections) {
       std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
-      if (connection->getQueueSize() > 0) {
+      if (connection->isWorkAvailable()) {
         hasWork = true;
         break;
       }
diff --git a/libminifi/src/utils/FlowFileQueue.cpp b/libminifi/src/utils/FlowFileQueue.cpp
new file mode 100644
index 0000000..8fe9e78
--- /dev/null
+++ b/libminifi/src/utils/FlowFileQueue.cpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/FlowFileQueue.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) {  // ordering used by priority_queue_; NOTE(review): not const-qualified, which comparators conventionally are
+  // this is operator< implemented using > so that top() is the element with the smallest key (earliest expiration)
+  // rather than the element with the largest key, which is the default for std::priority_queue
+  return left->getPenaltyExpiration() > right->getPenaltyExpiration();
+}
+
+FlowFileQueue::value_type FlowFileQueue::pop() {  // removes and returns the next flow file which is allowed to leave the queue
+  if (existsFlowFileWithExpiredPenalty()) {  // flow files whose penalty has expired are handed out before anything in the FIFO queue
+    value_type next_flow_file = priority_queue_.top();
+    priority_queue_.pop();
+    return next_flow_file;
+  }
+
+  if (!fifo_queue_.empty()) {  // otherwise the oldest flow file which was pushed without a penalty
+    value_type next_flow_file = fifo_queue_.front();
+    fifo_queue_.pop();
+    return next_flow_file;
+  }
+
+  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};  // only flow files with unexpired penalties are left, or none at all
+}
+
+/**
+ * Pops any flow file off the queue, whether it has an unexpired penalty or not; throws std::logic_error if the queue is empty.
+ */
+FlowFileQueue::value_type FlowFileQueue::forcePop() {
+  if (!fifo_queue_.empty()) {  // unlike pop(), the FIFO queue is emptied first here
+    value_type next_flow_file = fifo_queue_.front();
+    fifo_queue_.pop();
+    return next_flow_file;
+  }
+
+  if (!priority_queue_.empty()) {  // then the penalized flow files, earliest penalty expiration first, without checking isPenalized()
+    value_type next_flow_file = priority_queue_.top();
+    priority_queue_.pop();
+    return next_flow_file;
+  }
+
+  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+}
+
+void FlowFileQueue::push(const value_type& element) {  // adds a flow file, copying the shared pointer
+  if (element->isPenalized()) {  // the decision is made once, at push time
+    priority_queue_.push(element);
+  } else {
+    fifo_queue_.push(element);
+  }
+}
+
+void FlowFileQueue::push(value_type&& element) {  // adds a flow file, moving the shared pointer into the queue
+  if (element->isPenalized()) {  // element is inspected before it is moved from
+    priority_queue_.push(std::move(element));
+  } else {
+    fifo_queue_.push(std::move(element));
+  }
+}
+
+bool FlowFileQueue::canBePopped() const {  // true if pop() has something to return
+  return !fifo_queue_.empty() || existsFlowFileWithExpiredPenalty();  // i.e. a flow file pushed without a penalty, or one whose penalty has expired
+}
+
+bool FlowFileQueue::empty() const {  // true if no flow files are held; flow files with unexpired penalties count as content
+  return fifo_queue_.empty() && priority_queue_.empty();
+}
+
+size_t FlowFileQueue::size() const {  // number of flow files held, including those with unexpired penalties
+  return fifo_queue_.size() + priority_queue_.size();
+}
+
+bool FlowFileQueue::existsFlowFileWithExpiredPenalty() const {  // true if at least one flow file in priority_queue_ is no longer penalized
+  return !priority_queue_.empty() && !priority_queue_.top()->isPenalized();  // checking top() is enough: the comparator puts the earliest penalty expiration on top
+}
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
new file mode 100644
index 0000000..53e7ea8
--- /dev/null
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Connection.h"
+
+#include "../TestBase.h"
+
+TEST_CASE("Connection::poll() works correctly", "[poll]") {  // pins down what poll() returns for empty, normal, penalized and expired flow files
+  const auto flow_repo = std::make_shared<TestRepository>();
+  const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const auto id_generator = utils::IdGenerator::getIdGenerator();
+  utils::Identifier connection_id = id_generator->generate();
+  utils::Identifier src_id = id_generator->generate();
+  utils::Identifier dest_id = id_generator->generate();
+
+  const auto connection = std::make_shared<minifi::Connection>(flow_repo, content_repo, "test_connection", connection_id, src_id, dest_id);
+  std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;  // out parameter of poll()
+
+  SECTION("when called on an empty Connection, poll() returns nullptr") {
+    SECTION("without expiration duration") {}  // Catch2 runs the enclosing section once per nested section, so the checks below run with and without an expiration duration
+    SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+    REQUIRE(nullptr == connection->poll(expired_flow_files));
+  }
+
+  SECTION("when called on a connection with a single flow file, poll() returns the flow file") {
+    SECTION("without expiration duration") {}
+    SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+    const auto flow_file = std::make_shared<core::FlowFile>();
+    connection->put(flow_file);
+    REQUIRE(flow_file == connection->poll(expired_flow_files));
+    REQUIRE(nullptr == connection->poll(expired_flow_files));  // the first poll() removed the flow file from the queue
+  }
+
+  SECTION("when called on a connection with a single penalized flow file, poll() returns nullptr") {
+    SECTION("without expiration duration") {}
+    SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+    const auto flow_file = std::make_shared<core::FlowFile>();
+    const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};  // the penalty stays in force for the whole test
+    const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
+    flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+    connection->put(flow_file);
+    REQUIRE(nullptr == connection->poll(expired_flow_files));
+  }
+
+  SECTION("when called on a connection with a single expired flow file, poll() returns nullptr and returns the expired flow file in the out parameter") {
+    const auto flow_file = std::make_shared<core::FlowFile>();
+    connection->setFlowExpirationDuration(1);  // 1 millisecond
+    connection->put(flow_file);
+    std::this_thread::sleep_for(std::chrono::milliseconds{2});  // longer than the expiration duration set above
+    REQUIRE(nullptr == connection->poll(expired_flow_files));
+    REQUIRE(std::set<std::shared_ptr<core::FlowFile>>{flow_file} == expired_flow_files);
+  }
+
+  SECTION("when there is a non-penalized flow file followed by a penalized flow file, poll() returns the non-penalized flow file") {
+    SECTION("without expiration duration") {}
+    SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+    const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+    const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
+    const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
+    penalized_flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+    connection->put(penalized_flow_file);  // NOTE(review): the penalized flow file is queued first here, the opposite of the order given in the section title
+
+    const auto flow_file = std::make_shared<core::FlowFile>();
+    connection->put(flow_file);
+
+    REQUIRE(flow_file == connection->poll(expired_flow_files));
+    REQUIRE(nullptr == connection->poll(expired_flow_files));  // only the penalized flow file is left
+  }
+}
diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp b/libminifi/test/unit/FlowFileQueueTests.cpp
new file mode 100644
index 0000000..d64169d
--- /dev/null
+++ b/libminifi/test/unit/FlowFileQueueTests.cpp
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FlowFileQueue.h"
+
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+
+TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") {  // covers every observer and both pop variants on a default-constructed queue
+  utils::FlowFileQueue queue;
+
+  REQUIRE(queue.empty());
+  REQUIRE(queue.size() == 0);
+  REQUIRE_FALSE(queue.canBePopped());
+  REQUIRE_THROWS(queue.pop());  // pop() throws std::logic_error when there is nothing to return
+  REQUIRE_THROWS(queue.forcePop());  // forcePop() throws std::logic_error on an empty queue
+}
+
+TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can pop it", "[FlowFileQueue][pop]") {  // simplest round trip: one push, one pop
+  utils::FlowFileQueue queue;
+  const auto flow_file = std::make_shared<core::FlowFile>();
+  queue.push(flow_file);  // const shared_ptr, so this uses the push(const value_type&) overload
+
+  REQUIRE_FALSE(queue.empty());
+  REQUIRE(queue.size() == 1);
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file);
+}
+
+TEST_CASE("A flow file can be moved into the FlowFileQueue", "[FlowFileQueue][pop]") {  // exercises the push(value_type&&) overload on both internal queues
+  utils::FlowFileQueue queue;
+
+  auto penalized_flow_file = std::make_shared<core::FlowFile>();
+  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 100);  // penalty expires 100 ms from now
+  queue.push(std::move(penalized_flow_file));
+
+  queue.push(std::make_shared<core::FlowFile>());  // a temporary also binds to the rvalue overload
+
+  REQUIRE_FALSE(queue.empty());
+  REQUIRE(queue.size() == 2);
+}
+
+TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them in FIFO order", "[FlowFileQueue][pop]") {  // none of the flow files has a penalty expiration set
+  utils::FlowFileQueue queue;
+  const auto flow_file_1 = std::make_shared<core::FlowFile>();
+  queue.push(flow_file_1);
+  const auto flow_file_2 = std::make_shared<core::FlowFile>();
+  queue.push(flow_file_2);
+  const auto flow_file_3 = std::make_shared<core::FlowFile>();
+  queue.push(flow_file_3);
+
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_1);
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_2);
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_3);
+  REQUIRE_FALSE(queue.canBePopped());  // all three have been popped
+}
+
+namespace {
+
+class PenaltyHasExpired {  // predicate passed to utils::verifyEventHappenedInPollTime(): true once the flow file is no longer penalized
+ public:
+  explicit PenaltyHasExpired(const std::shared_ptr<core::FlowFile>& flow_file) : flow_file_(flow_file) {}
+  bool operator()() const { return !flow_file_->isPenalized(); }  // const: evaluating the predicate does not change the functor itself
+
+ private:
+  std::shared_ptr<core::FlowFile> flow_file_;  // the flow file whose penalty is being watched
+};
+
+}  // namespace
+
+TEST_CASE("Penalized flow files are popped from the FlowFileQueue in the order their penalties expire", "[FlowFileQueue][pop]") {  // push order differs from expiration order on purpose
+  utils::FlowFileQueue queue;
+  const auto now = utils::timeutils::getTimeMillis();
+  const auto flow_file_1 = std::make_shared<core::FlowFile>();
+  flow_file_1->setPenaltyExpiration(now + 70);  // third to expire
+  queue.push(flow_file_1);
+  const auto flow_file_2 = std::make_shared<core::FlowFile>();
+  flow_file_2->setPenaltyExpiration(now + 50);  // first to expire
+  queue.push(flow_file_2);
+  const auto flow_file_3 = std::make_shared<core::FlowFile>();
+  flow_file_3->setPenaltyExpiration(now + 80);  // last to expire
+  queue.push(flow_file_3);
+  const auto flow_file_4 = std::make_shared<core::FlowFile>();
+  flow_file_4->setPenaltyExpiration(now + 60);  // second to expire
+  queue.push(flow_file_4);
+
+  REQUIRE_FALSE(queue.canBePopped());  // expects that fewer than 50 ms have passed since `now` was taken
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_2}, std::chrono::milliseconds{10}));
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_2);
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_4}, std::chrono::milliseconds{10}));
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_4);
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_1}, std::chrono::milliseconds{10}));
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_1);
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_3}, std::chrono::milliseconds{10}));
+  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.pop() == flow_file_3);
+
+  REQUIRE_FALSE(queue.canBePopped());  // the queue is empty again
+}
+
+TEST_CASE("If a penalized then a non-penalized flow file is added to the FlowFileQueue, pop() returns the correct one", "[FlowFileQueue][pop]") {  // which one is "correct" depends on whether the penalty has expired yet
+  utils::FlowFileQueue queue;
+  const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);  // penalty expires 10 ms from now
+  queue.push(penalized_flow_file);
+  const auto flow_file = std::make_shared<core::FlowFile>();
+  queue.push(flow_file);
+
+  SECTION("Try popping right away") {
+    REQUIRE(queue.canBePopped());
+    REQUIRE(queue.pop() == flow_file);  // expects the 10 ms penalty to be still in force, so the penalized flow file is skipped
+    REQUIRE_FALSE(queue.canBePopped());
+  }
+
+  SECTION("Wait until the penalty expires, then pop") {
+    REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{penalized_flow_file}, std::chrono::milliseconds{10}));
+
+    REQUIRE(queue.canBePopped());
+    REQUIRE(queue.pop() == penalized_flow_file);  // a flow file with an expired penalty is returned before the FIFO ones
+    REQUIRE(queue.canBePopped());
+    REQUIRE(queue.pop() == flow_file);
+    REQUIRE_FALSE(queue.canBePopped());
+  }
+}
+
+TEST_CASE("Force pop on FlowFileQueue returns the flow files, whether penalized or not", "[FlowFileQueue][forcePop]") {  // forcePop() must drain the queue without waiting for penalties
+  utils::FlowFileQueue queue;
+  const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+  queue.push(penalized_flow_file);
+  const auto flow_file = std::make_shared<core::FlowFile>();
+  queue.push(flow_file);
+
+  REQUIRE_FALSE(queue.empty());
+  REQUIRE(queue.forcePop() == flow_file);  // forcePop() empties the FIFO queue first
+  REQUIRE(queue.forcePop() == penalized_flow_file);  // then the penalized flow files
+  REQUIRE(queue.empty());
+}
diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h
index ac64c20..c55851d 100644
--- a/libminifi/test/unit/MockClasses.h
+++ b/libminifi/test/unit/MockClasses.h
@@ -115,7 +115,7 @@ class MockProcessor : public core::Processor {
     }
   }
 
-  bool isYield() {
+  bool isYield() override {
     return false;
   }