You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2021/03/16 09:18:37 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1487 Use a single
priority queue in FlowFileQueue
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fc1a9dd MINIFICPP-1487 Use a single priority queue in FlowFileQueue
fc1a9dd is described below
commit fc1a9ddf8c91686eb98d56569dcdc0e1b772451f
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Tue Mar 16 10:14:46 2021 +0100
MINIFICPP-1487 Use a single priority queue in FlowFileQueue
This closes #1027
Co-authored-by: Márton Szász <sz...@gmail.com>
Signed-off-by: Adam Debreceni <ad...@apache.org>
---
.../tests/unit/ProcessorTests.cpp | 124 ++++++++++++++++-----
.../tests/unit/RetryFlowFileTests.cpp | 2 +-
.../tests/unit/YamlConfigurationTests.cpp | 4 +-
libminifi/include/Connection.h | 2 +-
libminifi/include/core/Connectable.h | 7 +-
libminifi/include/core/FlowFile.h | 18 ++-
libminifi/include/core/Processor.h | 6 +-
libminifi/include/core/ProcessorConfig.h | 4 +-
libminifi/include/core/ProcessorNode.h | 5 +-
.../include/core/state/nodes/SchedulingNodes.h | 2 +-
libminifi/include/utils/FlowFileQueue.h | 8 +-
libminifi/src/Connection.cpp | 4 +-
libminifi/src/core/FlowFile.cpp | 4 +-
libminifi/src/core/ProcessSession.cpp | 6 +-
libminifi/src/core/Processor.cpp | 4 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 2 +-
libminifi/src/utils/FlowFileQueue.cpp | 63 +++--------
libminifi/test/unit/ConnectionTests.cpp | 8 +-
libminifi/test/unit/FlowFileQueueTests.cpp | 56 +++++-----
19 files changed, 174 insertions(+), 155 deletions(-)
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 1ebcd39..7ca8000 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -615,65 +615,131 @@ TEST_CASE("TestRPGValid", "[TestRPG6]") {
testRPGBypass("", "8080", "8080", false);
}
-TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
+namespace {
+
+class ProcessorWithIncomingConnectionTest {
+ public:
+ ProcessorWithIncomingConnectionTest();
+ ~ProcessorWithIncomingConnectionTest();
+
+ protected:
+ std::shared_ptr<core::Processor> processor_;
+ std::shared_ptr<minifi::Connection> incoming_connection_;
+ std::shared_ptr<core::ProcessSession> session_;
+};
+
+ProcessorWithIncomingConnectionTest::ProcessorWithIncomingConnectionTest() {
LogTestController::getInstance().setDebug<core::Processor>();
const auto repo = std::make_shared<TestRepository>();
const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
- const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
- const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
- incoming_connection->addRelationship(core::Relationship{"success", ""});
- incoming_connection->setDestinationUUID(processor->getUUID());
- processor->addConnection(incoming_connection);
- processor->initialize();
+ processor_ = std::make_shared<processors::LogAttribute>("test_processor");
+ incoming_connection_ = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+ incoming_connection_->addRelationship(core::Relationship{"success", ""});
+ incoming_connection_->setDestinationUUID(processor_->getUUID());
+ processor_->addConnection(incoming_connection_);
+ processor_->initialize();
- const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+ const auto processor_node = std::make_shared<core::ProcessorNode>(processor_);
const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
- const auto session = session_factory->createSession();
+ session_ = session_factory->createSession();
+}
+ProcessorWithIncomingConnectionTest::~ProcessorWithIncomingConnectionTest() {
+ LogTestController::getInstance().reset();
+}
+
+} // namespace
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
SECTION("Initially, the queue is empty, so there is no work available") {
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
}
SECTION("When a non-penalized flow file is queued, there is work available") {
- const auto flow_file = session->create();
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ incoming_connection_->put(flow_file);
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
- const auto flow_file = session->create();
- session->penalize(flow_file);
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ session_->penalize(flow_file);
+ incoming_connection_->put(flow_file);
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
}
SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
- const auto normal_flow_file = session->create();
- incoming_connection->put(normal_flow_file);
+ const auto normal_flow_file = session_->create();
+ incoming_connection_->put(normal_flow_file);
- const auto penalized_flow_file = session->create();
- session->penalize(penalized_flow_file);
- incoming_connection->put(penalized_flow_file);
+ const auto penalized_flow_file = session_->create();
+ session_->penalize(penalized_flow_file);
+ incoming_connection_->put(penalized_flow_file);
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
- processor->setPenalizationPeriodMsec(10);
+ processor_->setPenalizationPeriod(std::chrono::milliseconds{10});
- const auto flow_file = session->create();
- session->penalize(flow_file);
- incoming_connection->put(flow_file);
+ const auto flow_file = session_->create();
+ session_->penalize(flow_file);
+ incoming_connection_->put(flow_file);
- REQUIRE_FALSE(processor->isWorkAvailable());
+ REQUIRE_FALSE(processor_->isWorkAvailable());
const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
- REQUIRE(processor->isWorkAvailable());
+ REQUIRE(processor_->isWorkAvailable());
}
}
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A failed and re-penalized flow file does not block the incoming queue of the Processor", "[penalize]") {
+ processor_->setPenalizationPeriod(std::chrono::milliseconds{100});
+ const auto penalized_flow_file = session_->create();
+ session_->penalize(penalized_flow_file);
+ incoming_connection_->put(penalized_flow_file);
+ const auto penalty_has_expired = [penalized_flow_file] { return !penalized_flow_file->isPenalized(); };
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+
+ const auto flow_file_1 = session_->create();
+ incoming_connection_->put(flow_file_1);
+ const auto flow_file_2 = session_->create();
+ incoming_connection_->put(flow_file_2);
+ const auto flow_file_3 = session_->create();
+ incoming_connection_->put(flow_file_3);
+
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+ const auto next_flow_file_1 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_1 == penalized_flow_file);
+
+ session_->penalize(penalized_flow_file);
+ incoming_connection_->put(penalized_flow_file);
+ std::this_thread::sleep_for(std::chrono::milliseconds{110});
+
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ const auto next_flow_file_2 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_2 != penalized_flow_file);
+ REQUIRE(next_flow_file_2 == flow_file_1);
+
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ const auto next_flow_file_3 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_3 != penalized_flow_file);
+ REQUIRE(next_flow_file_3 == flow_file_2);
+
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ const auto next_flow_file_4 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_4 != penalized_flow_file);
+ REQUIRE(next_flow_file_4 == flow_file_3);
+
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+ REQUIRE(incoming_connection_->isWorkAvailable());
+ const auto next_flow_file_5 = incoming_connection_->poll(expired_flow_files);
+ REQUIRE(next_flow_file_5 == penalized_flow_file);
+}
diff --git a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
index e367ec8..8aa6641 100644
--- a/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/RetryFlowFileTests.cpp
@@ -103,7 +103,7 @@ class RetryFlowFileTest {
std::shared_ptr<core::Processor> putfile_on_failure = plan_->addProcessor("PutFile", "putfile_on_failure", {success}, false);
std::shared_ptr<core::Processor> log_attribute = plan_->addProcessor("LogAttribute", "log", {success}, false);
- retryflowfile->setPenalizationPeriodMsec(0);
+ retryflowfile->setPenalizationPeriod(std::chrono::milliseconds{0});
plan_->addConnection(generate, success, update);
plan_->addConnection(update, success, retryflowfile);
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 157fd03..b4b8f6b 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -151,7 +151,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
- REQUIRE(30 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriodMsec());
+ REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
@@ -452,7 +452,7 @@ NiFi Properties Overrides: {}
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
- REQUIRE(30 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriodMsec());
+ REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 2c988dc..e3e50ac 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -167,7 +167,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
void yield() override {}
bool isWorkAvailable() override {
- return queue_.canBePopped();
+ return queue_.isWorkAvailable();
}
bool isRunning() override {
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index f9b6e5e..cac2052 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -74,9 +74,8 @@ class Connectable : public CoreComponent {
// Check whether the relationship is auto terminated
bool isAutoTerminated(const Relationship &relationship);
- // Get Processor penalization period in MilliSecond
- uint64_t getPenalizationPeriodMsec(void) const {
- return (_penalizationPeriodMsec);
+ std::chrono::milliseconds getPenalizationPeriod() const {
+ return penalization_period_;
}
/**
@@ -162,7 +161,7 @@ class Connectable : public CoreComponent {
// must hold the relationship_mutex_ before calling this
std::shared_ptr<Connectable> getNextIncomingConnectionImpl(const std::lock_guard<std::mutex>& relationship_mutex_lock);
// Penalization Period in MilliSecond
- std::atomic<uint64_t> _penalizationPeriodMsec;
+ std::atomic<std::chrono::milliseconds> penalization_period_;
uint8_t max_concurrent_tasks_;
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index ca643ad..7b97fc7 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -205,16 +205,13 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
offset_ = offset;
}
- /**
- * Sets the penalty expiration
- * @param penaltyExp new penalty expiration
- */
- void setPenaltyExpiration(const uint64_t penaltyExp) {
- penaltyExpiration_ms_ = penaltyExp;
+ template<typename Rep, typename Period>
+ void penalize(std::chrono::duration<Rep, Period> duration) {
+ to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
}
- uint64_t getPenaltyExpiration() const {
- return penaltyExpiration_ms_;
+ std::chrono::time_point<std::chrono::steady_clock> getPenaltyExpiration() const {
+ return to_be_processed_after_;
}
/**
@@ -223,9 +220,8 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
*/
uint64_t getOffset() const;
- // Check whether it is still being penalized
bool isPenalized() const {
- return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
+ return to_be_processed_after_ > std::chrono::steady_clock::now();
}
uint64_t getId() const {
@@ -271,7 +267,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
// Offset to the content
uint64_t offset_;
// Penalty expiration
- uint64_t penaltyExpiration_ms_;
+ std::chrono::time_point<std::chrono::steady_clock> to_be_processed_after_;
// Attributes key/values pairs for the flow record
AttributeMap attributes_;
// Pointer to the associated content resource claim
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 3059b12..51851a5 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -137,9 +137,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
uint64_t getYieldPeriodMsec() const {
return (yield_period_msec_);
}
- // Set Processor penalization period in MilliSecond
- void setPenalizationPeriodMsec(uint64_t period) {
- _penalizationPeriodMsec = period;
+
+ void setPenalizationPeriod(std::chrono::milliseconds period) {
+ penalization_period_ = period;
}
// Set Processor Maximum Concurrent Tasks
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index 35a7aec..927f9f3 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -35,10 +35,8 @@ namespace core {
#define DEFAULT_SCHEDULING_PERIOD_MILLIS 1000
#define DEFAULT_RUN_DURATION 0
#define DEFAULT_MAX_CONCURRENT_TASKS 1
-#define DEFAULT_PENALIZATION_PERIOD 1
-// Default yield period in second
#define DEFAULT_YIELD_PERIOD_SECONDS 1
-#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
struct ProcessorConfig {
std::string id;
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 786456c..6a35232 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -217,9 +217,8 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
processor_->setUUID(uuid);
}
-// Get Processor penalization period in MilliSecond
- uint64_t getPenalizationPeriodMsec(void) {
- return processor_->getPenalizationPeriodMsec();
+ std::chrono::milliseconds getPenalizationPeriod() {
+ return processor_->getPenalizationPeriod();
}
/**
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index ec86567..3ba4ab9 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -85,7 +85,7 @@ class SchedulingDefaults : public DeviceInformation {
SerializedResponseNode penalizationPeriod;
penalizationPeriod.name = "penalizationPeriodMillis";
- penalizationPeriod.value = DEFAULT_PENALIZATION_PERIOD_SECONDS*1000;
+ penalizationPeriod.value = std::chrono::milliseconds{core::DEFAULT_PENALIZATION_PERIOD}.count();
schedulingDefaults.children.push_back(penalizationPeriod);
diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h
index 4ec01b2..a6d343f 100644
--- a/libminifi/include/utils/FlowFileQueue.h
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -33,22 +33,18 @@ class FlowFileQueue {
using value_type = std::shared_ptr<core::FlowFile>;
value_type pop();
- value_type forcePop();
void push(const value_type& element);
void push(value_type&& element);
- bool canBePopped() const;
+ bool isWorkAvailable() const;
bool empty() const;
size_t size() const;
private:
- bool existsFlowFileWithExpiredPenalty() const;
-
struct FlowFilePenaltyExpirationComparator {
bool operator()(const value_type& left, const value_type& right);
};
- std::queue<value_type> fifo_queue_;
- std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> priority_queue_;
+ std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> queue_;
};
} // namespace utils
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index d237896..af119fb 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -181,7 +181,7 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
std::lock_guard<std::mutex> lock(mutex_);
- while (queue_.canBePopped()) {
+ while (queue_.isWorkAvailable()) {
std::shared_ptr<core::FlowFile> item = queue_.pop();
queued_data_size_ -= item->getSize();
@@ -212,7 +212,7 @@ void Connection::drain(bool delete_permanently) {
std::lock_guard<std::mutex> lock(mutex_);
while (!queue_.empty()) {
- std::shared_ptr<core::FlowFile> item = queue_.forcePop();
+ std::shared_ptr<core::FlowFile> item = queue_.pop();
logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
if (delete_permanently) {
if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 4492101..2476b25 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -42,7 +42,7 @@ FlowFile::FlowFile()
stored(false),
offset_(0),
last_queue_date_(0),
- penaltyExpiration_ms_(0),
+ to_be_processed_after_(std::chrono::steady_clock::now()),
event_time_(0),
claim_(nullptr),
marked_delete_(false) {
@@ -61,7 +61,7 @@ FlowFile& FlowFile::operator=(const FlowFile& other) {
lineage_Identifiers_ = other.lineage_Identifiers_;
last_queue_date_ = other.last_queue_date_;
size_ = other.size_;
- penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
+ to_be_processed_after_ = other.to_be_processed_after_;
attributes_ = other.attributes_;
claim_ = other.claim_;
connection_ = other.connection_;
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 83ee2aa..f8e8c31 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -198,9 +198,9 @@ void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow
}
void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
- uint64_t penalization_period = process_context_->getProcessorNode()->getPenalizationPeriodMsec();
- logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period << "ms at " << process_context_->getProcessorNode()->getName();
- flow->setPenaltyExpiration(utils::timeutils::getTimeMillis() + penalization_period);
+ const std::chrono::milliseconds penalization_period = process_context_->getProcessorNode()->getPenalizationPeriod();
+ logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period.count() << "ms at " << process_context_->getProcessorNode()->getName();
+ flow->penalize(penalization_period);
}
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 56eb82a..dd3b715 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -61,7 +61,7 @@ Processor::Processor(const std::string& name)
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
- _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+ penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
yield_expiration_ = 0;
@@ -82,7 +82,7 @@ Processor::Processor(const std::string& name, const utils::Identifier &uuid)
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
- _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+ penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
yield_expiration_ = 0;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 111dd8c..740af80 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -248,7 +248,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
- processor->setPenalizationPeriodMsec(penalizationPeriod);
+ processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
}
if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
diff --git a/libminifi/src/utils/FlowFileQueue.cpp b/libminifi/src/utils/FlowFileQueue.cpp
index 8fe9e78..84b682d 100644
--- a/libminifi/src/utils/FlowFileQueue.cpp
+++ b/libminifi/src/utils/FlowFileQueue.cpp
@@ -30,70 +30,41 @@ bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_
}
FlowFileQueue::value_type FlowFileQueue::pop() {
- if (existsFlowFileWithExpiredPenalty()) {
- value_type next_flow_file = priority_queue_.top();
- priority_queue_.pop();
- return next_flow_file;
+ if (empty()) {
+ throw std::logic_error{"pop() called on an empty FlowFileQueue"};
}
- if (!fifo_queue_.empty()) {
- value_type next_flow_file = fifo_queue_.front();
- fifo_queue_.pop();
- return next_flow_file;
- }
-
- throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
-}
-
-/**
- * Pops any flow file off the queue, whether it has an unexpired penalty or not.
- */
-FlowFileQueue::value_type FlowFileQueue::forcePop() {
- if (!fifo_queue_.empty()) {
- value_type next_flow_file = fifo_queue_.front();
- fifo_queue_.pop();
- return next_flow_file;
- }
-
- if (!priority_queue_.empty()) {
- value_type next_flow_file = priority_queue_.top();
- priority_queue_.pop();
- return next_flow_file;
- }
-
- throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+ value_type next_flow_file = queue_.top();
+ queue_.pop();
+ return next_flow_file;
}
void FlowFileQueue::push(const value_type& element) {
- if (element->isPenalized()) {
- priority_queue_.push(element);
- } else {
- fifo_queue_.push(element);
+ if (!element->isPenalized()) {
+ element->penalize(std::chrono::milliseconds{0});
}
+
+ queue_.push(element);
}
void FlowFileQueue::push(value_type&& element) {
- if (element->isPenalized()) {
- priority_queue_.push(std::move(element));
- } else {
- fifo_queue_.push(std::move(element));
+ if (!element->isPenalized()) {
+ element->penalize(std::chrono::milliseconds{0});
}
+
+ queue_.push(std::move(element));
}
-bool FlowFileQueue::canBePopped() const {
- return !fifo_queue_.empty() || existsFlowFileWithExpiredPenalty();
+bool FlowFileQueue::isWorkAvailable() const {
+ return !queue_.empty() && !queue_.top()->isPenalized();
}
bool FlowFileQueue::empty() const {
- return fifo_queue_.empty() && priority_queue_.empty();
+ return queue_.empty();
}
size_t FlowFileQueue::size() const {
- return fifo_queue_.size() + priority_queue_.size();
-}
-
-bool FlowFileQueue::existsFlowFileWithExpiredPenalty() const {
- return !priority_queue_.empty() && !priority_queue_.top()->isPenalized();
+ return queue_.size();
}
} // namespace utils
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
index 53e7ea8..301a821 100644
--- a/libminifi/test/unit/ConnectionTests.cpp
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -53,9 +53,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
const auto flow_file = std::make_shared<core::FlowFile>();
- const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
- const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
- flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+ flow_file->penalize(std::chrono::seconds{10});
connection->put(flow_file);
REQUIRE(nullptr == connection->poll(expired_flow_files));
}
@@ -74,9 +72,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
const auto penalized_flow_file = std::make_shared<core::FlowFile>();
- const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
- const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
- penalized_flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+ penalized_flow_file->penalize(std::chrono::seconds{10});
connection->put(penalized_flow_file);
const auto flow_file = std::make_shared<core::FlowFile>();
diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp b/libminifi/test/unit/FlowFileQueueTests.cpp
index d64169d..0ce3ffb 100644
--- a/libminifi/test/unit/FlowFileQueueTests.cpp
+++ b/libminifi/test/unit/FlowFileQueueTests.cpp
@@ -24,9 +24,8 @@ TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") {
REQUIRE(queue.empty());
REQUIRE(queue.size() == 0);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_FALSE(queue.isWorkAvailable());
REQUIRE_THROWS(queue.pop());
- REQUIRE_THROWS(queue.forcePop());
}
TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can pop it", "[FlowFileQueue][pop]") {
@@ -36,7 +35,7 @@ TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can po
REQUIRE_FALSE(queue.empty());
REQUIRE(queue.size() == 1);
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
}
@@ -44,7 +43,7 @@ TEST_CASE("A flow file can be moved into the FlowFileQueue", "[FlowFileQueue][po
utils::FlowFileQueue queue;
auto penalized_flow_file = std::make_shared<core::FlowFile>();
- penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 100);
+ penalized_flow_file->penalize(std::chrono::milliseconds{100});
queue.push(std::move(penalized_flow_file));
queue.push(std::make_shared<core::FlowFile>());
@@ -62,13 +61,13 @@ TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them i
const auto flow_file_3 = std::make_shared<core::FlowFile>();
queue.push(flow_file_3);
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_1);
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_2);
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_3);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_FALSE(queue.isWorkAvailable());
}
namespace {
@@ -86,76 +85,75 @@ class PenaltyHasExpired {
TEST_CASE("Penalized flow files are popped from the FlowFileQueue in the order their penalties expire", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
- const auto now = utils::timeutils::getTimeMillis();
const auto flow_file_1 = std::make_shared<core::FlowFile>();
- flow_file_1->setPenaltyExpiration(now + 70);
+ flow_file_1->penalize(std::chrono::milliseconds{70});
queue.push(flow_file_1);
const auto flow_file_2 = std::make_shared<core::FlowFile>();
- flow_file_2->setPenaltyExpiration(now + 50);
+ flow_file_2->penalize(std::chrono::milliseconds{50});
queue.push(flow_file_2);
const auto flow_file_3 = std::make_shared<core::FlowFile>();
- flow_file_3->setPenaltyExpiration(now + 80);
+ flow_file_3->penalize(std::chrono::milliseconds{80});
queue.push(flow_file_3);
const auto flow_file_4 = std::make_shared<core::FlowFile>();
- flow_file_4->setPenaltyExpiration(now + 60);
+ flow_file_4->penalize(std::chrono::milliseconds{60});
queue.push(flow_file_4);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_FALSE(queue.isWorkAvailable());
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_2}, std::chrono::milliseconds{10}));
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_2);
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_4}, std::chrono::milliseconds{10}));
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_4);
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_1}, std::chrono::milliseconds{10}));
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_1);
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_3}, std::chrono::milliseconds{10}));
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_3);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_FALSE(queue.isWorkAvailable());
}
TEST_CASE("If a penalized then a non-penalized flow file is added to the FlowFileQueue, pop() returns the correct one", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto penalized_flow_file = std::make_shared<core::FlowFile>();
- penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+ penalized_flow_file->penalize(std::chrono::milliseconds{10});
queue.push(penalized_flow_file);
const auto flow_file = std::make_shared<core::FlowFile>();
queue.push(flow_file);
SECTION("Try popping right away") {
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_FALSE(queue.isWorkAvailable());
}
SECTION("Wait until the penalty expires, then pop") {
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{penalized_flow_file}, std::chrono::milliseconds{10}));
- REQUIRE(queue.canBePopped());
- REQUIRE(queue.pop() == penalized_flow_file);
- REQUIRE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
- REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE(queue.isWorkAvailable());
+ REQUIRE(queue.pop() == penalized_flow_file);
+ REQUIRE_FALSE(queue.isWorkAvailable());
}
}
TEST_CASE("Force pop on FlowFileQueue returns the flow files, whether penalized or not", "[FlowFileQueue][forcePop]") {
utils::FlowFileQueue queue;
const auto penalized_flow_file = std::make_shared<core::FlowFile>();
- penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+ penalized_flow_file->penalize(std::chrono::milliseconds{10});
queue.push(penalized_flow_file);
const auto flow_file = std::make_shared<core::FlowFile>();
queue.push(flow_file);
REQUIRE_FALSE(queue.empty());
- REQUIRE(queue.forcePop() == flow_file);
- REQUIRE(queue.forcePop() == penalized_flow_file);
+ REQUIRE(queue.pop() == flow_file);
+ REQUIRE(queue.pop() == penalized_flow_file);
REQUIRE(queue.empty());
}