You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2021/03/16 09:18:37 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1487 Use a single priority queue in FlowFileQueue

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fc1a9dd  MINIFICPP-1487 Use a single priority queue in FlowFileQueue
fc1a9dd is described below

commit fc1a9ddf8c91686eb98d56569dcdc0e1b772451f
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Tue Mar 16 10:14:46 2021 +0100

    MINIFICPP-1487 Use a single priority queue in FlowFileQueue
    
    This closes #1027
    
    Co-authored-by: Márton Szász <sz...@gmail.com>
    Signed-off-by: Adam Debreceni <ad...@apache.org>
---
 .../tests/unit/ProcessorTests.cpp                  | 124 ++++++++++++++++-----
 .../tests/unit/RetryFlowFileTests.cpp              |   2 +-
 .../tests/unit/YamlConfigurationTests.cpp          |   4 +-
 libminifi/include/Connection.h                     |   2 +-
 libminifi/include/core/Connectable.h               |   7 +-
 libminifi/include/core/FlowFile.h                  |  18 ++-
 libminifi/include/core/Processor.h                 |   6 +-
 libminifi/include/core/ProcessorConfig.h           |   4 +-
 libminifi/include/core/ProcessorNode.h             |   5 +-
 .../include/core/state/nodes/SchedulingNodes.h     |   2 +-
 libminifi/include/utils/FlowFileQueue.h            |   8 +-
 libminifi/src/Connection.cpp                       |   4 +-
 libminifi/src/core/FlowFile.cpp                    |   4 +-
 libminifi/src/core/ProcessSession.cpp              |   6 +-
 libminifi/src/core/Processor.cpp                   |   4 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp      |   2 +-
 libminifi/src/utils/FlowFileQueue.cpp              |  63 +++--------
 libminifi/test/unit/ConnectionTests.cpp            |   8 +-
 libminifi/test/unit/FlowFileQueueTests.cpp         |  56 +++++-----
 19 files changed, 174 insertions(+), 155 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 1ebcd39..7ca8000 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -615,65 +615,131 @@ TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
 
-TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
+namespace {
+
+class ProcessorWithIncomingConnectionTest {
+ public:
+  ProcessorWithIncomingConnectionTest();
+  ~ProcessorWithIncomingConnectionTest();
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<minifi::Connection> incoming_connection_;
+  std::shared_ptr<core::ProcessSession> session_;
+};
+
+ProcessorWithIncomingConnectionTest::ProcessorWithIncomingConnectionTest() {
   LogTestController::getInstance().setDebug<core::Processor>();
 
   const auto repo = std::make_shared<TestRepository>();
   const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
-  const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
-  const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
-  incoming_connection->addRelationship(core::Relationship{"success", ""});
-  incoming_connection->setDestinationUUID(processor->getUUID());
-  processor->addConnection(incoming_connection);
-  processor->initialize();
+  processor_ = std::make_shared<processors::LogAttribute>("test_processor");
+  incoming_connection_ = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+  incoming_connection_->addRelationship(core::Relationship{"success", ""});
+  incoming_connection_->setDestinationUUID(processor_->getUUID());
+  processor_->addConnection(incoming_connection_);
+  processor_->initialize();
 
-  const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+  const auto processor_node = std::make_shared<core::ProcessorNode>(processor_);
   const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
   const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
-  const auto session = session_factory->createSession();
+  session_ = session_factory->createSession();
+}
 
+ProcessorWithIncomingConnectionTest::~ProcessorWithIncomingConnectionTest() {
+  LogTestController::getInstance().reset();
+}
+
+}  // namespace
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
   SECTION("Initially, the queue is empty, so there is no work available") {
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("When a non-penalized flow file is queued, there is work available") {
-    const auto flow_file = session->create();
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    incoming_connection_->put(flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
-    const auto normal_flow_file = session->create();
-    incoming_connection->put(normal_flow_file);
+    const auto normal_flow_file = session_->create();
+    incoming_connection_->put(normal_flow_file);
 
-    const auto penalized_flow_file = session->create();
-    session->penalize(penalized_flow_file);
-    incoming_connection->put(penalized_flow_file);
+    const auto penalized_flow_file = session_->create();
+    session_->penalize(penalized_flow_file);
+    incoming_connection_->put(penalized_flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
-    processor->setPenalizationPeriodMsec(10);
+    processor_->setPenalizationPeriod(std::chrono::milliseconds{10});
 
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
     const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };
     REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 }
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A failed and re-penalized flow file does not block the incoming queue of the Processor", "[penalize]") {
+  processor_->setPenalizationPeriod(std::chrono::milliseconds{100});
+  const auto penalized_flow_file = session_->create();
+  session_->penalize(penalized_flow_file);
+  incoming_connection_->put(penalized_flow_file);
+  const auto penalty_has_expired = [penalized_flow_file] { return !penalized_flow_file->isPenalized(); };
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+
+  const auto flow_file_1 = session_->create();
+  incoming_connection_->put(flow_file_1);
+  const auto flow_file_2 = session_->create();
+  incoming_connection_->put(flow_file_2);
+  const auto flow_file_3 = session_->create();
+  incoming_connection_->put(flow_file_3);
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+  const auto next_flow_file_1 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_1 == penalized_flow_file);
+
+  session_->penalize(penalized_flow_file);
+  incoming_connection_->put(penalized_flow_file);
+  std::this_thread::sleep_for(std::chrono::milliseconds{110});
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  const auto next_flow_file_2 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_2 != penalized_flow_file);
+  REQUIRE(next_flow_file_2 == flow_file_1);
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  const auto next_flow_file_3 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_3 != penalized_flow_file);
+  REQUIRE(next_flow_file_3 == flow_file_2);
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  const auto next_flow_file_4 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_4 != penalized_flow_file);
+  REQUIRE(next_flow_file_4 == flow_file_3);
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  const auto next_flow_file_5 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_5 == penalized_flow_file);
+}
diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
index e367ec8..8aa6641 100644
--- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
@@ -103,7 +103,7 @@ class RetryFlowFileTest {
     std::shared_ptr<core::Processor> putfile_on_failure          = plan_->addProcessor("PutFile", "putfile_on_failure", {success}, false);
     std::shared_ptr<core::Processor> log_attribute               = plan_->addProcessor("LogAttribute", "log", {success}, false);
 
-    retryflowfile->setPenalizationPeriodMsec(0);
+    retryflowfile->setPenalizationPeriod(std::chrono::milliseconds{0});
 
     plan_->addConnection(generate, success, update);
     plan_->addConnection(update, success, retryflowfile);
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 157fd03..b4b8f6b 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -151,7 +151,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
       core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
   REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
   REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
-  REQUIRE(30 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriodMsec());
+  REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
   REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
   REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
 
@@ -452,7 +452,7 @@ NiFi Properties Overrides: {}
   REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
   REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
   REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
-  REQUIRE(30 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriodMsec());
+  REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
   REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
   REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
 
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 2c988dc..e3e50ac 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -167,7 +167,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   void yield() override {}
 
   bool isWorkAvailable() override {
-    return queue_.canBePopped();
+    return queue_.isWorkAvailable();
   }
 
   bool isRunning() override {
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index f9b6e5e..cac2052 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -74,9 +74,8 @@ class Connectable : public CoreComponent {
   // Check whether the relationship is auto terminated
   bool isAutoTerminated(const Relationship &relationship);
 
-  // Get Processor penalization period in MilliSecond
-  uint64_t getPenalizationPeriodMsec(void) const {
-    return (_penalizationPeriodMsec);
+  std::chrono::milliseconds getPenalizationPeriod() const {
+    return penalization_period_;
   }
 
   /**
@@ -162,7 +161,7 @@ class Connectable : public CoreComponent {
   // must hold the relationship_mutex_ before calling this
   std::shared_ptr<Connectable> getNextIncomingConnectionImpl(const std::lock_guard<std::mutex>& relationship_mutex_lock);
   // Penalization Period in MilliSecond
-  std::atomic<uint64_t> _penalizationPeriodMsec;
+  std::atomic<std::chrono::milliseconds> penalization_period_;
 
   uint8_t max_concurrent_tasks_;
 
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index ca643ad..7b97fc7 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -205,16 +205,13 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
     offset_ = offset;
   }
 
-  /**
-   * Sets the penalty expiration
-   * @param penaltyExp new penalty expiration
-   */
-  void setPenaltyExpiration(const uint64_t penaltyExp) {
-    penaltyExpiration_ms_ = penaltyExp;
+  template<typename Rep, typename Period>
+  void penalize(std::chrono::duration<Rep, Period> duration) {
+    to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
   }
 
-  uint64_t getPenaltyExpiration() const {
-    return penaltyExpiration_ms_;
+  std::chrono::time_point<std::chrono::steady_clock> getPenaltyExpiration() const {
+    return to_be_processed_after_;
   }
 
   /**
@@ -223,9 +220,8 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
    */
   uint64_t getOffset() const;
 
-  // Check whether it is still being penalized
   bool isPenalized() const {
-    return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
+    return to_be_processed_after_ > std::chrono::steady_clock::now();
   }
 
   uint64_t getId() const {
@@ -271,7 +267,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
   // Offset to the content
   uint64_t offset_;
   // Penalty expiration
-  uint64_t penaltyExpiration_ms_;
+  std::chrono::time_point<std::chrono::steady_clock> to_be_processed_after_;
   // Attributes key/values pairs for the flow record
   AttributeMap attributes_;
   // Pointer to the associated content resource claim
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 3059b12..51851a5 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -137,9 +137,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   uint64_t getYieldPeriodMsec() const {
     return (yield_period_msec_);
   }
-  // Set Processor penalization period in MilliSecond
-  void setPenalizationPeriodMsec(uint64_t period) {
-    _penalizationPeriodMsec = period;
+
+  void setPenalizationPeriod(std::chrono::milliseconds period) {
+    penalization_period_ = period;
   }
 
   // Set Processor Maximum Concurrent Tasks
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index 35a7aec..927f9f3 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -35,10 +35,8 @@ namespace core {
 #define DEFAULT_SCHEDULING_PERIOD_MILLIS 1000
 #define DEFAULT_RUN_DURATION 0
 #define DEFAULT_MAX_CONCURRENT_TASKS 1
-#define DEFAULT_PENALIZATION_PERIOD 1
-// Default yield period in second
 #define DEFAULT_YIELD_PERIOD_SECONDS 1
-#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
 
 struct ProcessorConfig {
   std::string id;
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 786456c..6a35232 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -217,9 +217,8 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
     processor_->setUUID(uuid);
   }
 
-// Get Processor penalization period in MilliSecond
-  uint64_t getPenalizationPeriodMsec(void) {
-    return processor_->getPenalizationPeriodMsec();
+  std::chrono::milliseconds getPenalizationPeriod() {
+    return processor_->getPenalizationPeriod();
   }
 
   /**
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index ec86567..3ba4ab9 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -85,7 +85,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode penalizationPeriod;
     penalizationPeriod.name = "penalizationPeriodMillis";
-    penalizationPeriod.value = DEFAULT_PENALIZATION_PERIOD_SECONDS*1000;
+    penalizationPeriod.value = std::chrono::milliseconds{core::DEFAULT_PENALIZATION_PERIOD}.count();
 
     schedulingDefaults.children.push_back(penalizationPeriod);
 
diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h
index 4ec01b2..a6d343f 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -33,22 +33,18 @@ class FlowFileQueue {
   using value_type = std::shared_ptr<core::FlowFile>;
 
   value_type pop();
-  value_type forcePop();
   void push(const value_type& element);
   void push(value_type&& element);
-  bool canBePopped() const;
+  bool isWorkAvailable() const;
   bool empty() const;
   size_t size() const;
 
  private:
-  bool existsFlowFileWithExpiredPenalty() const;
-
   struct FlowFilePenaltyExpirationComparator {
     bool operator()(const value_type& left, const value_type& right);
   };
 
-  std::queue<value_type> fifo_queue_;
-  std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> priority_queue_;
+  std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> queue_;
 };
 
 }  // namespace utils
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index d237896..af119fb 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -181,7 +181,7 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
 std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  while (queue_.canBePopped()) {
+  while (queue_.isWorkAvailable()) {
     std::shared_ptr<core::FlowFile> item = queue_.pop();
     queued_data_size_ -= item->getSize();
 
@@ -212,7 +212,7 @@ void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);
 
   while (!queue_.empty()) {
-    std::shared_ptr<core::FlowFile> item = queue_.forcePop();
+    std::shared_ptr<core::FlowFile> item = queue_.pop();
     logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
       if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 4492101..2476b25 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -42,7 +42,7 @@ FlowFile::FlowFile()
       stored(false),
       offset_(0),
       last_queue_date_(0),
-      penaltyExpiration_ms_(0),
+      to_be_processed_after_(std::chrono::steady_clock::now()),
       event_time_(0),
       claim_(nullptr),
       marked_delete_(false) {
@@ -61,7 +61,7 @@ FlowFile& FlowFile::operator=(const FlowFile& other) {
   lineage_Identifiers_ = other.lineage_Identifiers_;
   last_queue_date_ = other.last_queue_date_;
   size_ = other.size_;
-  penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
+  to_be_processed_after_ = other.to_be_processed_after_;
   attributes_ = other.attributes_;
   claim_ = other.claim_;
   connection_ = other.connection_;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 83ee2aa..f8e8c31 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -198,9 +198,9 @@ void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow
 }
 
 void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
-  uint64_t penalization_period = process_context_->getProcessorNode()->getPenalizationPeriodMsec();
-  logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period << "ms at " << process_context_->getProcessorNode()->getName();
-  flow->setPenaltyExpiration(utils::timeutils::getTimeMillis() + penalization_period);
+  const std::chrono::milliseconds penalization_period = process_context_->getProcessorNode()->getPenalizationPeriod();
+  logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period.count() << "ms at " << process_context_->getProcessorNode()->getName();
+  flow->penalize(penalization_period);
 }
 
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 56eb82a..dd3b715 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -61,7 +61,7 @@ Processor::Processor(const std::string& name)
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
   run_duration_nano_ = DEFAULT_RUN_DURATION;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
-  _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+  penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
   max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
   active_tasks_ = 0;
   yield_expiration_ = 0;
@@ -82,7 +82,7 @@ Processor::Processor(const std::string& name, const utils::Identifier &uuid)
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
   run_duration_nano_ = DEFAULT_RUN_DURATION;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
-  _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+  penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
   max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
   active_tasks_ = 0;
   yield_expiration_ = 0;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 111dd8c..740af80 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -248,7 +248,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
 
         if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
           logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
-          processor->setPenalizationPeriodMsec(penalizationPeriod);
+          processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
         }
 
         if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
diff --git a/libminifi/src/utils/FlowFileQueue.cpp b/libminifi/src/utils/FlowFileQueue.cpp
index 8fe9e78..84b682d 100644
--- a/libminifi/src/utils/FlowFileQueue.cpp
+++ b/libminifi/src/utils/FlowFileQueue.cpp
@@ -30,70 +30,41 @@ bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_
 }
 
 FlowFileQueue::value_type FlowFileQueue::pop() {
-  if (existsFlowFileWithExpiredPenalty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
+  if (empty()) {
+    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
   }
 
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
-}
-
-/**
- * Pops any flow file off the queue, whether it has an unexpired penalty or not.
- */
-FlowFileQueue::value_type FlowFileQueue::forcePop() {
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  if (!priority_queue_.empty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+  value_type next_flow_file = queue_.top();
+  queue_.pop();
+  return next_flow_file;
 }
 
 void FlowFileQueue::push(const value_type& element) {
-  if (element->isPenalized()) {
-    priority_queue_.push(element);
-  } else {
-    fifo_queue_.push(element);
+  if (!element->isPenalized()) {
+    element->penalize(std::chrono::milliseconds{0});
   }
+
+  queue_.push(element);
 }
 
 void FlowFileQueue::push(value_type&& element) {
-  if (element->isPenalized()) {
-    priority_queue_.push(std::move(element));
-  } else {
-    fifo_queue_.push(std::move(element));
+  if (!element->isPenalized()) {
+    element->penalize(std::chrono::milliseconds{0});
   }
+
+  queue_.push(std::move(element));
 }
 
-bool FlowFileQueue::canBePopped() const {
-  return !fifo_queue_.empty() || existsFlowFileWithExpiredPenalty();
+bool FlowFileQueue::isWorkAvailable() const {
+  return !queue_.empty() && !queue_.top()->isPenalized();
 }
 
 bool FlowFileQueue::empty() const {
-  return fifo_queue_.empty() && priority_queue_.empty();
+  return queue_.empty();
 }
 
 size_t FlowFileQueue::size() const {
-  return fifo_queue_.size() + priority_queue_.size();
-}
-
-bool FlowFileQueue::existsFlowFileWithExpiredPenalty() const {
-  return !priority_queue_.empty() && !priority_queue_.top()->isPenalized();
+  return queue_.size();
 }
 
 }  // namespace utils
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
index 53e7ea8..301a821 100644
--- a/libminifi/test/unit/ConnectionTests.cpp
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -53,9 +53,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
     SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
 
     const auto flow_file = std::make_shared<core::FlowFile>();
-    const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
-    const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
-    flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+    flow_file->penalize(std::chrono::seconds{10});
     connection->put(flow_file);
     REQUIRE(nullptr == connection->poll(expired_flow_files));
   }
@@ -74,9 +72,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
     SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
 
     const auto penalized_flow_file = std::make_shared<core::FlowFile>();
-    const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
-    const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
-    penalized_flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+    penalized_flow_file->penalize(std::chrono::seconds{10});
     connection->put(penalized_flow_file);
 
     const auto flow_file = std::make_shared<core::FlowFile>();
diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp b/libminifi/test/unit/FlowFileQueueTests.cpp
index d64169d..0ce3ffb 100644
--- a/libminifi/test/unit/FlowFileQueueTests.cpp
+++ b/libminifi/test/unit/FlowFileQueueTests.cpp
@@ -24,9 +24,8 @@ TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") {
 
   REQUIRE(queue.empty());
   REQUIRE(queue.size() == 0);
-  REQUIRE_FALSE(queue.canBePopped());
+  REQUIRE_FALSE(queue.isWorkAvailable());
   REQUIRE_THROWS(queue.pop());
-  REQUIRE_THROWS(queue.forcePop());
 }
 
 TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can pop it", "[FlowFileQueue][pop]") {
@@ -36,7 +35,7 @@ TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can po
 
   REQUIRE_FALSE(queue.empty());
   REQUIRE(queue.size() == 1);
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file);
 }
 
@@ -44,7 +43,7 @@ TEST_CASE("A flow file can be moved into the FlowFileQueue", "[FlowFileQueue][po
   utils::FlowFileQueue queue;
 
   auto penalized_flow_file = std::make_shared<core::FlowFile>();
-  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 100);
+  penalized_flow_file->penalize(std::chrono::milliseconds{100});
   queue.push(std::move(penalized_flow_file));
 
   queue.push(std::make_shared<core::FlowFile>());
@@ -62,13 +61,13 @@ TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them i
   const auto flow_file_3 = std::make_shared<core::FlowFile>();
   queue.push(flow_file_3);
 
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_1);
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_2);
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_3);
-  REQUIRE_FALSE(queue.canBePopped());
+  REQUIRE_FALSE(queue.isWorkAvailable());
 }
 
 namespace {
@@ -86,76 +85,75 @@ class PenaltyHasExpired {
 
 TEST_CASE("Penalized flow files are popped from the FlowFileQueue in the order their penalties expire", "[FlowFileQueue][pop]") {
   utils::FlowFileQueue queue;
-  const auto now = utils::timeutils::getTimeMillis();
   const auto flow_file_1 = std::make_shared<core::FlowFile>();
-  flow_file_1->setPenaltyExpiration(now + 70);
+  flow_file_1->penalize(std::chrono::milliseconds{70});
   queue.push(flow_file_1);
   const auto flow_file_2 = std::make_shared<core::FlowFile>();
-  flow_file_2->setPenaltyExpiration(now + 50);
+  flow_file_2->penalize(std::chrono::milliseconds{50});
   queue.push(flow_file_2);
   const auto flow_file_3 = std::make_shared<core::FlowFile>();
-  flow_file_3->setPenaltyExpiration(now + 80);
+  flow_file_3->penalize(std::chrono::milliseconds{80});
   queue.push(flow_file_3);
   const auto flow_file_4 = std::make_shared<core::FlowFile>();
-  flow_file_4->setPenaltyExpiration(now + 60);
+  flow_file_4->penalize(std::chrono::milliseconds{60});
   queue.push(flow_file_4);
 
-  REQUIRE_FALSE(queue.canBePopped());
+  REQUIRE_FALSE(queue.isWorkAvailable());
 
   REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_2}, std::chrono::milliseconds{10}));
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_2);
 
   REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_4}, std::chrono::milliseconds{10}));
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_4);
 
   REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_1}, std::chrono::milliseconds{10}));
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_1);
 
   REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_3}, std::chrono::milliseconds{10}));
-  REQUIRE(queue.canBePopped());
+  REQUIRE(queue.isWorkAvailable());
   REQUIRE(queue.pop() == flow_file_3);
 
-  REQUIRE_FALSE(queue.canBePopped());
+  REQUIRE_FALSE(queue.isWorkAvailable());
 }
 
 TEST_CASE("If a penalized then a non-penalized flow file is added to the FlowFileQueue, pop() returns the correct one", "[FlowFileQueue][pop]") {
   utils::FlowFileQueue queue;
   const auto penalized_flow_file = std::make_shared<core::FlowFile>();
-  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+  penalized_flow_file->penalize(std::chrono::milliseconds{10});
   queue.push(penalized_flow_file);
   const auto flow_file = std::make_shared<core::FlowFile>();
   queue.push(flow_file);
 
   SECTION("Try popping right away") {
-    REQUIRE(queue.canBePopped());
+    REQUIRE(queue.isWorkAvailable());
     REQUIRE(queue.pop() == flow_file);
-    REQUIRE_FALSE(queue.canBePopped());
+    REQUIRE_FALSE(queue.isWorkAvailable());
   }
 
   SECTION("Wait until the penalty expires, then pop") {
     REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{penalized_flow_file}, std::chrono::milliseconds{10}));
 
-    REQUIRE(queue.canBePopped());
-    REQUIRE(queue.pop() == penalized_flow_file);
-    REQUIRE(queue.canBePopped());
+    REQUIRE(queue.isWorkAvailable());
     REQUIRE(queue.pop() == flow_file);
-    REQUIRE_FALSE(queue.canBePopped());
+    REQUIRE(queue.isWorkAvailable());
+    REQUIRE(queue.pop() == penalized_flow_file);
+    REQUIRE_FALSE(queue.isWorkAvailable());
   }
 }
 
 TEST_CASE("Force pop on FlowFileQueue returns the flow files, whether penalized or not", "[FlowFileQueue][forcePop]") {
   utils::FlowFileQueue queue;
   const auto penalized_flow_file = std::make_shared<core::FlowFile>();
-  penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+  penalized_flow_file->penalize(std::chrono::milliseconds{10});
   queue.push(penalized_flow_file);
   const auto flow_file = std::make_shared<core::FlowFile>();
   queue.push(flow_file);
 
   REQUIRE_FALSE(queue.empty());
-  REQUIRE(queue.forcePop() == flow_file);
-  REQUIRE(queue.forcePop() == penalized_flow_file);
+  REQUIRE(queue.pop() == flow_file);
+  REQUIRE(queue.pop() == penalized_flow_file);
   REQUIRE(queue.empty());
 }