You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/11/13 06:17:24 UTC
[2/3] nifi git commit: NIFI-1155: Refactored StandardFlowFileQueue to
update member variables more intelligently,
using CAS operations instead of locks. This reduces code complexities because
other optimizations that previously existed are no longer needed
NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexities because other optimizations that previously existed are no longer needed
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37d6b735
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37d6b735
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37d6b735
Branch: refs/heads/master
Commit: 37d6b7350e964e850685498d2232f2688f1a5afc
Parents: 3ed0949
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Nov 12 08:36:38 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Nov 13 00:06:12 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/StandardFlowFileQueue.java | 201 ++++++++-----------
.../controller/TestStandardFlowFileQueue.java | 52 +++++
2 files changed, 139 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index dd74250..5dce801 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -32,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private boolean swapMode = false;
- // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
- private volatile String maximumQueueDataSize;
- private volatile long maximumQueueByteCount;
- private volatile long maximumQueueObjectCount;
-
- // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
- private final AtomicLong flowFileExpirationMillis;
- private final AtomicReference<String> flowFileExpirationPeriod;
-
- // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
- private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-
- // TODO: Unit test better!
+ private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
+ private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
private final EventReporter eventReporter;
private final Connection connection;
@@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>();
- maximumQueueObjectCount = 0L;
- maximumQueueDataSize = "0 MB";
- maximumQueueByteCount = 0L;
- flowFileExpirationMillis = new AtomicLong(0);
- flowFileExpirationPeriod = new AtomicReference<>("0 mins");
swapQueue = new ArrayList<>();
this.eventReporter = eventReporter;
this.swapManager = swapManager;
@@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public void setBackPressureObjectThreshold(final long maxQueueSize) {
- writeLock.lock();
- try {
- maximumQueueObjectCount = maxQueueSize;
- this.queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("setBackPressureObjectThreshold");
+ public void setBackPressureObjectThreshold(final long threshold) {
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = maxQueueSize.get();
+ final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public long getBackPressureObjectThreshold() {
- return maximumQueueObjectCount;
+ return maxQueueSize.get().getMaxCount();
}
@Override
public void setBackPressureDataSizeThreshold(final String maxDataSize) {
- writeLock.lock();
- try {
- maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
- maximumQueueDataSize = maxDataSize;
- this.queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("setBackPressureDataSizeThreshold");
+ final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = maxQueueSize.get();
+ final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public String getBackPressureDataSizeThreshold() {
- return maximumQueueDataSize;
+ return maxQueueSize.get().getMaxSize();
}
@Override
@@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public void acknowledge(final FlowFileRecord flowFile) {
- if (queueFullRef.get()) {
- writeLock.lock();
- try {
- incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
- queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("acknowledge(FlowFileRecord)");
- }
- } else {
- incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
- }
+ incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// queue was full but no longer is. Notify that the source may now be available to run,
@@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
totalSize += flowFile.getSize();
}
- if (queueFullRef.get()) {
- writeLock.lock();
- try {
- incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
- queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("acknowledge(FlowFileRecord)");
- }
- } else {
- incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
- }
+ incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public boolean isFull() {
- return queueFullRef.get();
- }
+ final MaxQueueSize maxSize = maxQueueSize.get();
- /**
- * MUST be called with either the read or write lock held
- *
- * @return true if full
- */
- private boolean determineIfFull() {
- final long maxSize = maximumQueueObjectCount;
- final long maxBytes = maximumQueueByteCount;
- if (maxSize <= 0 && maxBytes <= 0) {
+ // Check if max size is set
+ if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
return false;
}
final QueueSize queueSize = getQueueSize();
- if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
+ if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true;
}
- if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
+ if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
return true;
}
return false;
}
+
@Override
public void put(final FlowFileRecord file) {
writeLock.lock();
@@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(1, file.getSize());
activeQueue.add(file);
}
-
- queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("put(FlowFileRecord)");
}
@@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(numFiles, bytes);
activeQueue.addAll(files);
}
-
- queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("putAll");
}
@@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
FlowFileRecord flowFile = null;
// First check if we have any records Pre-Fetched.
- final long expirationMillis = flowFileExpirationMillis.get();
+ final long expirationMillis = expirationPeriod.get().getMillis();
writeLock.lock();
try {
flowFile = doPoll(expiredRecords, expirationMillis);
@@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired;
migrateSwapToActive();
- final boolean queueFullAtStart = queueFullRef.get();
long expiredBytes = 0L;
-
do {
flowFile = this.activeQueue.poll();
@@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
}
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
-
return flowFile;
}
@@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
migrateSwapToActive();
- final boolean queueFullAtStart = queueFullRef.get();
-
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
long expiredBytes = 0L;
@@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
-
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
}
/**
@@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
long drainedSize = 0L;
FlowFileRecord pulled = null;
- final long expirationMillis = this.flowFileExpirationMillis.get();
+ final long expirationMillis = expirationPeriod.get().getMillis();
while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
expiredRecords.add(pulled);
@@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
- boolean queueFullAtStart = false;
writeLock.lock();
try {
migrateSwapToActive();
- final long expirationMillis = this.flowFileExpirationMillis.get();
- queueFullAtStart = queueFullRef.get();
+ final long expirationMillis = expirationPeriod.get().getMillis();
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>();
@@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
this.activeQueue.addAll(unselected);
+ incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
return selectedFlowFiles;
} finally {
- try {
- incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
- } finally {
- writeLock.unlock("poll(Filter, Set)");
- }
+ writeLock.unlock("poll(Filter, Set)");
}
}
@@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public String getFlowFileExpiration() {
- return flowFileExpirationPeriod.get();
+ return expirationPeriod.get().getPeriod();
}
@Override
public int getFlowFileExpiration(final TimeUnit timeUnit) {
- return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
+ return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
}
@Override
@@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (millis < 0) {
throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
}
- this.flowFileExpirationPeriod.set(flowExpirationPeriod);
- this.flowFileExpirationMillis.set(millis);
+
+ expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
}
@@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
" Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
}
}
+
+
+ private static class MaxQueueSize {
+ private final String maxSize;
+ private final long maxBytes;
+ private final long maxCount;
+
+ public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
+ this.maxSize = maxSize;
+ this.maxBytes = maxBytes;
+ this.maxCount = maxCount;
+ }
+
+ public String getMaxSize() {
+ return maxSize;
+ }
+
+ public long getMaxBytes() {
+ return maxBytes;
+ }
+
+ public long getMaxCount() {
+ return maxCount;
+ }
+
+ @Override
+ public String toString() {
+ return maxCount + " Objects/" + maxSize;
+ }
+ }
+
+ private static class TimePeriod {
+ private final String period;
+ private final long millis;
+
+ public TimePeriod(final String period, final long millis) {
+ this.period = period;
+ this.millis = millis;
+ }
+
+ public String getPeriod() {
+ return period;
+ }
+
+ public long getMillis() {
+ return millis;
+ }
+
+ @Override
+ public String toString() {
+ return period;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/37d6b735/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 61f96fd..09ac7f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
+ private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
+
@Before
+ @SuppressWarnings("unchecked")
public void setup() {
+ provRecords.clear();
+
final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue {
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
+ for (final ProvenanceEventRecord record : iterable) {
+ provRecords.add(record);
+ }
+ return null;
+ }
+ }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L);
@@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue {
assertTrue(queue.isActiveQueueEmpty());
}
+ @Test(timeout = 10000)
+ public void testBackPressureAfterDrop() throws InterruptedException {
+ queue.setBackPressureObjectThreshold(10);
+ queue.setFlowFileExpiration("10 millis");
+
+ for (int i = 0; i < 9; i++) {
+ queue.put(new TestFlowFile());
+ assertFalse(queue.isFull());
+ }
+
+ queue.put(new TestFlowFile());
+ assertTrue(queue.isFull());
+
+ Thread.sleep(100L);
+
+ final String requestId = UUID.randomUUID().toString();
+ final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
+
+ while (status.getState() != DropFlowFileState.COMPLETE) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse(queue.isFull());
+ assertTrue(queue.isEmpty());
+ assertTrue(queue.isActiveQueueEmpty());
+
+ assertEquals(10, provRecords.size());
+ for (final ProvenanceEventRecord event : provRecords) {
+ assertNotNull(event);
+ assertEquals(ProvenanceEventType.DROP, event.getEventType());
+ }
+ }
+
@Test
public void testBackPressureAfterPollSingle() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);