You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/06/13 15:33:26 UTC

[nifi] 02/13: NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 47127330669015ef7d861b0f178128f5ae0768f7
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue May 24 13:25:37 2022 -0400

    NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors
    
    This closes #6076
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi/controller/scheduling/StandardProcessScheduler.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a7dc11885d..546cd16fc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         taskNode.verifyCanStop();
+
+        // Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run
+        // all lifecycle methods, such as @OnStopped
+        lifecycleState.incrementActiveThreadCount(null);
+
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         taskNode.setScheduledState(ScheduledState.STOPPED);
@@ -302,9 +307,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
                     agent.unschedule(taskNode, lifecycleState);
 
-                    if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
+                    // If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because
+                    // when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished.
+                    if (lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
                     }
+
+                    lifecycleState.decrementActiveThreadCount();
                 }
             }
         };