You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/09/14 14:22:16 UTC

nifi git commit: NIFI-4379: Event-Driven threads are constantly active, polling a queue to see if there is any work to do. Instead of getting number of active threads from the ScheduledExecutor for these threads, updated code to use an AtomicInteger that

Repository: nifi
Updated Branches:
  refs/heads/master 1e70e2426 -> e01d59a46


NIFI-4379: Event-Driven threads are constantly active, polling a queue to see if there is any work to do. Instead of getting number of active threads from the ScheduledExecutor for these threads, updated code to use an AtomicInteger that is incremented when there's work to do and decremented once complete

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2153.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e01d59a4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e01d59a4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e01d59a4

Branch: refs/heads/master
Commit: e01d59a462e3047ccd8bbfdc608631bfd90db533
Parents: 1e70e24
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Sep 13 11:01:07 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Sep 14 15:39:32 2017 +0200

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  |   9 +-
 .../scheduling/EventDrivenSchedulingAgent.java  | 184 ++++++++++---------
 2 files changed, 104 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e01d59a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 9c181ff..5d65f89 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -280,6 +280,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final AtomicInteger maxEventDrivenThreads;
     private final AtomicReference<FlowEngine> timerDrivenEngineRef;
     private final AtomicReference<FlowEngine> eventDrivenEngineRef;
+    private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent;
 
     private final ContentRepository contentRepository;
     private final FlowFileRepository flowFileRepository;
@@ -502,8 +503,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
-        processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
-            eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
+
+        eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
+            eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor);
+        processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties);
@@ -3582,7 +3585,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     public int getActiveThreadCount() {
         final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
-        final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount();
+        final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount();
         return timerDrivenCount + eventDrivenCount;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e01d59a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 58d25bb..7b2e966 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -60,6 +60,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private final EventDrivenWorkerQueue workerQueue;
     private final ProcessContextFactory contextFactory;
     private final AtomicInteger maxThreadCount;
+    private final AtomicInteger activeThreadCount = new AtomicInteger(0);
     private final StringEncryptor encryptor;
 
     private volatile String adminYieldDuration = "1 sec";
@@ -78,11 +79,15 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
         this.encryptor = encryptor;
 
         for (int i = 0; i < maxThreadCount; i++) {
-            final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
+            final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
             flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
         }
     }
 
+    public int getActiveThreadCount() {
+        return activeThreadCount.get();
+    }
+
     private StateManager getStateManager(final String componentId) {
         return stateManagerProvider.getStateManager(componentId);
     }
@@ -127,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
             // if more threads have been allocated, add more tasks to the work queue
             final int tasksToAdd = maxThreadCount - oldMax;
             for (int i = 0; i < tasksToAdd; i++) {
-                final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
+                final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
                 flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
             }
         }
@@ -151,9 +156,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
     private class EventDrivenTask implements Runnable {
 
         private final EventDrivenWorkerQueue workerQueue;
+        private final AtomicInteger activeThreadCount;
 
-        public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) {
+        public EventDrivenTask(final EventDrivenWorkerQueue workerQueue, final AtomicInteger activeThreadCount) {
             this.workerQueue = workerQueue;
+            this.activeThreadCount = activeThreadCount;
         }
 
         @Override
@@ -170,101 +177,106 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
                     continue;
                 }
 
-                // get the connection index for this worker
-                AtomicLong connectionIndex = connectionIndexMap.get(connectable);
-                if (connectionIndex == null) {
-                    connectionIndex = new AtomicLong(0L);
-                    final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
-                    if (existingConnectionIndex != null) {
-                        connectionIndex = existingConnectionIndex;
-                    }
-                }
-
-                final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex);
-
-                if (connectable instanceof ProcessorNode) {
-                    final ProcessorNode procNode = (ProcessorNode) connectable;
-                    final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
-
-                    final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
-                    final ProcessSessionFactory sessionFactory;
-                    final StandardProcessSession rawSession;
-                    final boolean batch;
-                    if (procNode.isHighThroughputSupported() && runNanos > 0L) {
-                        rawSession = new StandardProcessSession(context);
-                        sessionFactory = new BatchingSessionFactory(rawSession);
-                        batch = true;
-                    } else {
-                        rawSession = null;
-                        sessionFactory = new StandardProcessSessionFactory(context);
-                        batch = false;
+                activeThreadCount.incrementAndGet();
+                try {
+                    // get the connection index for this worker
+                    AtomicLong connectionIndex = connectionIndexMap.get(connectable);
+                    if (connectionIndex == null) {
+                        connectionIndex = new AtomicLong(0L);
+                        final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
+                        if (existingConnectionIndex != null) {
+                            connectionIndex = existingConnectionIndex;
+                        }
                     }
 
-                    final long startNanos = System.nanoTime();
-                    final long finishNanos = startNanos + runNanos;
-                    int invocationCount = 0;
-                    boolean shouldRun = true;
+                    final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex);
+
+                    if (connectable instanceof ProcessorNode) {
+                        final ProcessorNode procNode = (ProcessorNode) connectable;
+                        final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
+
+                        final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
+                        final ProcessSessionFactory sessionFactory;
+                        final StandardProcessSession rawSession;
+                        final boolean batch;
+                        if (procNode.isHighThroughputSupported() && runNanos > 0L) {
+                            rawSession = new StandardProcessSession(context);
+                            sessionFactory = new BatchingSessionFactory(rawSession);
+                            batch = true;
+                        } else {
+                            rawSession = null;
+                            sessionFactory = new StandardProcessSessionFactory(context);
+                            batch = false;
+                        }
 
-                    try {
-                        while (shouldRun) {
-                            trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
-                            invocationCount++;
+                        final long startNanos = System.nanoTime();
+                        final long finishNanos = startNanos + runNanos;
+                        int invocationCount = 0;
+                        boolean shouldRun = true;
 
-                            if (!batch) {
-                                break;
-                            }
-                            if (System.nanoTime() > finishNanos) {
-                                break;
-                            }
-                            if (!scheduleState.isScheduled()) {
-                                break;
+                        try {
+                            while (shouldRun) {
+                                trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
+                                invocationCount++;
+
+                                if (!batch) {
+                                    break;
+                                }
+                                if (System.nanoTime() > finishNanos) {
+                                    break;
+                                }
+                                if (!scheduleState.isScheduled()) {
+                                    break;
+                                }
+
+                                final int eventCount = worker.decrementEventCount();
+                                if (eventCount < 0) {
+                                    worker.incrementEventCount();
+                                }
+                                shouldRun = (eventCount > 0);
                             }
-
-                            final int eventCount = worker.decrementEventCount();
-                            if (eventCount < 0) {
-                                worker.incrementEventCount();
+                        } finally {
+                            if (batch && rawSession != null) {
+                                try {
+                                    rawSession.commit();
+                                } catch (final RuntimeException re) {
+                                    logger.error("Unable to commit process session", re);
+                                }
                             }
-                            shouldRun = (eventCount > 0);
-                        }
-                    } finally {
-                        if (batch && rawSession != null) {
                             try {
-                                rawSession.commit();
-                            } catch (final RuntimeException re) {
-                                logger.error("Unable to commit process session", re);
+                                final long processingNanos = System.nanoTime() - startNanos;
+                                final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+                                procEvent.setProcessingNanos(processingNanos);
+                                procEvent.setInvocations(invocationCount);
+                                context.getFlowFileEventRepository().updateRepository(procEvent);
+                            } catch (final IOException e) {
+                                logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
+                                logger.error("", e);
                             }
                         }
-                        try {
-                            final long processingNanos = System.nanoTime() - startNanos;
-                            final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
-                            procEvent.setProcessingNanos(processingNanos);
-                            procEvent.setInvocations(invocationCount);
-                            context.getFlowFileEventRepository().updateRepository(procEvent);
-                        } catch (final IOException e) {
-                            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
-                            logger.error("", e);
+
+                        // If the Processor has FlowFiles, go ahead and register it to run again.
+                        // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
+                        // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
+                        // off of its input queue.
+                        // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
+                        // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
+                        // point is to register the Processor to run again.
+                        if (Connectables.flowFilesQueued(procNode)) {
+                            onEvent(procNode);
                         }
-                    }
+                    } else {
+                        final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
+                        final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
+                        trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
 
-                    // If the Processor has FlowFiles, go ahead and register it to run again.
-                    // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
-                    // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
-                    // off of its input queue.
-                    // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
-                    // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
-                    // point is to register the Processor to run again.
-                    if (Connectables.flowFilesQueued(procNode)) {
-                        onEvent(procNode);
-                    }
-                } else {
-                    final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
-                    final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
-                    trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
-
-                    // See explanation above for the ProcessorNode as to why we do this.
-                    if (Connectables.flowFilesQueued(connectable)) {
-                        onEvent(connectable);
+                        // See explanation above for the ProcessorNode as to why we do this.
+                        if (Connectables.flowFilesQueued(connectable)) {
+                            onEvent(connectable);
+                        }
                     }
+                } finally {
+                    activeThreadCount.decrementAndGet();
                 }
             }
         }