You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/10 17:57:36 UTC
incubator-nifi git commit: NIFI-54: When incrementing active thread
count, if the value exceeds max, do not run
Repository: incubator-nifi
Updated Branches:
refs/heads/develop 8254b7543 -> 97f8ab0cc
NIFI-54: When incrementing active thread count, if the value exceeds max, do not run
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/97f8ab0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/97f8ab0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/97f8ab0c
Branch: refs/heads/develop
Commit: 97f8ab0cc50f77a2ff3a4e9d575ec05c185e5b80
Parents: 8254b75
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 10 11:57:33 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Dec 10 11:57:33 2014 -0500
----------------------------------------------------------------------
.../scheduling/EventDrivenSchedulingAgent.java | 24 ++++++++++++++++++--
.../controller/scheduling/ScheduleState.java | 8 +++----
2 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 5b237ff..af801bb 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -263,7 +263,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
}
private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
- scheduleState.incrementActiveThreadCount();
+ final int newThreadCount = scheduleState.incrementActiveThreadCount();
+ if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+ // it's possible that the worker queue could give us a worker node that is eligible to run based
+ // on the number of threads but another thread has already incremented the thread count, resulting in
+ // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+ // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+ // result in using more than the maximum number of defined threads
+ scheduleState.decrementActiveThreadCount();
+ return;
+ }
+
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory);
@@ -293,7 +303,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
}
private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
- scheduleState.incrementActiveThreadCount();
+ final int newThreadCount = scheduleState.incrementActiveThreadCount();
+ if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+ // it's possible that the worker queue could give us a worker node that is eligible to run based
+ // on the number of threads but another thread has already incremented the thread count, resulting in
+ // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+ // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+ // result in using more than the maximum number of defined threads
+ scheduleState.decrementActiveThreadCount();
+ return;
+ }
+
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index c10de83..eb5a437 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -31,12 +31,12 @@ public class ScheduleState {
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
- public void incrementActiveThreadCount() {
- activeThreadCount.incrementAndGet();
+ public int incrementActiveThreadCount() {
+ return activeThreadCount.incrementAndGet();
}
- public void decrementActiveThreadCount() {
- activeThreadCount.decrementAndGet();
+ public int decrementActiveThreadCount() {
+ return activeThreadCount.decrementAndGet();
}
public int getActiveThreadCount() {