You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/03/08 14:54:27 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new c8c4482 MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only
c8c4482 is described below
commit c8c448240690e2398534c99b101e7d536a367cb7
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Thu Feb 18 16:28:11 2021 +0100
MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only
* make mutex_ mutable and mark const methods const
* remove C-style (void) from function declarations
* remove pointless comments
* add a missing virtual-override pair on isYield()
* add a missing 'explicit'
* remove the declaration of unimplemented method getNextIncomingConnection()
* Test that the processor detects correctly whether it has any work to do
* Document the current behavior of Connection::poll(), to be fixed later
* Use a custom queue which is aware of penalty expirations
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #1013
---
.../tests/unit/ProcessorTests.cpp | 63 ++++++++
libminifi/include/Connection.h | 6 +-
libminifi/include/core/Processor.h | 51 +++----
libminifi/include/utils/FlowFileQueue.h | 58 ++++++++
libminifi/src/Connection.cpp | 22 +--
libminifi/src/SchedulingAgent.cpp | 2 +-
libminifi/src/core/Processor.cpp | 19 +--
libminifi/src/utils/FlowFileQueue.cpp | 103 +++++++++++++
libminifi/test/unit/ConnectionTests.cpp | 88 +++++++++++
libminifi/test/unit/FlowFileQueueTests.cpp | 161 +++++++++++++++++++++
libminifi/test/unit/MockClasses.h | 2 +-
11 files changed, 500 insertions(+), 75 deletions(-)
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 5d34024..1ebcd39 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -614,3 +614,66 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
TEST_CASE("TestRPGValid", "[TestRPG6]") {
testRPGBypass("", "8080", "8080", false);
}
+
+TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
+ LogTestController::getInstance().setDebug<core::Processor>();
+
+ const auto repo = std::make_shared<TestRepository>();
+ const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
+
+ const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
+ const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+ incoming_connection->addRelationship(core::Relationship{"success", ""});
+ incoming_connection->setDestinationUUID(processor->getUUID());
+ processor->addConnection(incoming_connection);
+ processor->initialize();
+
+ const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+ const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
+ const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
+ const auto session = session_factory->createSession();
+
+ SECTION("Initially, the queue is empty, so there is no work available") {
+ REQUIRE_FALSE(processor->isWorkAvailable());
+ }
+
+ SECTION("When a non-penalized flow file is queued, there is work available") {
+ const auto flow_file = session->create();
+ incoming_connection->put(flow_file);
+
+ REQUIRE(processor->isWorkAvailable());
+ }
+
+ SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
+ const auto flow_file = session->create();
+ session->penalize(flow_file);
+ incoming_connection->put(flow_file);
+
+ REQUIRE_FALSE(processor->isWorkAvailable());
+ }
+
+ SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
+ const auto normal_flow_file = session->create();
+ incoming_connection->put(normal_flow_file);
+
+ const auto penalized_flow_file = session->create();
+ session->penalize(penalized_flow_file);
+ incoming_connection->put(penalized_flow_file);
+
+ REQUIRE(processor->isWorkAvailable());
+ }
+
+ SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
+ processor->setPenalizationPeriodMsec(10);
+
+ const auto flow_file = session->create();
+ session->penalize(flow_file);
+ incoming_connection->put(flow_file);
+
+ REQUIRE_FALSE(processor->isWorkAvailable());
+ const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+ REQUIRE(processor->isWorkAvailable());
+ }
+}
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 3084d2e..2c988dc 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -24,7 +24,6 @@
#include <set>
#include <string>
#include <vector>
-#include <queue>
#include <map>
#include <mutex>
#include <atomic>
@@ -35,6 +34,7 @@
#include "core/Relationship.h"
#include "core/FlowFile.h"
#include "core/Repository.h"
+#include "utils/FlowFileQueue.h"
namespace org {
namespace apache {
@@ -167,7 +167,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
void yield() override {}
bool isWorkAvailable() override {
- return !isEmpty();
+ return queue_.canBePopped();
}
bool isRunning() override {
@@ -203,7 +203,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
// Queued data size
std::atomic<uint64_t> queued_data_size_;
// Queue for the Flow File
- std::queue<std::shared_ptr<core::FlowFile>> queue_;
+ utils::FlowFileQueue queue_;
// flow repository
// Logger
std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index b1fb982..3059b12 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -64,15 +64,8 @@ namespace core {
// Processor Class
class Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> {
public:
- // Constructor
- /*!
- * Create a new processor
- */
-
Processor(const std::string& name, const utils::Identifier &uuid);
-
- Processor(const std::string& name); // NOLINT
- // Destructor
+ explicit Processor(const std::string& name);
virtual ~Processor() {
notifyStop();
}
@@ -81,7 +74,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
// Set Processor Scheduled State
void setScheduledState(ScheduledState state);
// Get Processor Scheduled State
- ScheduledState getScheduledState(void) {
+ ScheduledState getScheduledState() const {
return state_;
}
// Set Processor Scheduling Strategy
@@ -89,7 +82,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
strategy_ = strategy;
}
// Get Processor Scheduling Strategy
- SchedulingStrategy getSchedulingStrategy(void) {
+ SchedulingStrategy getSchedulingStrategy() const {
return strategy_;
}
// Set Processor Loss Tolerant
@@ -97,7 +90,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
loss_tolerant_ = lossTolerant;
}
// Get Processor Loss Tolerant
- bool getlossTolerant(void) {
+ bool getlossTolerant() const {
return loss_tolerant_;
}
// Set Processor Scheduling Period in Nano Second
@@ -108,11 +101,10 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
scheduling_period_nano_ = period > minPeriod ? period : minPeriod;
}
// Get Processor Scheduling Period in Nano Second
- uint64_t getSchedulingPeriodNano(void) {
+ uint64_t getSchedulingPeriodNano() const {
return scheduling_period_nano_;
}
-
/**
* Sets the cron period
* @param period cron period.
@@ -129,13 +121,12 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
return cron_period_;
}
-
// Set Processor Run Duration in Nano Second
void setRunDurationNano(uint64_t period) {
run_duration_nano_ = period;
}
// Get Processor Run Duration in Nano Second
- uint64_t getRunDurationNano(void) {
+ uint64_t getRunDurationNano() const {
return (run_duration_nano_);
}
// Set Processor yield period in MilliSecond
@@ -143,7 +134,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
+ uint64_t getYieldPeriodMsec() const {
return (yield_period_msec_);
}
// Set Processor penalization period in MilliSecond
@@ -156,7 +147,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
max_concurrent_tasks_ = tasks;
}
// Get Processor Maximum Concurrent Tasks
- uint8_t getMaxConcurrentTasks(void) {
+ uint8_t getMaxConcurrentTasks() const {
return (max_concurrent_tasks_);
}
// Set Trigger when empty
@@ -164,23 +155,23 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
_triggerWhenEmpty = value;
}
// Get Trigger when empty
- bool getTriggerWhenEmpty(void) {
+ bool getTriggerWhenEmpty() const {
return (_triggerWhenEmpty);
}
// Get Active Task Counts
- uint8_t getActiveTasks(void) {
+ uint8_t getActiveTasks() const {
return (active_tasks_);
}
// Increment Active Task Counts
- void incrementActiveTasks(void) {
+ void incrementActiveTasks() {
active_tasks_++;
}
// decrement Active Task Counts
- void decrementActiveTask(void) {
+ void decrementActiveTask() {
if (active_tasks_ > 0)
active_tasks_--;
}
- void clearActiveTask(void) {
+ void clearActiveTask() {
active_tasks_ = 0;
}
// Yield based on the yield period
@@ -192,7 +183,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
}
// whether need be to yield
- bool isYield() {
+ virtual bool isYield() {
if (yield_expiration_ > 0)
return (yield_expiration_ >= utils::timeutils::getTimeMillis());
else
@@ -203,28 +194,20 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
yield_expiration_ = 0;
}
// get yield time
- uint64_t getYieldTime() {
+ uint64_t getYieldTime() const {
uint64_t curTime = utils::timeutils::getTimeMillis();
if (yield_expiration_ > curTime)
return (yield_expiration_ - curTime);
else
return 0;
}
- // Whether flow file queued in incoming connection
- bool flowFilesQueued();
// Whether flow file queue full in any of the outgoin connection
- bool flowFilesOutGoingFull();
+ bool flowFilesOutGoingFull() const;
- // Add connection
bool addConnection(std::shared_ptr<Connectable> connection);
- // Remove connection
void removeConnection(std::shared_ptr<Connectable> connection);
- // Get the Next RoundRobin incoming connection
- std::shared_ptr<Connection> getNextIncomingConnection();
- // On Trigger
virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory);
-
void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory);
bool canEdit() override {
@@ -295,7 +278,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
private:
// Mutex for protection
- std::mutex mutex_;
+ mutable std::mutex mutex_;
// Yield Expiration
std::atomic<uint64_t> yield_expiration_;
diff --git a/libminifi/include/utils/FlowFileQueue.h b/libminifi/include/utils/FlowFileQueue.h
new file mode 100644
index 0000000..4ec01b2
--- /dev/null
+++ b/libminifi/include/utils/FlowFileQueue.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * A queue of flow files which stores penalized and non-penalized flow files separately.
+ *
+ * Non-penalized flow files are kept in a FIFO queue; penalized flow files are kept in a
+ * priority queue ordered by FlowFilePenaltyExpirationComparator.
+ * This class declares no mutex of its own.
+ */
+class FlowFileQueue {
+ public:
+  using value_type = std::shared_ptr<core::FlowFile>;
+
+  // --- state queries ---
+  bool empty() const;
+  size_t size() const;
+  bool canBePopped() const;
+
+  // --- adding flow files ---
+  void push(const value_type& element);
+  void push(value_type&& element);
+
+  // --- removing flow files ---
+  value_type pop();
+  value_type forcePop();
+
+ private:
+  bool existsFlowFileWithExpiredPenalty() const;
+
+  // Ordering used by priority_queue_ below.
+  struct FlowFilePenaltyExpirationComparator {
+    bool operator()(const value_type& left, const value_type& right);
+  };
+
+  std::queue<value_type> fifo_queue_;
+  std::priority_queue<value_type, std::vector<value_type>, FlowFilePenaltyExpirationComparator> priority_queue_;
+};
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index ccae760..d237896 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -181,9 +181,8 @@ void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
std::lock_guard<std::mutex> lock(mutex_);
- while (!queue_.empty()) {
- std::shared_ptr<core::FlowFile> item = queue_.front();
- queue_.pop();
+ while (queue_.canBePopped()) {
+ std::shared_ptr<core::FlowFile> item = queue_.pop();
queued_data_size_ -= item->getSize();
if (expired_duration_ > 0) {
@@ -193,26 +192,12 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
expiredFlowRecords.insert(item);
logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
} else {
- // Flow record not expired
- if (item->isPenalized()) {
- // Flow record was penalized
- queue_.push(item);
- queued_data_size_ += item->getSize();
- break;
- }
std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setConnection(connectable);
logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
return item;
}
} else {
- // Flow record not expired
- if (item->isPenalized()) {
- // Flow record was penalized
- queue_.push(item);
- queued_data_size_ += item->getSize();
- break;
- }
std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setConnection(connectable);
logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
@@ -227,8 +212,7 @@ void Connection::drain(bool delete_permanently) {
std::lock_guard<std::mutex> lock(mutex_);
while (!queue_.empty()) {
- std::shared_ptr<core::FlowFile> item = queue_.front();
- queue_.pop();
+ std::shared_ptr<core::FlowFile> item = queue_.forcePop();
logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
if (delete_permanently) {
if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 107aa10..a6ef4b0 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -33,7 +33,7 @@ namespace minifi {
bool SchedulingAgent::hasWorkToDo(const std::shared_ptr<core::Processor>& processor) {
// Whether it has work to do
- if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->flowFilesQueued())
+ if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->isWorkAvailable())
return true;
else
return false;
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 6294a2f..56eb82a 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -208,22 +208,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
}
}
-bool Processor::flowFilesQueued() {
- std::lock_guard<std::mutex> lock(mutex_);
-
- if (_incomingConnections.size() == 0)
- return false;
-
- for (auto &&conn : _incomingConnections) {
- std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
- if (connection->getQueueSize() > 0)
- return true;
- }
-
- return false;
-}
-
-bool Processor::flowFilesOutGoingFull() {
+bool Processor::flowFilesOutGoingFull() const {
std::lock_guard<std::mutex> lock(mutex_);
for (const auto& connection_pair : out_going_connections_) {
@@ -285,7 +270,7 @@ bool Processor::isWorkAvailable() {
try {
for (const auto &conn : _incomingConnections) {
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
- if (connection->getQueueSize() > 0) {
+ if (connection->isWorkAvailable()) {
hasWork = true;
break;
}
diff --git a/libminifi/src/utils/FlowFileQueue.cpp b/libminifi/src/utils/FlowFileQueue.cpp
new file mode 100644
index 0000000..8fe9e78
--- /dev/null
+++ b/libminifi/src/utils/FlowFileQueue.cpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/FlowFileQueue.h"
+
+#include <stdexcept>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) {
+  // std::priority_queue keeps the "largest" element at top(); by treating the flow file which expires later
+  // as the "smaller" one, top() becomes the flow file with the earliest penalty expiration
+  const auto left_expiration = left->getPenaltyExpiration();
+  const auto right_expiration = right->getPenaltyExpiration();
+  return left_expiration > right_expiration;
+}
+
+/**
+ * Removes and returns the next flow file which is ready to be processed.
+ *
+ * A flow file whose penalty has expired takes precedence; otherwise the flow file at the
+ * front of the FIFO queue is returned.
+ *
+ * @return the flow file removed from the queue
+ * @throws std::logic_error if there is neither a flow file with an expired penalty nor a non-penalized flow file
+ */
+FlowFileQueue::value_type FlowFileQueue::pop() {
+  if (existsFlowFileWithExpiredPenalty()) {
+    // std::priority_queue::top() only gives const access, so the shared_ptr has to be copied here
+    value_type next_flow_file = priority_queue_.top();
+    priority_queue_.pop();
+    return next_flow_file;
+  }
+
+  if (!fifo_queue_.empty()) {
+    // the front element is discarded on the next line, so move the shared_ptr out instead of copying it
+    value_type next_flow_file = std::move(fifo_queue_.front());
+    fifo_queue_.pop();
+    return next_flow_file;
+  }
+
+  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
+}
+
+/**
+ * Pops any flow file off the queue, whether it has an unexpired penalty or not.
+ *
+ * Flow files in the FIFO queue are returned first; once that is empty, penalized flow files
+ * are returned in the order of the priority queue.
+ *
+ * @return the flow file removed from the queue
+ * @throws std::logic_error if the queue is empty
+ */
+FlowFileQueue::value_type FlowFileQueue::forcePop() {
+  if (!fifo_queue_.empty()) {
+    // the front element is discarded on the next line, so move the shared_ptr out instead of copying it
+    value_type next_flow_file = std::move(fifo_queue_.front());
+    fifo_queue_.pop();
+    return next_flow_file;
+  }
+
+  if (!priority_queue_.empty()) {
+    // std::priority_queue::top() only gives const access, so the shared_ptr has to be copied here
+    value_type next_flow_file = priority_queue_.top();
+    priority_queue_.pop();
+    return next_flow_file;
+  }
+
+  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+}
+
+// Stores a copy of the flow file pointer: in the FIFO queue if the flow file is not penalized
+// at the time of the call, in the priority queue otherwise.
+void FlowFileQueue::push(const value_type& element) {
+  if (!element->isPenalized()) {
+    fifo_queue_.push(element);
+    return;
+  }
+  priority_queue_.push(element);
+}
+
+// Rvalue overload: moves the flow file pointer into the FIFO queue if the flow file is not
+// penalized at the time of the call, into the priority queue otherwise.
+void FlowFileQueue::push(value_type&& element) {
+  if (!element->isPenalized()) {
+    fifo_queue_.push(std::move(element));
+    return;
+  }
+  priority_queue_.push(std::move(element));
+}
+
+// True if pop() would succeed: there is a non-penalized flow file, or a flow file whose penalty has expired.
+bool FlowFileQueue::canBePopped() const {
+  if (!fifo_queue_.empty()) {
+    return true;
+  }
+  return existsFlowFileWithExpiredPenalty();
+}
+
+// True if neither of the two underlying queues holds a flow file.
+bool FlowFileQueue::empty() const {
+  if (!fifo_queue_.empty()) {
+    return false;
+  }
+  return priority_queue_.empty();
+}
+
+// Total number of flow files stored, counting both underlying queues.
+size_t FlowFileQueue::size() const {
+  const auto fifo_count = fifo_queue_.size();
+  const auto priority_count = priority_queue_.size();
+  return fifo_count + priority_count;
+}
+
+// True if the flow file at the top of the priority queue is no longer penalized.
+bool FlowFileQueue::existsFlowFileWithExpiredPenalty() const {
+  if (priority_queue_.empty()) {
+    return false;
+  }
+  const value_type& earliest_expiring = priority_queue_.top();
+  return !earliest_expiring->isPenalized();
+}
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
new file mode 100644
index 0000000..53e7ea8
--- /dev/null
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Connection.h"
+
+#include "../TestBase.h"
+
+TEST_CASE("Connection::poll() works correctly", "[poll]") {
+ const auto flow_repo = std::make_shared<TestRepository>();
+ const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
+
+ const auto id_generator = utils::IdGenerator::getIdGenerator();
+ utils::Identifier connection_id = id_generator->generate();
+ utils::Identifier src_id = id_generator->generate();
+ utils::Identifier dest_id = id_generator->generate();
+
+ const auto connection = std::make_shared<minifi::Connection>(flow_repo, content_repo, "test_connection", connection_id, src_id, dest_id);
+ std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+
+ SECTION("when called on an empty Connection, poll() returns nullptr") {
+ SECTION("without expiration duration") {}
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+ REQUIRE(nullptr == connection->poll(expired_flow_files));
+ }
+
+ SECTION("when called on a connection with a single flow file, poll() returns the flow file") {
+ SECTION("without expiration duration") {}
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ connection->put(flow_file);
+ REQUIRE(flow_file == connection->poll(expired_flow_files));
+ REQUIRE(nullptr == connection->poll(expired_flow_files));
+ }
+
+ SECTION("when called on a connection with a single penalized flow file, poll() returns nullptr") {
+ SECTION("without expiration duration") {}
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
+ const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
+ flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+ connection->put(flow_file);
+ REQUIRE(nullptr == connection->poll(expired_flow_files));
+ }
+
+ SECTION("when called on a connection with a single expired flow file, poll() returns nullptr and returns the expired flow file in the out parameter") {
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ connection->setFlowExpirationDuration(1); // 1 millisecond
+ connection->put(flow_file);
+ std::this_thread::sleep_for(std::chrono::milliseconds{2});
+ REQUIRE(nullptr == connection->poll(expired_flow_files));
+ REQUIRE(std::set<std::shared_ptr<core::FlowFile>>{flow_file} == expired_flow_files);
+ }
+
+ SECTION("when there is a non-penalized flow file followed by a penalized flow file, poll() returns the non-penalized flow file") {
+ SECTION("without expiration duration") {}
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+
+ const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+ const auto future_time = std::chrono::system_clock::now() + std::chrono::seconds{10};
+ const auto future_time_ms_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(future_time.time_since_epoch()).count();
+ penalized_flow_file->setPenaltyExpiration(future_time_ms_since_epoch);
+ connection->put(penalized_flow_file);
+
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ connection->put(flow_file);
+
+ REQUIRE(flow_file == connection->poll(expired_flow_files));
+ REQUIRE(nullptr == connection->poll(expired_flow_files));
+ }
+}
diff --git a/libminifi/test/unit/FlowFileQueueTests.cpp b/libminifi/test/unit/FlowFileQueueTests.cpp
new file mode 100644
index 0000000..d64169d
--- /dev/null
+++ b/libminifi/test/unit/FlowFileQueueTests.cpp
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FlowFileQueue.h"
+
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+
+TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") {
+ utils::FlowFileQueue queue;
+
+ REQUIRE(queue.empty());
+ REQUIRE(queue.size() == 0);
+ REQUIRE_FALSE(queue.canBePopped());
+ REQUIRE_THROWS(queue.pop());
+ REQUIRE_THROWS(queue.forcePop());
+}
+
+TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can pop it", "[FlowFileQueue][pop]") {
+ utils::FlowFileQueue queue;
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ queue.push(flow_file);
+
+ REQUIRE_FALSE(queue.empty());
+ REQUIRE(queue.size() == 1);
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file);
+}
+
+TEST_CASE("A flow file can be moved into the FlowFileQueue", "[FlowFileQueue][pop]") {
+ utils::FlowFileQueue queue;
+
+ auto penalized_flow_file = std::make_shared<core::FlowFile>();
+ penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 100);
+ queue.push(std::move(penalized_flow_file));
+
+ queue.push(std::make_shared<core::FlowFile>());
+
+ REQUIRE_FALSE(queue.empty());
+ REQUIRE(queue.size() == 2);
+}
+
+TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them in FIFO order", "[FlowFileQueue][pop]") {
+ utils::FlowFileQueue queue;
+ const auto flow_file_1 = std::make_shared<core::FlowFile>();
+ queue.push(flow_file_1);
+ const auto flow_file_2 = std::make_shared<core::FlowFile>();
+ queue.push(flow_file_2);
+ const auto flow_file_3 = std::make_shared<core::FlowFile>();
+ queue.push(flow_file_3);
+
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_1);
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_2);
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_3);
+ REQUIRE_FALSE(queue.canBePopped());
+}
+
+namespace {
+
+// Predicate object for the polling helper used in the tests below: holds a flow file
+// and reports whether it is no longer penalized.
+class PenaltyHasExpired {
+ public:
+  explicit PenaltyHasExpired(const std::shared_ptr<core::FlowFile>& flow_file) : flow_file_(flow_file) {}
+
+  // const, because evaluating the predicate does not modify the functor;
+  // this also allows it to be invoked through a const reference or a const copy
+  bool operator()() const { return !flow_file_->isPenalized(); }
+
+ private:
+  std::shared_ptr<core::FlowFile> flow_file_;
+};
+
+} // namespace
+
+TEST_CASE("Penalized flow files are popped from the FlowFileQueue in the order their penalties expire", "[FlowFileQueue][pop]") {
+ utils::FlowFileQueue queue;
+ const auto now = utils::timeutils::getTimeMillis();
+ const auto flow_file_1 = std::make_shared<core::FlowFile>();
+ flow_file_1->setPenaltyExpiration(now + 70);
+ queue.push(flow_file_1);
+ const auto flow_file_2 = std::make_shared<core::FlowFile>();
+ flow_file_2->setPenaltyExpiration(now + 50);
+ queue.push(flow_file_2);
+ const auto flow_file_3 = std::make_shared<core::FlowFile>();
+ flow_file_3->setPenaltyExpiration(now + 80);
+ queue.push(flow_file_3);
+ const auto flow_file_4 = std::make_shared<core::FlowFile>();
+ flow_file_4->setPenaltyExpiration(now + 60);
+ queue.push(flow_file_4);
+
+ REQUIRE_FALSE(queue.canBePopped());
+
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_2}, std::chrono::milliseconds{10}));
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_2);
+
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_4}, std::chrono::milliseconds{10}));
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_4);
+
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_1}, std::chrono::milliseconds{10}));
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_1);
+
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_3}, std::chrono::milliseconds{10}));
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file_3);
+
+ REQUIRE_FALSE(queue.canBePopped());
+}
+
+TEST_CASE("If a penalized then a non-penalized flow file is added to the FlowFileQueue, pop() returns the correct one", "[FlowFileQueue][pop]") {
+ utils::FlowFileQueue queue;
+ const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+ penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+ queue.push(penalized_flow_file);
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ queue.push(flow_file);
+
+ SECTION("Try popping right away") {
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file);
+ REQUIRE_FALSE(queue.canBePopped());
+ }
+
+ SECTION("Wait until the penalty expires, then pop") {
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{penalized_flow_file}, std::chrono::milliseconds{10}));
+
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == penalized_flow_file);
+ REQUIRE(queue.canBePopped());
+ REQUIRE(queue.pop() == flow_file);
+ REQUIRE_FALSE(queue.canBePopped());
+ }
+}
+
+TEST_CASE("Force pop on FlowFileQueue returns the flow files, whether penalized or not", "[FlowFileQueue][forcePop]") {
+ utils::FlowFileQueue queue;
+ const auto penalized_flow_file = std::make_shared<core::FlowFile>();
+ penalized_flow_file->setPenaltyExpiration(utils::timeutils::getTimeMillis() + 10);
+ queue.push(penalized_flow_file);
+ const auto flow_file = std::make_shared<core::FlowFile>();
+ queue.push(flow_file);
+
+ REQUIRE_FALSE(queue.empty());
+ REQUIRE(queue.forcePop() == flow_file);
+ REQUIRE(queue.forcePop() == penalized_flow_file);
+ REQUIRE(queue.empty());
+}
diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h
index ac64c20..c55851d 100644
--- a/libminifi/test/unit/MockClasses.h
+++ b/libminifi/test/unit/MockClasses.h
@@ -115,7 +115,7 @@ class MockProcessor : public core::Processor {
}
}
- bool isYield() {
+ bool isYield() override {
return false;
}