You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/11 19:41:15 UTC

[13/25] incubator-nifi git commit: NIFI-54: When incrementing active thread count, if the value exceeds max, do not run

NIFI-54: When incrementing active thread count, if the value exceeds max, do not run


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/97f8ab0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/97f8ab0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/97f8ab0c

Branch: refs/heads/bootstrap
Commit: 97f8ab0cc50f77a2ff3a4e9d575ec05c185e5b80
Parents: 8254b75
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 10 11:57:33 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Dec 10 11:57:33 2014 -0500

----------------------------------------------------------------------
 .../scheduling/EventDrivenSchedulingAgent.java  | 24 ++++++++++++++++++--
 .../controller/scheduling/ScheduleState.java    |  8 +++----
 2 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 5b237ff..af801bb 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -263,7 +263,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
         }
 
         private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
-            scheduleState.incrementActiveThreadCount();
+            final int newThreadCount = scheduleState.incrementActiveThreadCount();
+            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+                 // its possible that the worker queue could give us a worker node that is eligible to run based
+                 // on the number of threads but another thread has already incremented the thread count, result in
+                 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count 
+                 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+                 // result in using more than the maximum number of defined threads
+                 scheduleState.decrementActiveThreadCount();
+                 return;
+            }
+            
             try {
                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
                     worker.onTrigger(processContext, sessionFactory);
@@ -293,7 +303,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
         }
 
         private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
-            scheduleState.incrementActiveThreadCount();
+            final int newThreadCount = scheduleState.incrementActiveThreadCount();
+            if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+                 // its possible that the worker queue could give us a worker node that is eligible to run based
+                 // on the number of threads but another thread has already incremented the thread count, result in
+                 // reaching the maximum number of threads. we won't know this until we atomically increment the thread count 
+                 // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+                 // result in using more than the maximum number of defined threads
+                 scheduleState.decrementActiveThreadCount();
+                 return;
+            }
+            
             try {
                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
                     worker.onTrigger(processContext, sessionFactory);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index c10de83..eb5a437 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -31,12 +31,12 @@ public class ScheduleState {
     private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
     private volatile long lastStopTime = -1;
 
-    public void incrementActiveThreadCount() {
-        activeThreadCount.incrementAndGet();
+    public int incrementActiveThreadCount() {
+        return activeThreadCount.incrementAndGet();
     }
 
-    public void decrementActiveThreadCount() {
-        activeThreadCount.decrementAndGet();
+    public int decrementActiveThreadCount() {
+        return activeThreadCount.decrementAndGet();
     }
 
     public int getActiveThreadCount() {