Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/03/11 08:13:32 UTC

[GitHub] [nifi-minifi-cpp] fgerlits opened a new pull request #1027: MINIFICPP-1487 Use a single queue for flow files

fgerlits opened a new pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027


   https://issues.apache.org/jira/browse/MINIFICPP-1487
   
   This PR has some overlap with #1025 (the first two commits), so it may be easier to review the commits separately, skipping those.  We may merge either of these PRs, both, or neither.
   
   Instead of a separate `deque` for non-penalized flow files and a `priority_queue` for penalized flow files, this PR uses a single `priority_queue` for both, setting a "due date" to be the insertion time for non-penalized flow files and the penalty expiration time for penalized flow files.
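
The single-queue idea described above can be sketched as follows. This is a minimal illustration, not the code in the PR: `FlowFile`, `SingleQueue`, `LaterDueDate` and the `due` field are hypothetical stand-ins, and the current time is passed in as a parameter so the behavior is easy to check.

```cpp
#include <chrono>
#include <memory>
#include <queue>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for core::FlowFile; only the fields this sketch needs.
struct FlowFile {
  std::string name;
  // Insertion time for non-penalized flow files, penalty expiration time for penalized ones.
  std::chrono::steady_clock::time_point due;
};

// Hypothetical single-queue wrapper; the names do not come from the PR.
class SingleQueue {
 public:
  using value_type = std::shared_ptr<FlowFile>;
  using time_point = std::chrono::steady_clock::time_point;

  void push(value_type flow_file) { queue_.push(std::move(flow_file)); }

  bool empty() const { return queue_.empty(); }

  // Work is available once the earliest due date has been reached.
  bool isWorkAvailable(time_point now) const {
    return !queue_.empty() && queue_.top()->due <= now;
  }

  value_type pop() {
    if (queue_.empty()) {
      throw std::logic_error{"pop() called on an empty queue"};
    }
    value_type next = queue_.top();
    queue_.pop();
    return next;
  }

 private:
  // std::priority_queue keeps the "largest" element on top, so the comparator
  // is inverted to put the earliest due date first.
  struct LaterDueDate {
    bool operator()(const value_type& a, const value_type& b) const { return a->due > b->due; }
  };

  std::priority_queue<value_type, std::vector<value_type>, LaterDueDate> queue_;
};
```

One caveat of this sketch: `std::priority_queue` is not stable, so two flow files with exactly the same due date may come out in either order.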
   
   This makes the code in FlowFileQueue simpler and easier to reason about, and it makes the unit tests more robust.  On the other hand, `push` and `pop` are now O(log n) for non-penalized flow files, too, instead of O(1).  I think this is an acceptable cost, as O(log n) is not significantly slower than O(1), but I'm interested in what others think.
   
   ---
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni edited a comment on pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni edited a comment on pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#issuecomment-796714939


   ![image](https://user-images.githubusercontent.com/64783590/110789866-a28c8c00-8270-11eb-9479-0d69068f251f.png)
   
   Note: mind the baseline labeled "empty"; that is the constant cost of the benchmarking system.
   
   I measured whether there is an observable penalty in going to O(log n). These tests operate (push/pop) on 10 `std::shared_ptr` instances. Popping 10 elements from a 1,000,000-element priority_queue takes less than a microsecond on my machine. Compared to the deque, that is a slowdown by a factor of 10, but based on this I would not worry about it becoming a bottleneck.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592980496



##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -615,65 +615,131 @@ TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
 
-TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
+namespace {
+
+class ProcessorWithIncomingConnectionTest {
+ public:
+  ProcessorWithIncomingConnectionTest();
+  ~ProcessorWithIncomingConnectionTest();
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<minifi::Connection> incoming_connection_;
+  std::shared_ptr<core::ProcessSession> session_;
+};
+
+ProcessorWithIncomingConnectionTest::ProcessorWithIncomingConnectionTest() {
   LogTestController::getInstance().setDebug<core::Processor>();
 
   const auto repo = std::make_shared<TestRepository>();
   const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
-  const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
-  const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
-  incoming_connection->addRelationship(core::Relationship{"success", ""});
-  incoming_connection->setDestinationUUID(processor->getUUID());
-  processor->addConnection(incoming_connection);
-  processor->initialize();
+  processor_ = std::make_shared<processors::LogAttribute>("test_processor");
+  incoming_connection_ = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+  incoming_connection_->addRelationship(core::Relationship{"success", ""});
+  incoming_connection_->setDestinationUUID(processor_->getUUID());
+  processor_->addConnection(incoming_connection_);
+  processor_->initialize();
 
-  const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+  const auto processor_node = std::make_shared<core::ProcessorNode>(processor_);
   const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
   const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
-  const auto session = session_factory->createSession();
+  session_ = session_factory->createSession();
+}
 
+ProcessorWithIncomingConnectionTest::~ProcessorWithIncomingConnectionTest() {
+  LogTestController::getInstance().reset();
+}
+
+}  // namespace
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
   SECTION("Initially, the queue is empty, so there is no work available") {
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("When a non-penalized flow file is queued, there is work available") {
-    const auto flow_file = session->create();
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    incoming_connection_->put(flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
-    const auto normal_flow_file = session->create();
-    incoming_connection->put(normal_flow_file);
+    const auto normal_flow_file = session_->create();
+    incoming_connection_->put(normal_flow_file);
 
-    const auto penalized_flow_file = session->create();
-    session->penalize(penalized_flow_file);
-    incoming_connection->put(penalized_flow_file);
+    const auto penalized_flow_file = session_->create();
+    session_->penalize(penalized_flow_file);
+    incoming_connection_->put(penalized_flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
-    processor->setPenalizationPeriodMsec(10);
+    processor_->setPenalizationPeriod(std::chrono::milliseconds{10});
 
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
     const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };
     REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 }
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A failed and re-penalized flow file does not block the incoming queue of the Processor", "[penalize]") {
+  processor_->setPenalizationPeriod(std::chrono::milliseconds{100});
+  const auto penalized_flow_file = session_->create();
+  session_->penalize(penalized_flow_file);  // first penalty duration is 100 ms
+  incoming_connection_->put(penalized_flow_file);
+  const auto penalty_has_expired = [penalized_flow_file] { return !penalized_flow_file->isPenalized(); };
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+
+  const auto flow_file_1 = session_->create();
+  incoming_connection_->put(flow_file_1);
+  const auto flow_file_2 = session_->create();
+  incoming_connection_->put(flow_file_2);
+  const auto flow_file_3 = session_->create();
+  incoming_connection_->put(flow_file_3);
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+  const auto next_flow_file_1 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_1 == penalized_flow_file);
+
+  session_->penalize(penalized_flow_file);  // second penalty duration is 200 ms

Review comment:
       does this comment apply?







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592984942



##########
File path: libminifi/src/utils/FlowFileQueue.cpp
##########
@@ -30,70 +30,41 @@ bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_
 }
 
 FlowFileQueue::value_type FlowFileQueue::pop() {
-  if (existsFlowFileWithExpiredPenalty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
+  if (empty()) {
+    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
   }
 
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
-}
-
-/**
- * Pops any flow file off the queue, whether it has an unexpired penalty or not.
- */
-FlowFileQueue::value_type FlowFileQueue::forcePop() {
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  if (!priority_queue_.empty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+  value_type next_flow_file = queue_.top();

Review comment:
       I don't think we can: `top()` returns a const reference (and is a const member function).
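
To illustrate the point about `top()`, here is a small sketch using `std::shared_ptr<int>` elements. `Queue` and `popTop` are hypothetical names for this example only. Because `top()` returns a const reference, `std::move(queue.top())` would produce a const rvalue and still select the copy constructor, so the value is copied before `pop()`.

```cpp
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>

using Queue = std::priority_queue<std::shared_ptr<int>>;

// top() is a const member function and returns const_reference.
static_assert(std::is_same<decltype(std::declval<Queue&>().top()),
                           const std::shared_ptr<int>&>::value,
              "top() yields a const reference even on a non-const queue");

// Hypothetical helper: copy the top element, then remove it from the queue.
std::shared_ptr<int> popTop(Queue& queue) {
  std::shared_ptr<int> next = queue.top();  // copy: one reference-count increment
  queue.pop();                              // destroys the queue's own copy
  return next;
}
```

For a `shared_ptr` element, the extra cost of the copy is one reference-count increment and one decrement.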







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592980213



##########
File path: libminifi/src/utils/FlowFileQueue.cpp
##########
@@ -30,70 +30,41 @@ bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_
 }
 
 FlowFileQueue::value_type FlowFileQueue::pop() {
-  if (existsFlowFileWithExpiredPenalty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
+  if (empty()) {
+    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
   }
 
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
-}
-
-/**
- * Pops any flow file off the queue, whether it has an unexpired penalty or not.
- */
-FlowFileQueue::value_type FlowFileQueue::forcePop() {
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  if (!priority_queue_.empty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+  value_type next_flow_file = queue_.top();

Review comment:
       should we move from `queue_.top()`? 







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592989626



##########
File path: libminifi/src/utils/FlowFileQueue.cpp
##########
@@ -30,70 +30,41 @@ bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_
 }
 
 FlowFileQueue::value_type FlowFileQueue::pop() {
-  if (existsFlowFileWithExpiredPenalty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
+  if (empty()) {
+    throw std::logic_error{"pop() called on an empty FlowFileQueue"};
   }
 
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"pop() called on FlowFileQueue when canBePopped() is false"};
-}
-
-/**
- * Pops any flow file off the queue, whether it has an unexpired penalty or not.
- */
-FlowFileQueue::value_type FlowFileQueue::forcePop() {
-  if (!fifo_queue_.empty()) {
-    value_type next_flow_file = fifo_queue_.front();
-    fifo_queue_.pop();
-    return next_flow_file;
-  }
-
-  if (!priority_queue_.empty()) {
-    value_type next_flow_file = priority_queue_.top();
-    priority_queue_.pop();
-    return next_flow_file;
-  }
-
-  throw std::logic_error{"forcePop() called on an empty FlowFileQueue"};
+  value_type next_flow_file = queue_.top();

Review comment:
       that is a bummer, there really is no non-const overload :( 







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#issuecomment-796727423


   
   ![image](https://user-images.githubusercontent.com/64783590/110792674-f9e02b80-8273-11eb-9bd5-6b7ac56b49ea.png)
   
   
   Note: mind the baseline labeled "empty"; that is the constant cost of the benchmarking system.
   
   I measured whether there is an observable penalty in going to O(log n). These tests operate (push/pop) on 10 `std::shared_ptr<int>` instances (with a custom comparator). Popping 10 elements from a 1,000,000-element priority_queue takes less than `6.5 us` on my machine. Compared to the deque, that is a slowdown by a factor of ~100, but based on this I would not worry about it becoming a bottleneck.
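
A measurement of this kind could be set up roughly as below. This is only a sketch and not the benchmark harness behind the image above: `Item`, `GreaterByValue`, `MinHeap`, `makeHeap` and `timePops` are hypothetical names, and a plain `steady_clock` timing like this is far noisier than a proper benchmarking framework.

```cpp
#include <chrono>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

using Item = std::shared_ptr<int>;

// Custom comparator: the smallest pointed-to value ends up on top.
struct GreaterByValue {
  bool operator()(const Item& a, const Item& b) const { return *a > *b; }
};

using MinHeap = std::priority_queue<Item, std::vector<Item>, GreaterByValue>;

// Builds a heap holding the values 0 .. size-1.
MinHeap makeHeap(std::size_t size) {
  std::vector<Item> items;
  items.reserve(size);
  for (std::size_t i = 0; i < size; ++i) {
    items.push_back(std::make_shared<int>(static_cast<int>(i)));
  }
  return MinHeap{GreaterByValue{}, std::move(items)};
}

// Times `count` consecutive pops; the caller must ensure the heap is large enough.
std::chrono::nanoseconds timePops(MinHeap& heap, std::size_t count) {
  const auto start = std::chrono::steady_clock::now();
  for (std::size_t i = 0; i < count; ++i) {
    heap.pop();
  }
  const auto stop = std::chrono::steady_clock::now();
  return std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
}
```

Calling `timePops(heap, 10)` on a heap built with `makeHeap(1000000)` corresponds to the scenario described in the comment. The absolute numbers will depend on the machine and the compiler flags.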





[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592985570



##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -615,65 +615,131 @@ TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
 
-TEST_CASE("A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
+namespace {
+
+class ProcessorWithIncomingConnectionTest {
+ public:
+  ProcessorWithIncomingConnectionTest();
+  ~ProcessorWithIncomingConnectionTest();
+
+ protected:
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<minifi::Connection> incoming_connection_;
+  std::shared_ptr<core::ProcessSession> session_;
+};
+
+ProcessorWithIncomingConnectionTest::ProcessorWithIncomingConnectionTest() {
   LogTestController::getInstance().setDebug<core::Processor>();
 
   const auto repo = std::make_shared<TestRepository>();
   const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(std::make_shared<minifi::Configure>());
 
-  const std::shared_ptr<core::Processor> processor = std::make_shared<processors::LogAttribute>("test_processor");
-  const auto incoming_connection = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
-  incoming_connection->addRelationship(core::Relationship{"success", ""});
-  incoming_connection->setDestinationUUID(processor->getUUID());
-  processor->addConnection(incoming_connection);
-  processor->initialize();
+  processor_ = std::make_shared<processors::LogAttribute>("test_processor");
+  incoming_connection_ = std::make_shared<minifi::Connection>(repo, content_repo, "incoming_connection");
+  incoming_connection_->addRelationship(core::Relationship{"success", ""});
+  incoming_connection_->setDestinationUUID(processor_->getUUID());
+  processor_->addConnection(incoming_connection_);
+  processor_->initialize();
 
-  const auto processor_node = std::make_shared<core::ProcessorNode>(processor);
+  const auto processor_node = std::make_shared<core::ProcessorNode>(processor_);
   const auto context = std::make_shared<core::ProcessContext>(processor_node, nullptr, repo, repo, content_repo);
   const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
-  const auto session = session_factory->createSession();
+  session_ = session_factory->createSession();
+}
 
+ProcessorWithIncomingConnectionTest::~ProcessorWithIncomingConnectionTest() {
+  LogTestController::getInstance().reset();
+}
+
+}  // namespace
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A Processor detects correctly if it has incoming flow files it can process", "[isWorkAvailable]") {
   SECTION("Initially, the queue is empty, so there is no work available") {
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("When a non-penalized flow file is queued, there is work available") {
-    const auto flow_file = session->create();
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    incoming_connection_->put(flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is no work available (until the penalty expires)") {
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
   }
 
   SECTION("If there is both a penalized and a non-penalized flow file queued, there is work available") {
-    const auto normal_flow_file = session->create();
-    incoming_connection->put(normal_flow_file);
+    const auto normal_flow_file = session_->create();
+    incoming_connection_->put(normal_flow_file);
 
-    const auto penalized_flow_file = session->create();
-    session->penalize(penalized_flow_file);
-    incoming_connection->put(penalized_flow_file);
+    const auto penalized_flow_file = session_->create();
+    session_->penalize(penalized_flow_file);
+    incoming_connection_->put(penalized_flow_file);
 
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 
   SECTION("When a penalized flow file is queued, there is work available after the penalty expires") {
-    processor->setPenalizationPeriodMsec(10);
+    processor_->setPenalizationPeriod(std::chrono::milliseconds{10});
 
-    const auto flow_file = session->create();
-    session->penalize(flow_file);
-    incoming_connection->put(flow_file);
+    const auto flow_file = session_->create();
+    session_->penalize(flow_file);
+    incoming_connection_->put(flow_file);
 
-    REQUIRE_FALSE(processor->isWorkAvailable());
+    REQUIRE_FALSE(processor_->isWorkAvailable());
     const auto penalty_has_expired = [flow_file] { return !flow_file->isPenalized(); };
     REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
-    REQUIRE(processor->isWorkAvailable());
+    REQUIRE(processor_->isWorkAvailable());
   }
 }
+
+TEST_CASE_METHOD(ProcessorWithIncomingConnectionTest, "A failed and re-penalized flow file does not block the incoming queue of the Processor", "[penalize]") {
+  processor_->setPenalizationPeriod(std::chrono::milliseconds{100});
+  const auto penalized_flow_file = session_->create();
+  session_->penalize(penalized_flow_file);  // first penalty duration is 100 ms
+  incoming_connection_->put(penalized_flow_file);
+  const auto penalty_has_expired = [penalized_flow_file] { return !penalized_flow_file->isPenalized(); };
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, penalty_has_expired, std::chrono::milliseconds{10}));
+
+  const auto flow_file_1 = session_->create();
+  incoming_connection_->put(flow_file_1);
+  const auto flow_file_2 = session_->create();
+  incoming_connection_->put(flow_file_2);
+  const auto flow_file_3 = session_->create();
+  incoming_connection_->put(flow_file_3);
+
+  REQUIRE(incoming_connection_->isWorkAvailable());
+  std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+  const auto next_flow_file_1 = incoming_connection_->poll(expired_flow_files);
+  REQUIRE(next_flow_file_1 == penalized_flow_file);
+
+  session_->penalize(penalized_flow_file);  // second penalty duration is 200 ms

Review comment:
       Nope.  Removed in https://github.com/apache/nifi-minifi-cpp/pull/1027/commits/a42c324123487d8a05be45c236ef1b42fd24457e.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#issuecomment-796714939


   ![image](https://user-images.githubusercontent.com/64783590/110789866-a28c8c00-8270-11eb-9479-0d69068f251f.png)
   
   Note: mind the baseline labeled "empty"; that is the constant cost of the benchmarking system.
   
   I measured whether there is an observable penalty in going to O(log n). These tests operate (push/pop) on 10 `std::shared_ptr` instances. Popping 10 elements from a 1,000,000-element priority_queue takes less than a microsecond on my machine. Compared to the deque, that is a slowdown on the order of 10, but based on this I would not worry about it becoming a bottleneck.





[GitHub] [nifi-minifi-cpp] adamdebreceni closed pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni closed pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027


   





[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#discussion_r592416644



##########
File path: libminifi/include/core/state/nodes/SchedulingNodes.h
##########
@@ -85,7 +85,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode penalizationPeriod;
     penalizationPeriod.name = "penalizationPeriodMillis";
-    penalizationPeriod.value = DEFAULT_PENALIZATION_PERIOD_SECONDS*1000;
+    penalizationPeriod.value = std::chrono::duration_cast<std::chrono::milliseconds>(core::DEFAULT_PENALIZATION_PERIOD).count();

Review comment:
       https://github.com/apache/nifi-minifi-cpp/pull/1025#discussion_r591436977
   ```suggestion
       penalizationPeriod.value = std::chrono::milliseconds{core::DEFAULT_PENALIZATION_PERIOD}.count();
   ```
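
The difference between the two forms can be sketched in isolation. The constant below is a hypothetical stand-in: this thread does not show the actual type or value of `core::DEFAULT_PENALIZATION_PERIOD`, so a `std::chrono::seconds` value of 30 is assumed purely for illustration.

```cpp
#include <chrono>

// Hypothetical stand-in for core::DEFAULT_PENALIZATION_PERIOD (type and value are assumptions).
constexpr std::chrono::seconds kDefaultPenalizationPeriod{30};

// Converting from a coarser to a finer integer duration is lossless, so the
// converting constructor accepts it without an explicit duration_cast.
constexpr auto kImplicitMillis =
    std::chrono::milliseconds{kDefaultPenalizationPeriod}.count();

// duration_cast produces the same value here; it is only required when the
// conversion could truncate (for example, milliseconds to seconds).
constexpr auto kCastMillis =
    std::chrono::duration_cast<std::chrono::milliseconds>(kDefaultPenalizationPeriod).count();

static_assert(kImplicitMillis == kCastMillis, "both forms yield the same count");
```

A practical benefit of the brace-initialized form is that it fails to compile if the constant is ever changed to a finer unit that would truncate, whereas `duration_cast` would silently round toward zero.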







[GitHub] [nifi-minifi-cpp] adamdebreceni removed a comment on pull request #1027: MINIFICPP-1487 Use a single queue for flow files

Posted by GitBox <gi...@apache.org>.
adamdebreceni removed a comment on pull request #1027:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1027#issuecomment-796714939



