You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/11/13 06:17:24 UTC

[2/3] nifi git commit: NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexities because other optimizations that previously existed are no longer need

NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexities because other optimizations that previously existed are no longer needed

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37d6b735
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37d6b735
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37d6b735

Branch: refs/heads/master
Commit: 37d6b7350e964e850685498d2232f2688f1a5afc
Parents: 3ed0949
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 08:36:38 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Nov 13 00:06:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  | 201 ++++++++-----------
 .../controller/TestStandardFlowFileQueue.java   |  52 +++++
 2 files changed, 139 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index dd74250..5dce801 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -32,8 +32,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     private boolean swapMode = false;
 
-    // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
-    private volatile String maximumQueueDataSize;
-    private volatile long maximumQueueByteCount;
-    private volatile long maximumQueueObjectCount;
-
-    // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
-    private final AtomicLong flowFileExpirationMillis;
-    private final AtomicReference<String> flowFileExpirationPeriod;
-
-    // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
-    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-
-    // TODO: Unit test better!
+    private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
+    private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
 
     private final EventReporter eventReporter;
     private final Connection connection;
@@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
-        maximumQueueObjectCount = 0L;
-        maximumQueueDataSize = "0 MB";
-        maximumQueueByteCount = 0L;
-        flowFileExpirationMillis = new AtomicLong(0);
-        flowFileExpirationPeriod = new AtomicReference<>("0 mins");
         swapQueue = new ArrayList<>();
         this.eventReporter = eventReporter;
         this.swapManager = swapManager;
@@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public void setBackPressureObjectThreshold(final long maxQueueSize) {
-        writeLock.lock();
-        try {
-            maximumQueueObjectCount = maxQueueSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureObjectThreshold");
+    public void setBackPressureObjectThreshold(final long threshold) {
+        boolean updated = false;
+        while (!updated) {
+            MaxQueueSize maxSize = maxQueueSize.get();
+            final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
+            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
         }
     }
 
     @Override
     public long getBackPressureObjectThreshold() {
-        return maximumQueueObjectCount;
+        return maxQueueSize.get().getMaxCount();
     }
 
     @Override
     public void setBackPressureDataSizeThreshold(final String maxDataSize) {
-        writeLock.lock();
-        try {
-            maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
-            maximumQueueDataSize = maxDataSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureDataSizeThreshold");
+        final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+        boolean updated = false;
+        while (!updated) {
+            MaxQueueSize maxSize = maxQueueSize.get();
+            final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
+            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
         }
     }
 
     @Override
     public String getBackPressureDataSizeThreshold() {
-        return maximumQueueDataSize;
+        return maxQueueSize.get().getMaxSize();
     }
 
     @Override
@@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public void acknowledge(final FlowFileRecord flowFile) {
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
-        }
+        incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
 
         if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
             // queue was full but no longer is. Notify that the source may now be available to run,
@@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             totalSize += flowFile.getSize();
         }
 
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
-        }
+        incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
 
         if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
             // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public boolean isFull() {
-        return queueFullRef.get();
-    }
+        final MaxQueueSize maxSize = maxQueueSize.get();
 
-    /**
-     * MUST be called with either the read or write lock held
-     *
-     * @return true if full
-     */
-    private boolean determineIfFull() {
-        final long maxSize = maximumQueueObjectCount;
-        final long maxBytes = maximumQueueByteCount;
-        if (maxSize <= 0 && maxBytes <= 0) {
+        // Check if max size is set
+        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
             return false;
         }
 
         final QueueSize queueSize = getQueueSize();
-        if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
+        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
             return true;
         }
 
-        if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
+        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
             return true;
         }
 
         return false;
     }
 
+
     @Override
     public void put(final FlowFileRecord file) {
         writeLock.lock();
@@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 incrementActiveQueueSize(1, file.getSize());
                 activeQueue.add(file);
             }
-
-            queueFullRef.set(determineIfFull());
         } finally {
             writeLock.unlock("put(FlowFileRecord)");
         }
@@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 incrementActiveQueueSize(numFiles, bytes);
                 activeQueue.addAll(files);
             }
-
-            queueFullRef.set(determineIfFull());
         } finally {
             writeLock.unlock("putAll");
         }
@@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         FlowFileRecord flowFile = null;
 
         // First check if we have any records Pre-Fetched.
-        final long expirationMillis = flowFileExpirationMillis.get();
+        final long expirationMillis = expirationPeriod.get().getMillis();
         writeLock.lock();
         try {
             flowFile = doPoll(expiredRecords, expirationMillis);
@@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         boolean isExpired;
 
         migrateSwapToActive();
-        final boolean queueFullAtStart = queueFullRef.get();
 
         long expiredBytes = 0L;
-
         do {
             flowFile = this.activeQueue.poll();
 
@@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
         }
 
-        // if at least 1 FlowFile was expired & the queue was full before we started, then
-        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-        // then the queue will still be full until the appropriate #acknowledge method is called.
-        if (queueFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
-
         return flowFile;
     }
 
@@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
         migrateSwapToActive();
 
-        final boolean queueFullAtStart = queueFullRef.get();
-
         final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
 
         long expiredBytes = 0L;
@@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
         incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
         incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
-
-        // if at least 1 FlowFile was expired & the queue was full before we started, then
-        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-        // then the queue will still be full until the appropriate #acknowledge method is called.
-        if (queueFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
     }
 
     /**
@@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         long drainedSize = 0L;
         FlowFileRecord pulled = null;
 
-        final long expirationMillis = this.flowFileExpirationMillis.get();
+        final long expirationMillis = expirationPeriod.get().getMillis();
         while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
             if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
                 expiredRecords.add(pulled);
@@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
         long bytesPulled = 0L;
         int flowFilesPulled = 0;
-        boolean queueFullAtStart = false;
 
         writeLock.lock();
         try {
             migrateSwapToActive();
 
-            final long expirationMillis = this.flowFileExpirationMillis.get();
-            queueFullAtStart = queueFullRef.get();
+            final long expirationMillis = expirationPeriod.get().getMillis();
 
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
@@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
 
             this.activeQueue.addAll(unselected);
+            incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
 
             return selectedFlowFiles;
         } finally {
-            try {
-                incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-
-                // if at least 1 FlowFile was expired & the queue was full before we started, then
-                // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-                // then the queue will still be full until the appropriate #acknowledge method is called.
-                if (queueFullAtStart && !expiredRecords.isEmpty()) {
-                    queueFullRef.set(determineIfFull());
-                }
-            } finally {
-                writeLock.unlock("poll(Filter, Set)");
-            }
+            writeLock.unlock("poll(Filter, Set)");
         }
     }
 
@@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public String getFlowFileExpiration() {
-        return flowFileExpirationPeriod.get();
+        return expirationPeriod.get().getPeriod();
     }
 
     @Override
     public int getFlowFileExpiration(final TimeUnit timeUnit) {
-        return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
+        return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         if (millis < 0) {
             throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
         }
-        this.flowFileExpirationPeriod.set(flowExpirationPeriod);
-        this.flowFileExpirationMillis.set(millis);
+        
+        expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
     }
 
 
@@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
         }
     }
+
+
+    private static class MaxQueueSize {
+        private final String maxSize;
+        private final long maxBytes;
+        private final long maxCount;
+
+        public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
+            this.maxSize = maxSize;
+            this.maxBytes = maxBytes;
+            this.maxCount = maxCount;
+        }
+
+        public String getMaxSize() {
+            return maxSize;
+        }
+
+        public long getMaxBytes() {
+            return maxBytes;
+        }
+
+        public long getMaxCount() {
+            return maxCount;
+        }
+
+        @Override
+        public String toString() {
+            return maxCount + " Objects/" + maxSize;
+        }
+    }
+
+    private static class TimePeriod {
+        private final String period;
+        private final long millis;
+
+        public TimePeriod(final String period, final long millis) {
+            this.period = period;
+            this.millis = millis;
+        }
+
+        public String getPeriod() {
+            return period;
+        }
+
+        public long getMillis() {
+            return millis;
+        }
+
+        @Override
+        public String toString() {
+            return period;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 61f96fd..09ac7f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
+    private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
+
     @Before
+    @SuppressWarnings("unchecked")
     public void setup() {
+        provRecords.clear();
+
         final Connection connection = Mockito.mock(Connection.class);
         Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
         Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue {
         final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
 
         Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
+                for (final ProvenanceEventRecord record : iterable) {
+                    provRecords.add(record);
+                }
+                return null;
+            }
+        }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
 
         queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
         TestFlowFile.idGenerator.set(0L);
@@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue {
         assertTrue(queue.isActiveQueueEmpty());
     }
 
+    @Test(timeout = 10000)
+    public void testBackPressureAfterDrop() throws InterruptedException {
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis");
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L);
+
+        final String requestId = UUID.randomUUID().toString();
+        final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
+
+        while (status.getState() != DropFlowFileState.COMPLETE) {
+            Thread.sleep(10L);
+        }
+
+        assertFalse(queue.isFull());
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+
+        assertEquals(10, provRecords.size());
+        for (final ProvenanceEventRecord event : provRecords) {
+            assertNotNull(event);
+            assertEquals(ProvenanceEventType.DROP, event.getEventType());
+        }
+    }
+
     @Test
     public void testBackPressureAfterPollSingle() throws InterruptedException {
         queue.setBackPressureObjectThreshold(10);