You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:51 UTC

[48/51] [abbrv] incubator-nifi git commit: NIFI-381: Ensure that we always properly account for number of active threads

NIFI-381: Ensure that we always properly account for number of active threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1af8c1e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1af8c1e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1af8c1e2

Branch: refs/heads/NIFI-353
Commit: 1af8c1e22a32b2e4024a655a31735be1d170b5df
Parents: ca23ad8
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 25 14:07:21 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 25 14:07:21 2015 -0500

----------------------------------------------------------------------
 .../tasks/ContinuallyRunProcessorTask.java      | 50 ++++++++++----------
 1 file changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1af8c1e2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index f4be855..cff8744 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -159,31 +159,33 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                 procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
             }
         } finally {
-            if (batch) {
-                rawSession.commit();
-            }
-
-            final long processingNanos = System.nanoTime() - startNanos;
-
-            // if the processor is no longer scheduled to run and this is the last thread,
-            // invoke the OnStopped methods
-            if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
-                try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
-                    flowController.heartbeat();
-                }
-            }
-
-            scheduleState.decrementActiveThreadCount();
-
             try {
-                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
-                procEvent.setProcessingNanos(processingNanos);
-                procEvent.setInvocations(invocationCount);
-                context.getFlowFileEventRepository().updateRepository(procEvent);
-            } catch (final IOException e) {
-                logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
-                logger.error("", e);
+                if (batch) {
+                    rawSession.commit();
+                }
+    
+                final long processingNanos = System.nanoTime() - startNanos;
+    
+                // if the processor is no longer scheduled to run and this is the last thread,
+                // invoke the OnStopped methods
+                if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
+                        flowController.heartbeat();
+                    }
+                }
+    
+                try {
+                    final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
+                    procEvent.setProcessingNanos(processingNanos);
+                    procEvent.setInvocations(invocationCount);
+                    context.getFlowFileEventRepository().updateRepository(procEvent);
+                } catch (final IOException e) {
+                    logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
+                    logger.error("", e);
+                }
+            } finally {
+                scheduleState.decrementActiveThreadCount();
             }
         }