You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/03/12 19:40:02 UTC

[nifi] 10/21: NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping indefinitely and instead transfer no more than 10, 000 FlowFiles before returning from onTrigger. Updated Local Ports to behavior in the same way. Updated Root Group Ports so that instead of blocking for up to 100 milliseconds for an incoming request, it blocks for up to 1 millisecond and if nothing is available yields for the 'bored yield duration'

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit b3cc626c92bd3112f86a9ee046380c1744210cb8
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 22 09:39:56 2019 -0500

    NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping indefinitely and instead transfer no more than 10,000 FlowFiles before returning from onTrigger. Updated Local Ports to behavior in the same way. Updated Root Group Ports so that instead of blocking for up to 100 milliseconds for an incoming request, it blocks for up to 1 millisecond and if nothing is available yields for the 'bored yield duration'
    
    Signed-off-by: Brandon Devries <de...@apache.org>
    
    This closes #3328.
---
 .../org/apache/nifi/controller/StandardFunnel.java | 11 ++++++-
 .../org/apache/nifi/connectable/LocalPort.java     | 38 +++++++++++++++-------
 .../apache/nifi/remote/StandardRootGroupPort.java  |  5 +--
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 96008a3..231ec42 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -370,14 +370,23 @@ public class StandardFunnel implements Funnel {
         readLock.lock();
         try {
             Set<Relationship> available = context.getAvailableRelationships();
+            int iterations = 0;
             while (!available.isEmpty()) {
-                final List<FlowFile> flowFiles = session.get(100);
+                final List<FlowFile> flowFiles = session.get(1000);
                 if (flowFiles.isEmpty()) {
                     break;
                 }
 
                 session.transfer(flowFiles, Relationship.ANONYMOUS);
                 session.commit();
+
+                // If there are fewer than 1,000 FlowFiles available to transfer, or if we
+                // have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from
+                // holding the Timer-Driven Thread for an excessive amount of time.
+                if (flowFiles.size() < 1000 || ++iterations >= 10) {
+                    break;
+                }
+
                 available = context.getAvailableRelationships();
             }
         } finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
index fcf6b2d..f4baa16 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.connectable;
 
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
@@ -36,6 +26,14 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Provides a mechanism by which <code>FlowFile</code>s can be transferred into and out of a <code>ProcessGroup</code> to and/or from another <code>ProcessGroup</code> within the same instance of
  * NiFi.
@@ -73,9 +71,25 @@ public class LocalPort extends AbstractPort {
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         readLock.lock();
         try {
-            final List<FlowFile> flowFiles = session.get(100);
-            if (!flowFiles.isEmpty()) {
+            Set<Relationship> available = context.getAvailableRelationships();
+            int iterations = 0;
+            while (!available.isEmpty()) {
+                final List<FlowFile> flowFiles = session.get(1000);
+                if (flowFiles.isEmpty()) {
+                    break;
+                }
+
                 session.transfer(flowFiles, Relationship.ANONYMOUS);
+                session.commit();
+
+                // If there are fewer than 1,000 FlowFiles available to transfer, or if we
+                // have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from
+                // holding the Timer-Driven Thread for an excessive amount of time.
+                if (flowFiles.size() < 1000 || ++iterations >= 10) {
+                    break;
+                }
+
+                available = context.getAvailableRelationships();
             }
         } finally {
             readLock.unlock();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 27f9d9c..b418579 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -110,7 +110,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
         this.identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
         this.bulletinRepository = bulletinRepository;
         this.scheduler = scheduler;
-        setYieldPeriod("100 millis");
+        setYieldPeriod(nifiProperties.getBoredYieldDuration());
         eventReporter = new EventReporter() {
             private static final long serialVersionUID = 1L;
 
@@ -142,12 +142,13 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
         final FlowFileRequest flowFileRequest;
         try {
-            flowFileRequest = requestQueue.poll(100, TimeUnit.MILLISECONDS);
+            flowFileRequest = requestQueue.poll(1, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException ie) {
             return;
         }
 
         if (flowFileRequest == null) {
+            context.yield();
             return;
         }