You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 23:32:16 UTC
[1/2] nifi git commit: NIFI-108: Added unit tests;
added verifyCanList method to queue; fixed bugs
Repository: nifi
Updated Branches:
refs/heads/NIFI-108 ab30bf046 -> 6d64f58d4
NIFI-108: Added unit tests; added verifyCanList method to queue; fixed bugs
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e18038ac
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e18038ac
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e18038ac
Branch: refs/heads/NIFI-108
Commit: e18038ac2f130979fd96f557e1bdbb6d99abf42a
Parents: b12aba7
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 17:31:59 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 17:31:59 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/queue/FlowFileQueue.java | 7 ++
.../cluster/manager/impl/WebClusterManager.java | 12 +-
.../controller/queue/ListFlowFileRequest.java | 2 +-
.../nifi/controller/StandardFlowFileQueue.java | 38 +++++-
.../controller/TestStandardFlowFileQueue.java | 118 +++++++++++++++++++
5 files changed, 165 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index dbf2f04..0d0f03f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -298,4 +298,11 @@ public interface FlowFileQueue {
* @throws IOException if unable to read FlowFiles that are stored on some external device
*/
FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
+
+ /**
+ * Ensures that a listing can be performed on the queue
+ *
+ * @throws IllegalStateException if the queue is not in a state in which a listing can be performed
+ */
+ void verifyCanList() throws IllegalStateException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 210cf52..6cd95b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -2854,14 +2854,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
ListFlowFileState state = null;
- int sumOfPercents = 0;
+ int numStepsCompleted = 0;
+ int numStepsTotal = 0;
boolean finished = true;
for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) {
final ListingRequestDTO nodeRequest = entry.getValue();
- Integer percentComplete = nodeRequest.getPercentCompleted();
- if (percentComplete != null) {
- sumOfPercents += percentComplete;
- }
+
+ numStepsCompleted += nodeRequest.getCompletedStepCount();
+ numStepsTotal += nodeRequest.getTotalStepCount();
if (!nodeRequest.getFinished()) {
finished = false;
@@ -2895,7 +2895,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
listingRequest.setFlowFileSummaries(summaryDTOs);
- final int percentCompleted = sumOfPercents / listingRequestMap.size();
+ final int percentCompleted = numStepsCompleted / numStepsTotal;
listingRequest.setPercentCompleted(percentCompleted);
listingRequest.setFinished(finished);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index aad4c4f..313ad0c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -30,7 +30,7 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
private final long submissionTime = System.currentTimeMillis();
private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
- private ListFlowFileState state;
+ private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK;
private String failureReason;
private int numSteps;
private int completedStepCount;
http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index daaa763..24fd71e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -915,6 +915,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order.
writeLock.lock();
try {
+ logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this);
+ listRequest.setState(ListFlowFileState.CALCULATING_LIST);
final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size());
FlowFileRecord flowFile;
@@ -926,6 +928,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (++resultCount >= maxResults) {
+ logger.debug("{} Reached max number of results of {} from active queue; listing complete", StandardFlowFileQueue.this, maxResults);
break;
}
}
@@ -937,16 +940,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("List FlowFiles");
}
+ logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount);
+
listRequest.setCompletedStepCount(++completedStepCount);
- position = activeQueue.size();
- sourceLoop: while (resultCount < maxResults) {
- try {
+ if (summaries.size() < maxResults) {
+ position = activeQueue.size();
+ sourceLoop: try {
// We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since
// we are not modifying anything.
readLock.lock();
try {
for (final String location : swapLocations) {
+ logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location);
final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this);
for (final FlowFileRecord flowFile : flowFiles) {
position++;
@@ -954,6 +960,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (++resultCount >= maxResults) {
+ logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults);
break sourceLoop;
}
}
@@ -962,12 +969,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
listRequest.setCompletedStepCount(++completedStepCount);
}
+ logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this);
for (final FlowFileRecord flowFile : swapQueue) {
position++;
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (++resultCount >= maxResults) {
+ logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults);
break sourceLoop;
}
}
@@ -977,12 +986,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} finally {
readLock.unlock("List FlowFiles");
}
+
+ break sourceLoop;
} catch (final IOException ioe) {
logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
}
}
+ // We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have
+ // skipped some steps because we have reached the maximum number of results, so we consider those steps completed.
+ logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this);
+ listRequest.setCompletedStepCount(listRequest.getTotalStepCount());
+ listRequest.setState(ListFlowFileState.COMPLETE);
listRequest.setFlowFileSummaries(summaries);
}
}, "List FlowFiles for Connection " + getIdentifier());
@@ -1002,7 +1018,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final long size = flowFile.getSize();
- final long lastQueuedTime = flowFile.getLastQueueDate();
+ final Long lastQueuedTime = flowFile.getLastQueueDate();
final long lineageStart = flowFile.getLineageStartDate();
final boolean penalized = flowFile.isPenalized();
@@ -1029,7 +1045,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public long getLastQueuedTime() {
- return lastQueuedTime;
+ return lastQueuedTime == null ? 0L : lastQueuedTime;
}
@Override
@@ -1100,6 +1116,18 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return null;
}
+
+ @Override
+ public void verifyCanList() throws IllegalStateException {
+ if (connection.getSource().isRunning()) {
+ throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running");
+ }
+
+ if (connection.getDestination().isRunning()) {
+ throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running");
+ }
+ }
+
@Override
public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 09ac7f2..f58d4b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -39,6 +39,8 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
@@ -55,17 +57,27 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.sun.istack.logging.Logger;
+
+import ch.qos.logback.classic.BasicConfigurator;
+
public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
+ @BeforeClass
+ public static void setupLogging() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
+ }
+
@Before
@SuppressWarnings("unchecked")
public void setup() {
@@ -388,6 +400,112 @@ public class TestStandardFlowFileQueue {
assertEquals(20, swapManager.swapInCalledCount);
}
+
+ @Test(timeout = 5000)
+ public void testListFlowFilesOnlyActiveQueue() throws InterruptedException {
+ for (int i = 0; i < 9999; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000);
+ assertNotNull(status);
+ assertEquals(9999, status.getQueueSize().getObjectCount());
+
+ while (status.getState() != ListFlowFileState.COMPLETE) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(9999, status.getFlowFileSummaries().size());
+ assertEquals(100, status.getCompletionPercentage());
+ assertNull(status.getFailureReason());
+ assertEquals(2, status.getTotalStepCount());
+ assertEquals(2, status.getCompletedStepCount());
+ }
+
+ @Test(timeout = 5000)
+ public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException {
+ for (int i = 0; i < 11000; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000);
+ assertNotNull(status);
+ assertEquals(11000, status.getQueueSize().getObjectCount());
+
+ while (status.getState() != ListFlowFileState.COMPLETE) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(11000, status.getFlowFileSummaries().size());
+ assertEquals(100, status.getCompletionPercentage());
+ assertNull(status.getFailureReason());
+ assertEquals(2, status.getTotalStepCount());
+ assertEquals(2, status.getCompletedStepCount());
+ }
+
+ @Test(timeout = 5000)
+ public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException {
+ for (int i = 0; i < 20000; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000);
+ assertNotNull(status);
+ assertEquals(20000, status.getQueueSize().getObjectCount());
+
+ while (status.getState() != ListFlowFileState.COMPLETE) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(20000, status.getFlowFileSummaries().size());
+ assertEquals(100, status.getCompletionPercentage());
+ assertNull(status.getFailureReason());
+ assertEquals(3, status.getTotalStepCount());
+ assertEquals(3, status.getCompletedStepCount());
+ }
+
+ @Test(timeout = 5000)
+ public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws InterruptedException {
+ for (int i = 0; i < 30050; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050);
+ assertNotNull(status);
+ assertEquals(30050, status.getQueueSize().getObjectCount());
+
+ while (status.getState() != ListFlowFileState.COMPLETE) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(30050, status.getFlowFileSummaries().size());
+ assertEquals(100, status.getCompletionPercentage());
+ assertNull(status.getFailureReason());
+ assertEquals(4, status.getTotalStepCount());
+ assertEquals(4, status.getCompletedStepCount());
+ }
+
+ @Test(timeout = 5000000)
+ public void testListFlowFilesResultsLimited() throws InterruptedException {
+ for (int i = 0; i < 30050; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
+ assertNotNull(status);
+ assertEquals(30050, status.getQueueSize().getObjectCount());
+
+ while (status.getState() != ListFlowFileState.COMPLETE) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(100, status.getFlowFileSummaries().size());
+ assertEquals(100, status.getCompletionPercentage());
+ assertNull(status.getFailureReason());
+ assertEquals(4, status.getTotalStepCount());
+ assertEquals(4, status.getCompletedStepCount());
+ }
+
private class TestSwapManager implements FlowFileSwapManager {
private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
int swapOutCalledCount = 0;
[2/2] nifi git commit: Merge branch 'NIFI-108' of
https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6d64f58d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6d64f58d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6d64f58d
Branch: refs/heads/NIFI-108
Commit: 6d64f58d403d5a21f22e3dc29fb9c4fa922a6c4a
Parents: e18038a ab30bf0
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 17:32:07 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 17:32:07 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/web/api/ConnectionResource.java | 42 +++++++++++++++-----
.../org/apache/nifi/web/api/dto/DtoFactory.java | 18 +++++++++
.../web/dao/impl/StandardConnectionDAO.java | 19 +++++----
3 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------