You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/03/02 11:22:54 UTC

[nifi] branch main updated: NIFI-9689: When checking FlowFile Availability, consider swap queue and trigger data to be swapped in, since calling poll() will no longer happen if no data is available

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 53a35ae  NIFI-9689: When checking FlowFile Availability, consider swap queue and trigger data to be swapped in, since calling poll() will no longer happen if no data is available
53a35ae is described below

commit 53a35ae4c999824b1c79c31954d2b9b719efd997
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Mar 1 16:41:09 2022 -0500

    NIFI-9689: When checking FlowFile Availability, consider swap queue and trigger data to be swapped in, since calling poll() will no longer happen if no data is available
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5821.
---
 .../controller/queue/SwappablePriorityQueue.java   | 31 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 92129a5..81e96c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -443,18 +443,45 @@ public class SwappablePriorityQueue {
 
     public FlowFileAvailability getFlowFileAvailability() {
         // If queue is empty, avoid obtaining a lock.
-        if (isActiveQueueEmpty()) {
+        final FlowFileQueueSize queueSize = getFlowFileQueueSize();
+        if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) {
             return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
         }
 
-        final FlowFileRecord top;
+        boolean mustMigrateSwapToActive = false;
+        FlowFileRecord top;
         readLock.lock();
         try {
             top = activeQueue.peek();
+            if (top == null) {
+                if (swapQueue.isEmpty() && queueSize.getSwapFileCount() > 0) {
+                    // Nothing available in the active queue or swap queue, but there is data swapped out.
+                    // We need to trigger that data to be swapped back in. But to do this, we need to hold the write lock.
+                    // Because we cannot obtain the write lock while already holding the read lock, we set a flag so that we
+                    // can migrate swap to active queue only after we've released the read lock.
+                    mustMigrateSwapToActive = true;
+                } else {
+                    top = swapQueue.get(0);
+                }
+            }
         } finally {
             readLock.unlock("isFlowFileAvailable");
         }
 
+        // If we need to migrate swapped data to the active queue, we can do that now that the read lock has been released.
+        // There may well be multiple threads attempting this concurrently, though, so only use tryLock() and if the lock
+        // is not obtained, the other thread can swap data in, or the next iteration of #getFlowFileAvailability will.
+        if (mustMigrateSwapToActive) {
+            final boolean lockObtained = writeLock.tryLock();
+            if (lockObtained) {
+                try {
+                    migrateSwapToActive();
+                } finally {
+                    writeLock.unlock("getFlowFileAvailability");
+                }
+            }
+        }
+
         if (top == null) {
             return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
         }