Posted to issues@nifi.apache.org by "martinzink (via GitHub)" <gi...@apache.org> on 2023/05/31 05:57:02 UTC

[GitHub] [nifi-minifi-cpp] martinzink opened a new pull request, #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

martinzink opened a new pull request, #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581

   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [x] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [x] If applicable, have you updated the LICENSE file?
   - [x] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211800505


##########
libminifi/include/utils/ThreadPool.h:
##########
@@ -94,7 +94,7 @@ class Worker {
       promise->set_value(result);
       return false;
     }
-    next_exec_time_ = std::max(next_exec_time_ + run_determinant_->wait_time(), std::chrono::steady_clock::now());
+    next_exec_time_ = std::max(next_exec_time_, std::chrono::steady_clock::now() + run_determinant_->wait_time());

Review Comment:
   You are right, but I think this should be TimerDrivenSchedulingAgent's responsibility. I've changed it in https://github.com/apache/nifi-minifi-cpp/pull/1581/commits/64abd78c460099378682ac8bef8a734ec44e70a3#diff-dad78983913a0ec9fc95090e0322f6535e7d690b3ad5a14d47ea27c89006ccc3R36-R37





[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm closed pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581




[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1214442945


##########
libminifi/src/TimerDrivenSchedulingAgent.cpp:
##########
@@ -28,11 +28,13 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
+    auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
-    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
+    auto next_execution_time_point = trigger_start_time + processor->getSchedulingPeriod();
+    return utils::TaskRescheduleInfo::RetryIn(next_execution_time_point - std::chrono::steady_clock::now());

Review Comment:
   If this is negative, we shouldn't schedule processors in the past.
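To make the concern concrete, here is a minimal sketch of clamping the remaining delay at zero. The helper name `remainingDelay` and the millisecond-based `TimePoint` alias are assumptions made for illustration; they are not taken from the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

using namespace std::chrono_literals;

// Assumption: a millisecond-resolution steady_clock time point, chosen so the
// arithmetic below stays in one duration type.
using TimePoint = std::chrono::time_point<std::chrono::steady_clock, std::chrono::milliseconds>;

// Hypothetical helper: time left until the next scheduled run, measured from
// the start of the trigger. Never returns a negative duration.
std::chrono::milliseconds remainingDelay(TimePoint trigger_start,
                                         std::chrono::milliseconds scheduling_period,
                                         TimePoint now) {
  assert(scheduling_period >= 0ms);
  const TimePoint next_execution = trigger_start + scheduling_period;
  // If the trigger overran its period, the difference is negative: clamp to 0.
  return std::max(next_execution - now, std::chrono::milliseconds{0});
}
```

A caller would pass something like `std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())` as `now`; taking it as a parameter keeps the calculation testable.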





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211136629


##########
libminifi/test/unit/ThreadPoolTests.cpp:
##########


Review Comment:
   What is the function called `function` used for at line 28? Can we remove it if it's unused?



##########
libminifi/src/core/Processor.cpp:
##########
@@ -372,26 +372,26 @@ void Processor::setMaxConcurrentTasks(const uint8_t tasks) {
 }
 
 void Processor::yield() {
-  yield_expiration_ = std::chrono::system_clock::now() + yield_period_msec_.load();
+  yield_expiration_ = std::chrono::steady_clock::now() + yield_period_msec_.load();
 }
 
 void Processor::yield(std::chrono::milliseconds delta_time) {
-  yield_expiration_ = std::chrono::system_clock::now() + delta_time;
+  yield_expiration_ = std::chrono::steady_clock::now() + delta_time;
 }
 
 bool Processor::isYield() {
-  return yield_expiration_.load() >= std::chrono::system_clock::now();
+  return getYieldTime() > 0ms;
 }
 
 void Processor::clearYield() {
-  yield_expiration_ = std::chrono::system_clock::time_point();
+  yield_expiration_ = std::chrono::steady_clock::time_point();
 }
 
 std::chrono::milliseconds Processor::getYieldTime() const {
   auto yield_expiration = yield_expiration_.load();
-  auto current_time = std::chrono::system_clock::now();
+  auto current_time = std::chrono::steady_clock::now();

Review Comment:
   Now that timestamps are changed from system_clock to steady_clock, does this affect serialized timestamps? I don't think `steady_clock` is guaranteed to be steadily increasing in sync with real time between executions or reboots, so reading persisted timestamps anywhere may result in incorrect behavior.
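As an illustration of the concern, the sketch below shows one way a `steady_clock` deadline could be translated to wall-clock time if it ever had to be persisted. The helper `toWallClock` is hypothetical and does not exist in the codebase; the epoch of `steady_clock` is unspecified, so its raw values are only meaningful inside the running process.

```cpp
#include <cassert>
#include <chrono>

// steady_clock is monotonic within a single run of the process.
static_assert(std::chrono::steady_clock::is_steady, "steady_clock must be monotonic");

// Hypothetical helper: convert an in-process steady_clock deadline into a
// system_clock time point that could be written to disk.
std::chrono::system_clock::time_point toWallClock(std::chrono::steady_clock::time_point deadline) {
  // Time remaining until the deadline, as seen by the monotonic clock.
  const auto remaining = deadline - std::chrono::steady_clock::now();
  // Re-anchor that remaining time on the wall clock.
  return std::chrono::system_clock::now() +
         std::chrono::duration_cast<std::chrono::system_clock::duration>(remaining);
}
```

The conversion is only approximate, because the two `now()` calls happen at slightly different instants and the wall clock may be adjusted later.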



##########
libminifi/src/utils/ThreadPool.cpp:
##########
@@ -151,10 +153,10 @@ void ThreadPool<T>::manageWorkers() {
 
   if (nullptr != thread_manager_) {
     while (running_) {
-      auto waitperiod = std::chrono::milliseconds(500);
+      auto wait_period = 500ms;
       {
-        std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock);
-        if (!lock.owns_lock()) {
+        std::unique_lock<std::recursive_mutex> manager_lock(manager_mutex_, std::try_to_lock);
+        if (!manager_lock.owns_lock()) {

Review Comment:
   It looks like this mutex lock should be retried after waiting. Currently it just proceeds to access the shared data without synchronization, after waiting a bit, if the first lock attempt fails. 
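A minimal sketch of the retry idea, assuming a bounded number of attempts: the helper `lockWithRetry` and its parameters are hypothetical, and the real `manageWorkers()` loop may be structured differently.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical helper: try to take the lock, sleeping between attempts.
// Returns an owning lock on success and a non-owning lock if every attempt
// failed, so the caller must check owns_lock() before touching shared data.
std::unique_lock<std::recursive_mutex> lockWithRetry(std::recursive_mutex& mutex,
                                                     std::chrono::milliseconds retry_delay,
                                                     int max_attempts) {
  assert(max_attempts > 0);
  std::unique_lock<std::recursive_mutex> lock(mutex, std::defer_lock);
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (lock.try_lock()) {
      break;  // lock acquired, stop retrying
    }
    std::this_thread::sleep_for(retry_delay);
  }
  return lock;
}
```

The point is that the shared data is only accessed while `owns_lock()` is true; a failed attempt leads to another attempt rather than to unsynchronized access.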



##########
libminifi/include/utils/ThreadPool.h:
##########
@@ -94,7 +94,7 @@ class Worker {
       promise->set_value(result);
       return false;
     }
-    next_exec_time_ = std::max(next_exec_time_ + run_determinant_->wait_time(), std::chrono::steady_clock::now());
+    next_exec_time_ = std::max(next_exec_time_, std::chrono::steady_clock::now() + run_determinant_->wait_time());

Review Comment:
   This seems to change behavior, especially for long running processors: If a processor is scheduled to run once every 5 seconds, and it takes 4 seconds to run, then I'd expect the next execution to happen 1 second after finishing execution, not 5 seconds later. The latter would mean effectively running every 9 seconds instead of the configured 5.
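The arithmetic in this comment can be checked with a small sketch that evaluates both formulas from the diff for a 5 second period and a 4 second run. The function names and the millisecond-based `TimePoint` alias are assumptions for illustration; only the two `std::max` expressions come from the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

using namespace std::chrono_literals;

// Assumption: millisecond-resolution steady_clock time point for simple arithmetic.
using TimePoint = std::chrono::time_point<std::chrono::steady_clock, std::chrono::milliseconds>;

// Formula on the removed line: the wait is added to the previously scheduled time.
TimePoint relativeToPreviousSchedule(TimePoint next_exec_time,
                                     std::chrono::milliseconds wait_time,
                                     TimePoint now) {
  return std::max(next_exec_time + wait_time, now);
}

// Formula on the added line: the wait is added to the moment the task finished.
TimePoint relativeToFinish(TimePoint next_exec_time,
                           std::chrono::milliseconds wait_time,
                           TimePoint now) {
  return std::max(next_exec_time, now + wait_time);
}
```

With the task scheduled at `t0` and finishing at `t0 + 4s`, the first formula yields `t0 + 5s` (1 second after finishing) and the second yields `t0 + 9s`, which is the 9 second effective period described above.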





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1213029823


##########
libminifi/test/unit/ThreadPoolTests.cpp:
##########
@@ -84,7 +81,33 @@ TEST_CASE("ThreadPoolTest2", "[TPT2]") {
   utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
   pool.start();
   std::future<int> fut;
-  pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
+  pool.execute(std::move(functor), fut);
   fut.wait();
   REQUIRE(20 == fut.get());
 }
+
+TEST_CASE("Worker wait time should be relative to the last run") {
+  std::mutex cv_m;
+  std::condition_variable cv;

Review Comment:
   I've simplified this test in https://github.com/apache/nifi-minifi-cpp/pull/1581/commits/187f07eb04efef01712e81cd9f3fc1dfa93bc93d





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211802167


##########
libminifi/src/core/Processor.cpp:
##########
@@ -372,26 +372,26 @@ void Processor::setMaxConcurrentTasks(const uint8_t tasks) {
 }
 
 void Processor::yield() {
-  yield_expiration_ = std::chrono::system_clock::now() + yield_period_msec_.load();
+  yield_expiration_ = std::chrono::steady_clock::now() + yield_period_msec_.load();
 }
 
 void Processor::yield(std::chrono::milliseconds delta_time) {
-  yield_expiration_ = std::chrono::system_clock::now() + delta_time;
+  yield_expiration_ = std::chrono::steady_clock::now() + delta_time;
 }
 
 bool Processor::isYield() {
-  return yield_expiration_.load() >= std::chrono::system_clock::now();
+  return getYieldTime() > 0ms;
 }
 
 void Processor::clearYield() {
-  yield_expiration_ = std::chrono::system_clock::time_point();
+  yield_expiration_ = std::chrono::steady_clock::time_point();
 }
 
 std::chrono::milliseconds Processor::getYieldTime() const {
   auto yield_expiration = yield_expiration_.load();
-  auto current_time = std::chrono::system_clock::now();
+  auto current_time = std::chrono::steady_clock::now();

Review Comment:
   AFAIK we don't serialize these anywhere.





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1215630971


##########
libminifi/src/TimerDrivenSchedulingAgent.cpp:
##########
@@ -28,11 +28,13 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
+    auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
-    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
+    auto next_execution_time_point = trigger_start_time + processor->getSchedulingPeriod();
+    return utils::TaskRescheduleInfo::RetryIn(next_execution_time_point - std::chrono::steady_clock::now());

Review Comment:
   I think this is surprising behavior, and it would be better to have a narrow contract precondition in the constructor, and calculate it where it's called.





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211801649


##########
libminifi/test/unit/ThreadPoolTests.cpp:
##########


Review Comment:
   Replaced these with lambdas in https://github.com/apache/nifi-minifi-cpp/pull/1581/commits/64abd78c460099378682ac8bef8a734ec44e70a3#diff-c5171c6414a98ddf1b6b7a2b62956d80c96251c29bbec9278c478cf8fc9cbb9bR57





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211807969


##########
libminifi/src/utils/ThreadPool.cpp:
##########
@@ -151,10 +153,10 @@ void ThreadPool<T>::manageWorkers() {
 
   if (nullptr != thread_manager_) {
     while (running_) {
-      auto waitperiod = std::chrono::milliseconds(500);
+      auto wait_period = 500ms;
       {
-        std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock);
-        if (!lock.owns_lock()) {
+        std::unique_lock<std::recursive_mutex> manager_lock(manager_mutex_, std::try_to_lock);
+        if (!manager_lock.owns_lock()) {

Review Comment:
   I think you are right. Since we couldn't find anything indicating that this was a conscious choice 3 years ago, I think it's safe to modify it so that it retries the lock after this 10ms.
   https://github.com/apache/nifi-minifi-cpp/pull/1581/commits/4f5a1c9f14b7e49eeed18d2dfcb0582ef35e8186#diff-7bc3087dc395ad862c065930e775dfbaf79006c55df75e6a4c055b03cd4586b6R159-R160





[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1211836375


##########
libminifi/test/unit/ThreadPoolTests.cpp:
##########
@@ -84,7 +81,33 @@ TEST_CASE("ThreadPoolTest2", "[TPT2]") {
   utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
   pool.start();
   std::future<int> fut;
-  pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
+  pool.execute(std::move(functor), fut);
   fut.wait();
   REQUIRE(20 == fut.get());
 }
+
+TEST_CASE("Worker wait time should be relative to the last run") {
+  std::mutex cv_m;
+  std::condition_variable cv;

Review Comment:
   You could use `std::promise<void>` or `std::atomic_flag` (like #1576) for simpler code.
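As a rough sketch of the `std::promise<void>` variant: a worker thread signals completion through the promise, and the waiting side uses the future instead of a mutex and condition variable pair. The function `finishesWithin` is a hypothetical stand-in, not the test from the PR.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical stand-in for a test body: starts a worker thread and reports
// whether it signalled completion within `timeout`.
bool finishesWithin(std::chrono::milliseconds timeout) {
  std::promise<void> done;
  std::future<void> done_future = done.get_future();
  // The worker signals completion by fulfilling the promise exactly once.
  std::thread worker([&done] { done.set_value(); });
  const bool finished = done_future.wait_for(timeout) == std::future_status::ready;
  worker.join();  // join before `done` goes out of scope
  return finished;
}
```

Compared with `std::condition_variable`, there is no predicate loop and no separate mutex, since the future carries both the signal and the synchronization.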





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1214604135


##########
libminifi/src/TimerDrivenSchedulingAgent.cpp:
##########
@@ -28,11 +28,13 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
+    auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
-    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
+    auto next_execution_time_point = trigger_start_time + processor->getSchedulingPeriod();
+    return utils::TaskRescheduleInfo::RetryIn(next_execution_time_point - std::chrono::steady_clock::now());

Review Comment:
   We filter these out in the ctor in https://github.com/apache/nifi-minifi-cpp/pull/1581/files#diff-a598c25bd74649903655c7c852fe86ba84347d14c068acbe27c804bdcbee4605R55





[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1581: MINIFICPP-2125 fix for waking up prematurely after processor yields

Posted by "martinzink (via GitHub)" <gi...@apache.org>.
martinzink commented on code in PR #1581:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1581#discussion_r1221354436


##########
libminifi/src/TimerDrivenSchedulingAgent.cpp:
##########
@@ -28,11 +28,13 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
+    auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
-    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
+    auto next_execution_time_point = trigger_start_time + processor->getSchedulingPeriod();
+    return utils::TaskRescheduleInfo::RetryIn(next_execution_time_point - std::chrono::steady_clock::now());

Review Comment:
   Good point, I've changed the behaviour in https://github.com/apache/nifi-minifi-cpp/pull/1581/commits/adac20e8318a942ba0220a3a4b71b889aa6675ae, and created a helper function that does this calculation (it's based on the time_point, so I think that's expected).


