You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/04/09 16:59:30 UTC
[nifi] branch master updated: NIFI-7087: Use
FlowManager.findAllConnections() when available
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 84968e7 NIFI-7087: Use FlowManager.findAllConnections() when available
84968e7 is described below
commit 84968e70d293e49888addc3870c6a9cf222103b0
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 12 16:27:56 2020 -0500
NIFI-7087: Use FlowManager.findAllConnections() when available
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4026
---
.../java/org/apache/nifi/controller/FlowController.java | 8 ++++----
.../status/analytics/ConnectionStatusAnalytics.java | 13 ++++++++-----
.../apache/nifi/provenance/ComponentIdentifierLookup.java | 11 ++++++-----
.../status/analytics/TestConnectionStatusAnalytics.java | 7 +++----
4 files changed, 21 insertions(+), 18 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 72c3416..33df9be 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -875,7 +875,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
writeLock.lock();
try {
// get all connections/queues and recover from swap files.
- final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+ final Set<Connection> connections = flowManager.findAllConnections();
flowFileRepository.loadFlowFiles(queueProvider);
@@ -1086,7 +1086,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
startRemoteGroupPortsAfterInitialization.clear();
}
- for (final Connection connection : flowManager.getRootGroup().findAllConnections()) {
+ for (final Connection connection : flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}
} finally {
@@ -2645,7 +2645,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
}
- final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+ final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
@@ -2696,7 +2696,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
}
- final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
+ final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index a524566..5500fbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
@@ -33,7 +32,6 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.StatusHistory;
-import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
@@ -336,9 +334,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
}
private Connection getConnection() {
- final ProcessGroup rootGroup = flowManager.getRootGroup();
- Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst();
- return connection.orElse(null);
+ Connection connection = null;
+ for (Connection c : flowManager.findAllConnections()) {
+ if (c.getIdentifier().equals(this.connectionIdentifier)) {
+ connection = c;
+ break;
+ }
+ }
+ return connection;
}
private FlowFileEvent getStatusReport() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
index 8dc1a9d..f5da0fc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java
@@ -24,7 +24,6 @@ import org.apache.nifi.processor.Processor;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
public class ComponentIdentifierLookup implements IdentifierLookup {
private final FlowController flowController;
@@ -62,10 +61,12 @@ public class ComponentIdentifierLookup implements IdentifierLookup {
@Override
public List<String> getQueueIdentifiers() {
- final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ Set<Connection> connectionSet = flowController.getFlowManager().findAllConnections();
+ List<String> identifiers = new ArrayList<>(connectionSet.size());
- return rootGroup.findAllConnections().stream()
- .map(Connection::getIdentifier)
- .collect(Collectors.toList());
+ for (Connection c : connectionSet) {
+ identifiers.add(c.getIdentifier());
+ }
+ return identifiers;
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
index ffa86a8..f406d12 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -26,12 +26,11 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -88,7 +87,7 @@ public class TestConnectionStatusAnalytics {
final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
- final List<Connection> connections = new ArrayList<>();
+ final Set<Connection> connections = new HashSet<>();
final String connectionIdentifier = "1";
connections.add(connection);
@@ -96,7 +95,7 @@ public class TestConnectionStatusAnalytics {
when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L);
when(connection.getIdentifier()).thenReturn(connectionIdentifier);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
- when(processGroup.findAllConnections()).thenReturn(connections);
+ when(flowManager.findAllConnections()).thenReturn(connections);
when(flowManager.getRootGroup()).thenReturn(processGroup);
when(flowFileEvent.getContentSizeIn()).thenReturn(10L);
when(flowFileEvent.getContentSizeOut()).thenReturn(10L);