You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/04/09 16:59:30 UTC

[nifi] branch master updated: NIFI-7087: Use FlowManager.findAllConnections() when available

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84968e7  NIFI-7087: Use FlowManager.findAllConnections() when available
84968e7 is described below

commit 84968e70d293e49888addc3870c6a9cf222103b0
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 12 16:27:56 2020 -0500

    NIFI-7087: Use FlowManager.findAllConnections() when available
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4026
---
 .../java/org/apache/nifi/controller/FlowController.java     |  8 ++++----
 .../status/analytics/ConnectionStatusAnalytics.java         | 13 ++++++++-----
 .../apache/nifi/provenance/ComponentIdentifierLookup.java   | 11 ++++++-----
 .../status/analytics/TestConnectionStatusAnalytics.java     |  7 +++----
 4 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 72c3416..33df9be 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -875,7 +875,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         writeLock.lock();
         try {
             // get all connections/queues and recover from swap files.
-            final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+            final Set<Connection> connections = flowManager.findAllConnections();
 
             flowFileRepository.loadFlowFiles(queueProvider);
 
@@ -1086,7 +1086,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                 startRemoteGroupPortsAfterInitialization.clear();
             }
 
-            for (final Connection connection : flowManager.getRootGroup().findAllConnections()) {
+            for (final Connection connection : flowManager.findAllConnections()) {
                 connection.getFlowFileQueue().startLoadBalancing();
             }
         } finally {
@@ -2645,7 +2645,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
         }
 
-        final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+        final Set<Connection> connections = flowManager.findAllConnections();
         FlowFileQueue queue = null;
         for (final Connection connection : connections) {
             if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
@@ -2696,7 +2696,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
         }
 
-        final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+        final Set<Connection> connections = flowManager.findAllConnections();
         FlowFileQueue queue = null;
         for (final Connection connection : connections) {
             if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index a524566..5500fbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.apache.commons.collections4.MapUtils;
@@ -33,7 +32,6 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.StatusHistory;
-import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
@@ -336,9 +334,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
     }
 
     private Connection getConnection() {
-        final ProcessGroup rootGroup = flowManager.getRootGroup();
-        Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst();
-        return connection.orElse(null);
+        Connection connection = null;
+        for (Connection c : flowManager.findAllConnections()) {
+            if (c.getIdentifier().equals(this.connectionIdentifier)) {
+                connection = c;
+                break;
+            }
+        }
+        return connection;
     }
 
     private FlowFileEvent getStatusReport() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
index 8dc1a9d..f5da0fc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
@@ -24,7 +24,6 @@ import org.apache.nifi.processor.Processor;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class ComponentIdentifierLookup implements IdentifierLookup {
     private final FlowController flowController;
@@ -62,10 +61,12 @@ public class ComponentIdentifierLookup implements IdentifierLookup {
 
     @Override
     public List<String> getQueueIdentifiers() {
-        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+        Set<Connection> connectionSet = flowController.getFlowManager().findAllConnections();
+        List<String> identifiers = new ArrayList<>(connectionSet.size());
 
-        return rootGroup.findAllConnections().stream()
-            .map(Connection::getIdentifier)
-            .collect(Collectors.toList());
+        for (Connection c : connectionSet) {
+            identifiers.add(c.getIdentifier());
+        }
+        return identifiers;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
index ffa86a8..f406d12 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -26,12 +26,11 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -88,7 +87,7 @@ public class TestConnectionStatusAnalytics {
         final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
 
 
-        final List<Connection> connections = new ArrayList<>();
+        final Set<Connection> connections = new HashSet<>();
         final String connectionIdentifier = "1";
         connections.add(connection);
 
@@ -96,7 +95,7 @@ public class TestConnectionStatusAnalytics {
         when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L);
         when(connection.getIdentifier()).thenReturn(connectionIdentifier);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
-        when(processGroup.findAllConnections()).thenReturn(connections);
+        when(flowManager.findAllConnections()).thenReturn(connections);
         when(flowManager.getRootGroup()).thenReturn(processGroup);
         when(flowFileEvent.getContentSizeIn()).thenReturn(10L);
         when(flowFileEvent.getContentSizeOut()).thenReturn(10L);