You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 23:32:16 UTC

[1/2] nifi git commit: NIFI-108: Added unit tests; added verifyCanList method to queue; fixed bugs

Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 ab30bf046 -> 6d64f58d4


NIFI-108: Added unit tests; added verifyCanList method to queue; fixed bugs


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e18038ac
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e18038ac
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e18038ac

Branch: refs/heads/NIFI-108
Commit: e18038ac2f130979fd96f557e1bdbb6d99abf42a
Parents: b12aba7
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 17:31:59 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 17:31:59 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java    |   7 ++
 .../cluster/manager/impl/WebClusterManager.java |  12 +-
 .../controller/queue/ListFlowFileRequest.java   |   2 +-
 .../nifi/controller/StandardFlowFileQueue.java  |  38 +++++-
 .../controller/TestStandardFlowFileQueue.java   | 118 +++++++++++++++++++
 5 files changed, 165 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index dbf2f04..0d0f03f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -298,4 +298,11 @@ public interface FlowFileQueue {
      * @throws IOException if unable to read FlowFiles that are stored on some external device
      */
     FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
+
+    /**
+     * Ensures that a listing can be performed on the queue
+     *
+     * @throws IllegalStateException if the queue is not in a state in which a listing can be performed
+     */
+    void verifyCanList() throws IllegalStateException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 210cf52..6cd95b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -2854,14 +2854,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
 
         ListFlowFileState state = null;
-        int sumOfPercents = 0;
+        int numStepsCompleted = 0;
+        int numStepsTotal = 0;
         boolean finished = true;
         for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) {
             final ListingRequestDTO nodeRequest = entry.getValue();
-            Integer percentComplete = nodeRequest.getPercentCompleted();
-            if (percentComplete != null) {
-                sumOfPercents += percentComplete;
-            }
+
+            numStepsCompleted += nodeRequest.getCompletedStepCount();
+            numStepsTotal += nodeRequest.getTotalStepCount();
 
             if (!nodeRequest.getFinished()) {
                 finished = false;
@@ -2895,7 +2895,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
         listingRequest.setFlowFileSummaries(summaryDTOs);
 
-        final int percentCompleted = sumOfPercents / listingRequestMap.size();
+        final int percentCompleted = numStepsCompleted / numStepsTotal;
         listingRequest.setPercentCompleted(percentCompleted);
         listingRequest.setFinished(finished);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index aad4c4f..313ad0c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -30,7 +30,7 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
     private final long submissionTime = System.currentTimeMillis();
     private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
 
-    private ListFlowFileState state;
+    private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK;
     private String failureReason;
     private int numSteps;
     private int completedStepCount;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index daaa763..24fd71e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -915,6 +915,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order.
                 writeLock.lock();
                 try {
+                    logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this);
+                    listRequest.setState(ListFlowFileState.CALCULATING_LIST);
                     final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size());
 
                     FlowFileRecord flowFile;
@@ -926,6 +928,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                             if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
                                 summaries.add(summarize(flowFile, position));
                                 if (++resultCount >= maxResults) {
+                                    logger.debug("{} Reached max number of results of {} from active queue; listing complete", StandardFlowFileQueue.this, maxResults);
                                     break;
                                 }
                             }
@@ -937,16 +940,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                     writeLock.unlock("List FlowFiles");
                 }
 
+                logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount);
+
                 listRequest.setCompletedStepCount(++completedStepCount);
 
-                position = activeQueue.size();
-                sourceLoop: while (resultCount < maxResults) {
-                    try {
+                if (summaries.size() < maxResults) {
+                    position = activeQueue.size();
+                    sourceLoop: try {
                         // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since
                         // we are not modifying anything.
                         readLock.lock();
                         try {
                             for (final String location : swapLocations) {
+                                logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location);
                                 final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this);
                                 for (final FlowFileRecord flowFile : flowFiles) {
                                     position++;
@@ -954,6 +960,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                     if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
                                         summaries.add(summarize(flowFile, position));
                                         if (++resultCount >= maxResults) {
+                                            logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults);
                                             break sourceLoop;
                                         }
                                     }
@@ -962,12 +969,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                 listRequest.setCompletedStepCount(++completedStepCount);
                             }
 
+                            logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this);
                             for (final FlowFileRecord flowFile : swapQueue) {
                                 position++;
 
                                 if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
                                     summaries.add(summarize(flowFile, position));
                                     if (++resultCount >= maxResults) {
+                                        logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults);
                                         break sourceLoop;
                                     }
                                 }
@@ -977,12 +986,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         } finally {
                             readLock.unlock("List FlowFiles");
                         }
+
+                        break sourceLoop;
                     } catch (final IOException ioe) {
                         logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
                         listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
                     }
                 }
 
+                // We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have
+                // skipped some steps because we have reached the maximum number of results, so we consider those steps completed.
+                logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this);
+                listRequest.setCompletedStepCount(listRequest.getTotalStepCount());
+                listRequest.setState(ListFlowFileState.COMPLETE);
                 listRequest.setFlowFileSummaries(summaries);
             }
         }, "List FlowFiles for Connection " + getIdentifier());
@@ -1002,7 +1018,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
         final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
         final long size = flowFile.getSize();
-        final long lastQueuedTime = flowFile.getLastQueueDate();
+        final Long lastQueuedTime = flowFile.getLastQueueDate();
         final long lineageStart = flowFile.getLineageStartDate();
         final boolean penalized = flowFile.isPenalized();
 
@@ -1029,7 +1045,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
             @Override
             public long getLastQueuedTime() {
-                return lastQueuedTime;
+                return lastQueuedTime == null ? 0L : lastQueuedTime;
             }
 
             @Override
@@ -1100,6 +1116,18 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         return null;
     }
 
+
+    @Override
+    public void verifyCanList() throws IllegalStateException {
+        if (connection.getSource().isRunning()) {
+            throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running");
+        }
+
+        if (connection.getDestination().isRunning()) {
+            throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running");
+        }
+    }
+
     @Override
     public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
         logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 09ac7f2..f58d4b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -39,6 +39,8 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
@@ -55,17 +57,27 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.sun.istack.logging.Logger;
+
+import ch.qos.logback.classic.BasicConfigurator;
+
 public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
     private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
 
+    @BeforeClass
+    public static void setupLogging() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
+    }
+
     @Before
     @SuppressWarnings("unchecked")
     public void setup() {
@@ -388,6 +400,112 @@ public class TestStandardFlowFileQueue {
         assertEquals(20, swapManager.swapInCalledCount);
     }
 
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesOnlyActiveQueue() throws InterruptedException {
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000);
+        assertNotNull(status);
+        assertEquals(9999, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(9999, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(2, status.getTotalStepCount());
+        assertEquals(2, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException {
+        for (int i = 0; i < 11000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000);
+        assertNotNull(status);
+        assertEquals(11000, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(11000, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(2, status.getTotalStepCount());
+        assertEquals(2, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException {
+        for (int i = 0; i < 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000);
+        assertNotNull(status);
+        assertEquals(20000, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(20000, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(3, status.getTotalStepCount());
+        assertEquals(3, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(30050, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000000)
+    public void testListFlowFilesResultsLimited() throws InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(100, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+    }
+
     private class TestSwapManager implements FlowFileSwapManager {
         private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
         int swapOutCalledCount = 0;


[2/2] nifi git commit: Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108

Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6d64f58d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6d64f58d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6d64f58d

Branch: refs/heads/NIFI-108
Commit: 6d64f58d403d5a21f22e3dc29fb9c4fa922a6c4a
Parents: e18038a ab30bf0
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 17:32:07 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 17:32:07 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/web/api/ConnectionResource.java | 42 +++++++++++++++-----
 .../org/apache/nifi/web/api/dto/DtoFactory.java | 18 +++++++++
 .../web/dao/impl/StandardConnectionDAO.java     | 19 +++++----
 3 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------