Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/06 14:32:40 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #833: WIP: MINIFICPP-1272 - Graceful shutdown on flow update

adamdebreceni opened a new pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833


   This will conflict with #822, as both use the newly introduced test processors.
   
   ----
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453665360



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Just checked the code: there are functions to increment and decrement the active tasks of a processor. The decrement happens after the session is committed, which is good news for us: 
   ```
   bool Processor::isRunning() {
     return (state_ == RUNNING && active_tasks_ > 0);
   }
   ```
   
   This should reliably tell us whether the execution (including committing and destroying the session) has already finished.
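To make the reasoning above concrete, here is a minimal sketch of the active-task pattern under simplifying assumptions. `ToyProcessor`, `ActiveTaskGuard` and `trigger()` are hypothetical names, not MiNiFi's API. It illustrates why decrementing only after the commit turns `isRunning()` into a usable "this trigger has fully finished" signal.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical, simplified stand-in for a processor; not the MiNiFi class.
class ToyProcessor {
 public:
  enum class State { STOPPED, RUNNING };

  void start() { state_ = State::RUNNING; }
  void stop() { state_ = State::STOPPED; }

  // Mirrors the quoted check: running state AND at least one active task.
  bool isRunning() const {
    return state_ == State::RUNNING && active_tasks_ > 0;
  }

  // Runs one trigger. The counter is decremented only after `commit`
  // returns, so isRunning() stays true until the session work is done.
  void trigger(const std::function<void()>& work,
               const std::function<void()>& commit) {
    ActiveTaskGuard guard(active_tasks_);
    work();
    commit();
  }  // guard's destructor decrements here, after the commit

  int activeTasks() const { return active_tasks_.load(); }

 private:
  // RAII helper: increment on entry, decrement on scope exit (even on throw).
  struct ActiveTaskGuard {
    explicit ActiveTaskGuard(std::atomic<int>& counter) : counter_(counter) { ++counter_; }
    ~ActiveTaskGuard() {
      assert(counter_ > 0);
      --counter_;
    }
    std::atomic<int>& counter_;
  };

  std::atomic<State> state_{State::STOPPED};
  std::atomic<int> active_tasks_{0};
};
```

Using an RAII guard rather than paired manual increment/decrement calls keeps the counter balanced even if `onTrigger` throws.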







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453665360



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Just checked the code: there are functions to increment and decrement the active tasks of a processor. The decrement happens after the session is committed, which is good news for us: 
   ```
   bool Processor::isRunning() {
     return (state_ == RUNNING && active_tasks_ > 0);
   }
   ```
   
   This should reliably tell us whether the execution (including committing and destroying the session) has already completed.







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083106



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Sounds good to me, thanks!







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r454137073



##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};
+        utils::optional<core::TimePeriodValue> shutdown_timeout;
+        std::string shutdown_timeout_str;
+        if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
+          shutdown_timeout = core::TimePeriodValue::fromString(shutdown_timeout_str);
+        }
+        return std::chrono::milliseconds{shutdown_timeout.value_or(default_timeout).getMilliseconds()};
+      };
+      std::size_t count;
+      while (shutdown_timer < shutdown_timeout() && (count = this->root_->getTotalFlowFileCount()) != 0) {
+        std::this_thread::sleep_for(shutdown_check_interval_);
+        shutdown_timer += shutdown_check_interval_;

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453467576



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {

Review comment:
       That would suggest that the `Connection` itself is `const`, whereas only the `std::shared_ptr<...>` would be.
   If it is clear to everybody else, I can make the change.
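The distinction being raised can be shown with a small, self-contained sketch; `ToyConnection`, `enqueueEverywhere` and `totalQueued` are hypothetical names. With `const auto&` over a container of `std::shared_ptr<T>`, only the smart pointer is const, not the object it points to.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a connection with a mutable queue counter.
struct ToyConnection {
  std::size_t queue_size = 0;
  void enqueue() { ++queue_size; }
};

// `conn` is `const std::shared_ptr<ToyConnection>&`: the smart pointer is
// const, but the pointee is not, so calling a non-const member compiles.
void enqueueEverywhere(const std::vector<std::shared_ptr<ToyConnection>>& connections) {
  for (const auto& conn : connections) {
    conn->enqueue();  // allowed: constness does not propagate through shared_ptr
  }
}

// To promise read-only access to the pointee, the element type itself
// has to be shared_ptr<const ToyConnection>.
std::size_t totalQueued(const std::vector<std::shared_ptr<const ToyConnection>>& connections) {
  std::size_t sum = 0;
  for (const auto& conn : connections) {
    sum += conn->queue_size;  // conn->enqueue() would not compile here
  }
  return sum;
}
```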







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r452930520



##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};

Review comment:
       I think this shouldn't be defined in the middle of the code; it is very difficult to search for.
   Moreover, it can delay shutdown, so I would prefer to leave it at 0 by default. 
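A minimal sketch of what this suggestion could look like, assuming the configuration value has already been parsed into an optional duration. `flow_controller_defaults`, `kDrainTimeout` and `resolveDrainTimeout` are hypothetical names, not existing MiNiFi code.

```cpp
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical: the default is a named constant in one searchable place and
// is zero, so stopping never waits unless a drain timeout is configured.
namespace flow_controller_defaults {
inline constexpr std::chrono::milliseconds kDrainTimeout{0};
}  // namespace flow_controller_defaults

// `timeToWait` mirrors the stop() parameter, where 0 means "not specified".
// `configured` stands in for an already-parsed drain-timeout property value.
std::chrono::milliseconds resolveDrainTimeout(std::uint64_t timeToWait,
    std::optional<std::chrono::milliseconds> configured) {
  if (timeToWait != 0) {
    return std::chrono::milliseconds{timeToWait};
  }
  return configured.value_or(flow_controller_defaults::kDrainTimeout);
}
```

Keeping the resolution in a free function also makes the precedence (explicit argument, then configuration, then default) testable without a running flow.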

##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};
+        utils::optional<core::TimePeriodValue> shutdown_timeout;
+        std::string shutdown_timeout_str;
+        if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
+          shutdown_timeout = core::TimePeriodValue::fromString(shutdown_timeout_str);
+        }
+        return std::chrono::milliseconds{shutdown_timeout.value_or(default_timeout).getMilliseconds()};
+      };
+      std::size_t count;
+      while (shutdown_timer < shutdown_timeout() && (count = this->root_->getTotalFlowFileCount()) != 0) {
+        std::this_thread::sleep_for(shutdown_check_interval_);
+        shutdown_timer += shutdown_check_interval_;

Review comment:
       I would do it by saving the current time when this activity starts and checking whether the elapsed time exceeds the maximum allowed for this operation.
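One way to sketch the elapsed-time approach, under the assumption that the flow-file count and the (possibly growing) timeout are passed in as callables. `waitForDrain` is a hypothetical helper, not existing code.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Hypothetical helper: remember when draining started and compare the
// elapsed time against the limit, instead of adding up sleep intervals
// (that sum ignores time spent evaluating the condition and any
// oversleeping by sleep_for).
// `remainingFlowFiles` stands in for root_->getTotalFlowFileCount();
// `currentTimeout` is re-read on every iteration so the limit can still be
// raised while waiting, as the original lambda allows.
// Returns true if the connections drained before the limit was reached.
bool waitForDrain(const std::function<std::size_t()>& remainingFlowFiles,
                  const std::function<std::chrono::milliseconds()>& currentTimeout,
                  std::chrono::milliseconds checkInterval) {
  const auto start = std::chrono::steady_clock::now();  // monotonic clock
  while (remainingFlowFiles() != 0) {
    if (std::chrono::steady_clock::now() - start >= currentTimeout()) {
      return false;  // grace period used up
    }
    std::this_thread::sleep_for(checkInterval);
  }
  return true;
}
```

`steady_clock` is used rather than `system_clock` because it cannot jump backwards or forwards when the wall clock is adjusted.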

##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {

Review comment:
       Nitpicking: const auto&

##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Can't we simply wait here until the source processor's trigger count becomes one?

##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {
+    sum += conn->getQueueSize();
+  }
+
+  for (ProcessGroup* childGroup : child_process_groups_) {

Review comment:
       const ptr
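Reading "const ptr" as "iterate the child groups through a pointer to const", a trimmed-down sketch might look like this. `ToyConnection` and `ToyProcessGroup` are hypothetical types modelled on the quoted hunk, not MiNiFi's classes.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, trimmed-down types; only what the count needs.
struct ToyConnection {
  std::size_t queue_size = 0;
  std::size_t getQueueSize() const { return queue_size; }
};

class ToyProcessGroup {
 public:
  void addConnection(std::shared_ptr<ToyConnection> conn) { connections_.push_back(std::move(conn)); }
  void addChild(ToyProcessGroup* child) { child_process_groups_.push_back(child); }

  // const member function. Child groups are visited through a pointer to
  // const, so only const members can be called on them. Note that
  // `const auto&` on the shared_ptr elements does not make the connections
  // themselves const.
  std::size_t getTotalFlowFileCount() const {
    std::size_t sum = 0;
    for (const auto& conn : connections_) {
      sum += conn->getQueueSize();
    }
    for (const ToyProcessGroup* childGroup : child_process_groups_) {
      sum += childGroup->getTotalFlowFileCount();  // recurse into subgroups
    }
    return sum;
  }

 private:
  std::vector<std::shared_ptr<ToyConnection>> connections_;
  std::vector<ToyProcessGroup*> child_process_groups_;  // non-owning in this sketch
};
```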







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r454243881



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       `active_tasks_` is a strange animal and should be revisited, but waiting for the FlowFiles to be enqueued into the connections through the `ProcessGroup::getTotalFlowFileCount` method is a viable solution.







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453665360



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       Just checked the code: there are functions to increment and decrement the active tasks of a processor. The decrement happens after the session is committed, which is good news for us: 
   ```
   bool Processor::isRunning() {
     return (state_ == RUNNING && active_tasks_ > 0);
   }
   ```
   
   This should reliably tell us whether the execution (including committing and destroying the session) has already finished.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453479898



##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
     REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
+
+  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+    static std::atomic<bool> first_onTrigger{true};
+    bool isFirst = true;
+    // sleep only on the first trigger
+    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+    }
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
       We are not waiting for the trigger but for the FlowFiles to be enqueued into the Connection. During shutdown we cannot check whether a source processor has stopped, so we can only check the connections' contents. Should we expose whether a processor has active sessions?
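One way to avoid relying on a fixed 50 ms sleep is to poll the state the test can observe (the connections' contents) until it matches or a deadline passes. Here is a minimal sketch that assumes a hypothetical `waitUntil` helper, which is not an existing MiNiFi test utility. In the test above, the predicate might compare `root->getTotalFlowFileCount()` against the number of FlowFiles the generator is expected to enqueue.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper (not part of MiNiFi): polls `condition` every `interval`
// until it returns true or `timeout` elapses. Returns whether it was met.
bool waitUntil(const std::function<bool()>& condition,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds interval = std::chrono::milliseconds{5}) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    if (condition()) {
      return true;
    }
    std::this_thread::sleep_for(interval);
  }
  // one final check once the deadline has passed
  return condition();
}
```

Polling bounds the wait by a deadline instead of guessing a sleep length. It still cannot tell whether a source processor is in the middle of a session, which is the gap the question above raises.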







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083609



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {

Review comment:
       Fair point; I am not sure which option is better, so feel free to choose. You can also mark this thread as resolved.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r453476744



##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
   if (running_) {
     // immediately indicate that we are not running
     logger_->log_info("Stop Flow Controller");
-    if (this->root_)
+    if (this->root_) {
+      // stop source processors first
+      this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+        return !proc->hasIncomingConnections();
+      });
+      std::chrono::milliseconds shutdown_timer{0};
+      // we enable C2 to progressively increase the timeout
+      // in case it sees that waiting for a little longer could
+      // allow the FlowFiles to be processed
+      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+        if (timeToWait != 0) {
+          return std::chrono::milliseconds{timeToWait};
+        }
+        static const core::TimePeriodValue default_timeout{"10 sec"};

Review comment:
       done
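The hunk is cut off after `default_timeout`, so the rest of the shutdown sequence is not visible here. Below is a minimal sketch of the logic the visible part sets up. It uses hypothetical stand-ins (`selectShutdownTimeout`, `waitForDrain`, and a `flowFileCount` callback) in place of the real `FlowController` members. It also assumes the configured drain timeout (`nifi_flowcontroller_drain_timeout` in the tests) falls back to 10 seconds.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <thread>

// Hypothetical: an explicit timeToWait (ms) wins; otherwise use the configured
// drain timeout, falling back to 10 seconds (assumed default).
std::chrono::milliseconds selectShutdownTimeout(uint64_t timeToWait,
                                                std::optional<std::chrono::milliseconds> configured) {
  if (timeToWait != 0) {
    return std::chrono::milliseconds{timeToWait};
  }
  return configured.value_or(std::chrono::milliseconds{10000});
}

// Hypothetical: after the source processors are stopped, wait until the queued
// FlowFiles drain or the timeout expires. Returns true if everything drained.
bool waitForDrain(const std::function<std::size_t()>& flowFileCount,
                  const std::function<std::chrono::milliseconds()>& timeout,
                  std::chrono::milliseconds pollInterval = std::chrono::milliseconds{10}) {
  std::chrono::milliseconds waited{0};  // plays the role of shutdown_timer
  while (flowFileCount() > 0) {
    // re-read the timeout each round so it can be raised while we wait
    if (waited >= timeout()) {
      return false;  // grace period elapsed; caller stops the remaining processors
    }
    std::this_thread::sleep_for(pollInterval);
    waited += pollInterval;
  }
  return true;
}
```

Re-evaluating `timeout()` on every iteration is what would let C2 extend the grace period while the loop is already waiting, in line with the comment in the hunk. Counting requested sleeps (as `shutdown_timer` does) slightly undercounts wall-clock time. A `std::chrono::steady_clock` deadline is the stricter alternative.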

##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {
+    sum += conn->getQueueSize();
+  }
+
+  for (ProcessGroup* childGroup : child_process_groups_) {

Review comment:
       done
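The `getTotalFlowFileCount` hunk is truncated inside the child-group loop. Below is a minimal, self-contained sketch of the recursive sum it appears to implement. It uses hypothetical `Connection` and `Group` stand-ins reduced to the members the count needs. The real `ProcessGroup` container types may differ.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a connection: only exposes its queue size.
struct Connection {
  std::size_t queued = 0;
  std::size_t getQueueSize() const { return queued; }
};

// Hypothetical stand-in for ProcessGroup.
struct Group {
  std::vector<std::shared_ptr<Connection>> connections_;
  std::vector<Group*> child_process_groups_;  // non-owning, like the hunk's raw pointers

  // Sum the FlowFiles queued in this group's connections, then recurse into children.
  std::size_t getTotalFlowFileCount() const {
    std::size_t sum = 0;
    for (const auto& conn : connections_) {
      sum += conn->getQueueSize();
    }
    for (const Group* child : child_process_groups_) {
      sum += child->getTotalFlowFileCount();
    }
    return sum;
  }
};
```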







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833


   

