You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/11/13 06:17:23 UTC

[1/3] nifi git commit: NIFI-1155 fixed contrib-check violation

Repository: nifi
Updated Branches:
  refs/heads/master e6086420a -> 36d00a60f


NIFI-1155 fixed contrib-check violation


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/36d00a60
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/36d00a60
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/36d00a60

Branch: refs/heads/master
Commit: 36d00a60f51d123d1109afedc7c49875a940f7d2
Parents: 37d6b73
Author: joewitt <jo...@apache.org>
Authored: Thu Nov 12 23:01:56 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Nov 13 00:06:12 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/StandardFlowFileQueue.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/36d00a60/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 5dce801..9a439c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -764,7 +764,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         if (millis < 0) {
             throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
         }
-        
+
         expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
     }
 


[2/3] nifi git commit: NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexities because other optimizations that previously existed are no longer needed

Posted by jo...@apache.org.
NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexity because other optimizations that previously existed are no longer needed.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37d6b735
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37d6b735
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37d6b735

Branch: refs/heads/master
Commit: 37d6b7350e964e850685498d2232f2688f1a5afc
Parents: 3ed0949
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 08:36:38 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Nov 13 00:06:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  | 201 ++++++++-----------
 .../controller/TestStandardFlowFileQueue.java   |  52 +++++
 2 files changed, 139 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index dd74250..5dce801 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -32,8 +32,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     private boolean swapMode = false;
 
-    // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
-    private volatile String maximumQueueDataSize;
-    private volatile long maximumQueueByteCount;
-    private volatile long maximumQueueObjectCount;
-
-    // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
-    private final AtomicLong flowFileExpirationMillis;
-    private final AtomicReference<String> flowFileExpirationPeriod;
-
-    // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
-    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-
-    // TODO: Unit test better!
+    private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
+    private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
 
     private final EventReporter eventReporter;
     private final Connection connection;
@@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
-        maximumQueueObjectCount = 0L;
-        maximumQueueDataSize = "0 MB";
-        maximumQueueByteCount = 0L;
-        flowFileExpirationMillis = new AtomicLong(0);
-        flowFileExpirationPeriod = new AtomicReference<>("0 mins");
         swapQueue = new ArrayList<>();
         this.eventReporter = eventReporter;
         this.swapManager = swapManager;
@@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public void setBackPressureObjectThreshold(final long maxQueueSize) {
-        writeLock.lock();
-        try {
-            maximumQueueObjectCount = maxQueueSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureObjectThreshold");
+    public void setBackPressureObjectThreshold(final long threshold) {
+        boolean updated = false;
+        while (!updated) {
+            MaxQueueSize maxSize = maxQueueSize.get();
+            final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
+            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
         }
     }
 
     @Override
     public long getBackPressureObjectThreshold() {
-        return maximumQueueObjectCount;
+        return maxQueueSize.get().getMaxCount();
     }
 
     @Override
     public void setBackPressureDataSizeThreshold(final String maxDataSize) {
-        writeLock.lock();
-        try {
-            maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
-            maximumQueueDataSize = maxDataSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureDataSizeThreshold");
+        final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+        boolean updated = false;
+        while (!updated) {
+            MaxQueueSize maxSize = maxQueueSize.get();
+            final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
+            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
         }
     }
 
     @Override
     public String getBackPressureDataSizeThreshold() {
-        return maximumQueueDataSize;
+        return maxQueueSize.get().getMaxSize();
     }
 
     @Override
@@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public void acknowledge(final FlowFileRecord flowFile) {
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
-        }
+        incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
 
         if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
             // queue was full but no longer is. Notify that the source may now be available to run,
@@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             totalSize += flowFile.getSize();
         }
 
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
-        }
+        incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
 
         if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
             // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public boolean isFull() {
-        return queueFullRef.get();
-    }
+        final MaxQueueSize maxSize = maxQueueSize.get();
 
-    /**
-     * MUST be called with either the read or write lock held
-     *
-     * @return true if full
-     */
-    private boolean determineIfFull() {
-        final long maxSize = maximumQueueObjectCount;
-        final long maxBytes = maximumQueueByteCount;
-        if (maxSize <= 0 && maxBytes <= 0) {
+        // Check if max size is set
+        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
             return false;
         }
 
         final QueueSize queueSize = getQueueSize();
-        if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
+        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
             return true;
         }
 
-        if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
+        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
             return true;
         }
 
         return false;
     }
 
+
     @Override
     public void put(final FlowFileRecord file) {
         writeLock.lock();
@@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 incrementActiveQueueSize(1, file.getSize());
                 activeQueue.add(file);
             }
-
-            queueFullRef.set(determineIfFull());
         } finally {
             writeLock.unlock("put(FlowFileRecord)");
         }
@@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 incrementActiveQueueSize(numFiles, bytes);
                 activeQueue.addAll(files);
             }
-
-            queueFullRef.set(determineIfFull());
         } finally {
             writeLock.unlock("putAll");
         }
@@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         FlowFileRecord flowFile = null;
 
         // First check if we have any records Pre-Fetched.
-        final long expirationMillis = flowFileExpirationMillis.get();
+        final long expirationMillis = expirationPeriod.get().getMillis();
         writeLock.lock();
         try {
             flowFile = doPoll(expiredRecords, expirationMillis);
@@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         boolean isExpired;
 
         migrateSwapToActive();
-        final boolean queueFullAtStart = queueFullRef.get();
 
         long expiredBytes = 0L;
-
         do {
             flowFile = this.activeQueue.poll();
 
@@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
         }
 
-        // if at least 1 FlowFile was expired & the queue was full before we started, then
-        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-        // then the queue will still be full until the appropriate #acknowledge method is called.
-        if (queueFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
-
         return flowFile;
     }
 
@@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
         migrateSwapToActive();
 
-        final boolean queueFullAtStart = queueFullRef.get();
-
         final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
 
         long expiredBytes = 0L;
@@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
         incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
         incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
-
-        // if at least 1 FlowFile was expired & the queue was full before we started, then
-        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-        // then the queue will still be full until the appropriate #acknowledge method is called.
-        if (queueFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
     }
 
     /**
@@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         long drainedSize = 0L;
         FlowFileRecord pulled = null;
 
-        final long expirationMillis = this.flowFileExpirationMillis.get();
+        final long expirationMillis = expirationPeriod.get().getMillis();
         while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
             if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
                 expiredRecords.add(pulled);
@@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
         long bytesPulled = 0L;
         int flowFilesPulled = 0;
-        boolean queueFullAtStart = false;
 
         writeLock.lock();
         try {
             migrateSwapToActive();
 
-            final long expirationMillis = this.flowFileExpirationMillis.get();
-            queueFullAtStart = queueFullRef.get();
+            final long expirationMillis = expirationPeriod.get().getMillis();
 
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
@@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
 
             this.activeQueue.addAll(unselected);
+            incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
 
             return selectedFlowFiles;
         } finally {
-            try {
-                incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-
-                // if at least 1 FlowFile was expired & the queue was full before we started, then
-                // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-                // then the queue will still be full until the appropriate #acknowledge method is called.
-                if (queueFullAtStart && !expiredRecords.isEmpty()) {
-                    queueFullRef.set(determineIfFull());
-                }
-            } finally {
-                writeLock.unlock("poll(Filter, Set)");
-            }
+            writeLock.unlock("poll(Filter, Set)");
         }
     }
 
@@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public String getFlowFileExpiration() {
-        return flowFileExpirationPeriod.get();
+        return expirationPeriod.get().getPeriod();
     }
 
     @Override
     public int getFlowFileExpiration(final TimeUnit timeUnit) {
-        return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
+        return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         if (millis < 0) {
             throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
         }
-        this.flowFileExpirationPeriod.set(flowExpirationPeriod);
-        this.flowFileExpirationMillis.set(millis);
+        
+        expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
     }
 
 
@@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
         }
     }
+
+
+    private static class MaxQueueSize {
+        private final String maxSize;
+        private final long maxBytes;
+        private final long maxCount;
+
+        public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
+            this.maxSize = maxSize;
+            this.maxBytes = maxBytes;
+            this.maxCount = maxCount;
+        }
+
+        public String getMaxSize() {
+            return maxSize;
+        }
+
+        public long getMaxBytes() {
+            return maxBytes;
+        }
+
+        public long getMaxCount() {
+            return maxCount;
+        }
+
+        @Override
+        public String toString() {
+            return maxCount + " Objects/" + maxSize;
+        }
+    }
+
+    private static class TimePeriod {
+        private final String period;
+        private final long millis;
+
+        public TimePeriod(final String period, final long millis) {
+            this.period = period;
+            this.millis = millis;
+        }
+
+        public String getPeriod() {
+            return period;
+        }
+
+        public long getMillis() {
+            return millis;
+        }
+
+        @Override
+        public String toString() {
+            return period;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 61f96fd..09ac7f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
+    private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
+
     @Before
+    @SuppressWarnings("unchecked")
     public void setup() {
+        provRecords.clear();
+
         final Connection connection = Mockito.mock(Connection.class);
         Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
         Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue {
         final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
 
         Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
+                for (final ProvenanceEventRecord record : iterable) {
+                    provRecords.add(record);
+                }
+                return null;
+            }
+        }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
 
         queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
         TestFlowFile.idGenerator.set(0L);
@@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue {
         assertTrue(queue.isActiveQueueEmpty());
     }
 
+    @Test(timeout = 10000)
+    public void testBackPressureAfterDrop() throws InterruptedException {
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis");
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L);
+
+        final String requestId = UUID.randomUUID().toString();
+        final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
+
+        while (status.getState() != DropFlowFileState.COMPLETE) {
+            Thread.sleep(10L);
+        }
+
+        assertFalse(queue.isFull());
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+
+        assertEquals(10, provRecords.size());
+        for (final ProvenanceEventRecord event : provRecords) {
+            assertNotNull(event);
+            assertEquals(ProvenanceEventType.DROP, event.getEventType());
+        }
+    }
+
     @Test
     public void testBackPressureAfterPollSingle() throws InterruptedException {
         queue.setBackPressureObjectThreshold(10);


[3/3] nifi git commit: NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not queue is full

Posted by jo...@apache.org.
NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not queue is full

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3ed0949c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3ed0949c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3ed0949c

Branch: refs/heads/master
Commit: 3ed0949c5578f379cab7b90ff32778bf6296404a
Parents: e608642
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 07:48:57 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Nov 13 00:06:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  |  41 ++++---
 .../controller/TestStandardFlowFileQueue.java   | 123 +++++++++++++++++++
 2 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3ed0949c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index ae991c8..dd74250 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -80,19 +80,30 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
 
     private PriorityQueue<FlowFileRecord> activeQueue = null;
+
+    // guarded by lock
     private ArrayList<FlowFileRecord> swapQueue = null;
 
     private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
 
     private boolean swapMode = false;
+
+    // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
     private volatile String maximumQueueDataSize;
     private volatile long maximumQueueByteCount;
     private volatile long maximumQueueObjectCount;
 
-    private final EventReporter eventReporter;
+    // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
     private final AtomicLong flowFileExpirationMillis;
-    private final Connection connection;
     private final AtomicReference<String> flowFileExpirationPeriod;
+
+    // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
+    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
+
+    // TODO: Unit test better!
+
+    private final EventReporter eventReporter;
+    private final Connection connection;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
@@ -106,8 +117,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
 
-    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
@@ -683,13 +692,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
         long bytesPulled = 0L;
         int flowFilesPulled = 0;
+        boolean queueFullAtStart = false;
 
         writeLock.lock();
         try {
             migrateSwapToActive();
 
             final long expirationMillis = this.flowFileExpirationMillis.get();
-            final boolean queueFullAtStart = queueFullRef.get();
+            queueFullAtStart = queueFullRef.get();
 
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
@@ -735,17 +745,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
             this.activeQueue.addAll(unselected);
 
-            // if at least 1 FlowFile was expired & the queue was full before we started, then
-            // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-            // then the queue will still be full until the appropriate #acknowledge method is called.
-            if (queueFullAtStart && !expiredRecords.isEmpty()) {
-                queueFullRef.set(determineIfFull());
-            }
-
             return selectedFlowFiles;
         } finally {
-            incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-            writeLock.unlock("poll(Filter, Set)");
+            try {
+                incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
+
+                // if at least 1 FlowFile was expired & the queue was full before we started, then
+                // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+                // then the queue will still be full until the appropriate #acknowledge method is called.
+                if (queueFullAtStart && !expiredRecords.isEmpty()) {
+                    queueFullRef.set(determineIfFull());
+                }
+            } finally {
+                writeLock.unlock("poll(Filter, Set)");
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ed0949c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 8b8c678..61f96fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.controller;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -48,6 +49,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
@@ -107,6 +109,127 @@ public class TestStandardFlowFileQueue {
     }
 
     @Test
+    public void testBackPressure() { // isFull() should track the object threshold and clear only after acknowledge
+        queue.setBackPressureObjectThreshold(10); // test expects isFull() to flip on the 10th FlowFile
+
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertFalse(queue.isFull());
+
+        for (int i = 0; i < 9; i++) { // 9 FlowFiles: one short of the threshold
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+            assertFalse(queue.isEmpty());
+            assertFalse(queue.isActiveQueueEmpty());
+        }
+
+        queue.put(new TestFlowFile()); // 10th FlowFile reaches the threshold
+        assertTrue(queue.isFull());
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords);
+        assertNotNull(polled);
+        assertTrue(expiredRecords.isEmpty()); // this test itself sets no FlowFile expiration
+
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        // Polling alone must not relieve back pressure: the queue is expected to stay full until the polled FlowFile is acknowledged.
+        assertTrue(queue.isFull());
+        queue.acknowledge(polled);
+
+        // The FlowFile has been acknowledged, so the queue should no longer be full (the other 9 are still queued).
+        assertFalse(queue.isFull());
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollFilter() throws InterruptedException { // expiration during poll(filter, ...) must clear isFull()
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis"); // short expiration so the queued FlowFiles have aged out after the sleep below
+
+        for (int i = 0; i < 9; i++) { // 9 FlowFiles: one short of the threshold
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile()); // 10th FlowFile reaches the threshold
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L); // wait well beyond the 10 ms expiration configured above
+
+
+        final FlowFileFilter filter = new FlowFileFilter() {
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                return FlowFileFilterResult.REJECT_AND_CONTINUE; // same result for every FlowFile; nothing should be selected
+            }
+        };
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);
+        assertTrue(polled.isEmpty());
+        assertEquals(10, expiredRecords.size()); // all 10 queued FlowFiles are expected to be reported as expired
+
+        assertFalse(queue.isFull()); // no acknowledge call is made here: expiration alone must release back pressure
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollSingle() throws InterruptedException { // expiration during poll(expiredRecords) must clear isFull()
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis"); // short expiration so the queued FlowFiles have aged out after the sleep below
+
+        for (int i = 0; i < 9; i++) { // 9 FlowFiles: one short of the threshold
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile()); // 10th FlowFile reaches the threshold
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L); // wait well beyond the 10 ms expiration configured above
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords);
+        assertNull(polled); // every FlowFile is expected to have expired, leaving nothing to return
+        assertEquals(10, expiredRecords.size()); // all 10 queued FlowFiles are expected to be reported as expired
+
+        assertFalse(queue.isFull()); // no acknowledge call is made here: expiration alone must release back pressure
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollMultiple() throws InterruptedException { // expiration during poll(int, ...) must clear isFull()
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis"); // short expiration so the queued FlowFiles have aged out after the sleep below
+
+        for (int i = 0; i < 9; i++) { // 9 FlowFiles: one short of the threshold
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile()); // 10th FlowFile reaches the threshold
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L); // wait well beyond the 10 ms expiration configured above
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final List<FlowFileRecord> polled = queue.poll(10, expiredRecords); // NOTE(review): 10 is presumably the max FlowFiles to return - confirm
+        assertTrue(polled.isEmpty()); // every FlowFile is expected to have expired, leaving nothing to return
+        assertEquals(10, expiredRecords.size()); // all 10 queued FlowFiles are expected to be reported as expired
+
+        assertFalse(queue.isFull()); // no acknowledge call is made here: expiration alone must release back pressure
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
     public void testSwapOutOccurs() {
         for (int i = 0; i < 10000; i++) {
             queue.put(new TestFlowFile());