You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:51 UTC
[48/51] [abbrv] incubator-nifi git commit: NIFI-381: Ensure that we
always properly account for number of active threads
NIFI-381: Ensure that we always properly account for number of active threads
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1af8c1e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1af8c1e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1af8c1e2
Branch: refs/heads/NIFI-353
Commit: 1af8c1e22a32b2e4024a655a31735be1d170b5df
Parents: ca23ad8
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 25 14:07:21 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 25 14:07:21 2015 -0500
----------------------------------------------------------------------
.../tasks/ContinuallyRunProcessorTask.java | 50 ++++++++++----------
1 file changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1af8c1e2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index f4be855..cff8744 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -159,31 +159,33 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
} finally {
- if (batch) {
- rawSession.commit();
- }
-
- final long processingNanos = System.nanoTime() - startNanos;
-
- // if the processor is no longer scheduled to run and this is the last thread,
- // invoke the OnStopped methods
- if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
- flowController.heartbeat();
- }
- }
-
- scheduleState.decrementActiveThreadCount();
-
try {
- final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
- procEvent.setProcessingNanos(processingNanos);
- procEvent.setInvocations(invocationCount);
- context.getFlowFileEventRepository().updateRepository(procEvent);
- } catch (final IOException e) {
- logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
- logger.error("", e);
+ if (batch) {
+ rawSession.commit();
+ }
+
+ final long processingNanos = System.nanoTime() - startNanos;
+
+ // if the processor is no longer scheduled to run and this is the last thread,
+ // invoke the OnStopped methods
+ if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
+ flowController.heartbeat();
+ }
+ }
+
+ try {
+ final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
+ procEvent.setProcessingNanos(processingNanos);
+ procEvent.setInvocations(invocationCount);
+ context.getFlowFileEventRepository().updateRepository(procEvent);
+ } catch (final IOException e) {
+ logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
+ logger.error("", e);
+ }
+ } finally {
+ scheduleState.decrementActiveThreadCount();
}
}