You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/24 18:24:35 UTC

nifi git commit: NIFI-1452 on timer-driven yield, use the greater of yield duration or run schedule

Repository: nifi
Updated Branches:
  refs/heads/master b12cf8a6d -> 232380dbf


NIFI-1452 on timer-driven yield, use the greater of yield duration or run schedule

This closes #1832.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/232380db
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/232380db
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/232380db

Branch: refs/heads/master
Commit: 232380dbfd59de45c4c6623f141d6e7052c367f9
Parents: b12cf8a
Author: Mike Moser <mo...@apache.org>
Authored: Fri May 19 19:48:10 2017 +0000
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed May 24 14:24:29 2017 -0400

----------------------------------------------------------------------
 .../controller/scheduling/TimerDrivenSchedulingAgent.java    | 8 +++++---
 .../nifi/controller/tasks/ContinuallyRunProcessorTask.java   | 5 ++++-
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index fcd901f..a82fde4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -139,8 +139,10 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
                     // If the component is yielded, cancel its future and re-submit it to run again
                     // after the yield has expired.
                     final long newYieldExpiration = connectable.getYieldExpiration();
-                    if (newYieldExpiration > System.currentTimeMillis()) {
-                        final long yieldMillis = newYieldExpiration - System.currentTimeMillis();
+                    final long now = System.currentTimeMillis();
+                    if (newYieldExpiration > now) {
+                        final long yieldMillis = newYieldExpiration - now;
+                        final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
                         final ScheduledFuture<?> scheduledFuture = futureRef.get();
                         if (scheduledFuture == null) {
                             return;
@@ -150,7 +152,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
                         // an accurate accounting of which futures are outstanding; we must then also update the futureRef
                         // so that we can do this again the next time that the component is yielded.
                         if (scheduledFuture.cancel(false)) {
-                            final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
+                            final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis));
 
                             synchronized (scheduleState) {
                                 if (scheduleState.isScheduled()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 01f3c8c..f2a7eee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -75,7 +75,10 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
     }
 
     static boolean isYielded(final ProcessorNode procNode) {
-        return procNode.getYieldExpiration() >= System.currentTimeMillis();
+        // After one yield period, the scheduling agent could call this again when
+        // yieldExpiration == currentTime. That moment must no longer count as 'yielded',
+        // so this uses ">" instead of ">=".
+        return procNode.getYieldExpiration() > System.currentTimeMillis();
     }
 
     static boolean isWorkToDo(final ProcessorNode procNode) {