You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by je...@apache.org on 2018/08/01 23:35:11 UTC
nifi git commit: NIFI-5480: Use FlowController's maps of components
in order to look up component by ID rather than iterating recursively through
all Process Groups to find the component
Repository: nifi
Updated Branches:
refs/heads/master b4894c557 -> 4cca9bef7
NIFI-5480: Use FlowController's maps of components in order to look up component by ID rather than iterating recursively through all Process Groups to find the component
This closes #2932
Signed-off-by: Jeremy Dyer <je...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4cca9bef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4cca9bef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4cca9bef
Branch: refs/heads/master
Commit: 4cca9bef7c9bcad912cb7df6f3976715d09a6daf
Parents: b4894c5
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Aug 1 17:49:51 2018 -0400
Committer: Jeremy Dyer <je...@apache.org>
Committed: Wed Aug 1 19:33:56 2018 -0400
----------------------------------------------------------------------
.../org/apache/nifi/groups/ProcessGroup.java | 8 --
.../apache/nifi/controller/FlowController.java | 33 ++++-
.../reporting/StandardReportingContext.java | 4 +-
.../nifi/groups/StandardProcessGroup.java | 134 ++++++-------------
.../service/mock/MockProcessGroup.java | 5 -
.../StandardAuthorizableLookup.java | 4 +-
.../nifi/web/controller/ControllerFacade.java | 6 +-
.../web/dao/impl/StandardProcessGroupDAO.java | 58 ++++++--
8 files changed, 130 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index e9c4d87..01451df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -771,14 +771,6 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
void remove(final Snippet snippet);
/**
- * @param identifier of connectable
- * @return the Connectable with the given ID, if it exists; otherwise
- * returns null. This performs a recursive search of all ProcessGroups'
- * input ports, output ports, funnels, processors
- */
- Connectable findLocalConnectable(String identifier);
-
- /**
* @param identifier of remote group port
* @return the RemoteGroupPort with the given ID, if it exists; otherwise
* returns null.
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index e7623e5..b83749c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2697,6 +2697,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
allProcessors.remove(identifier);
}
+ public Connectable findLocalConnectable(final String id) {
+ final ProcessorNode procNode = getProcessorNode(id);
+ if (procNode != null) {
+ return procNode;
+ }
+
+ final Port inPort = getInputPort(id);
+ if (inPort != null) {
+ return inPort;
+ }
+
+ final Port outPort = getOutputPort(id);
+ if (outPort != null) {
+ return outPort;
+ }
+
+ final Funnel funnel = getFunnel(id);
+ if (funnel != null) {
+ return funnel;
+ }
+
+ final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(id);
+ if (remoteGroupPort != null) {
+ return remoteGroupPort;
+ }
+
+ return null;
+ }
+
public ProcessorNode getProcessorNode(final String id) {
return allProcessors.get(id);
}
@@ -4933,7 +4962,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
authorizable = new DataAuthorizable(getRootGroup());
} else {
// check if the component is a connectable, this should be the case most often
- final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
+ final Connectable connectable = findLocalConnectable(componentId);
if (connectable == null) {
// if the component id is not a connectable then consider a connection
final Connection connection = getRootGroup().findConnection(componentId);
@@ -4980,7 +5009,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
authorizable = new ProvenanceDataAuthorizable(getRootGroup());
} else {
// check if the component is a connectable, this should be the case most often
- final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
+ final Connectable connectable = findLocalConnectable(componentId);
if (connectable == null) {
// if the component id is not a connectable then consider a connection
final Connection connection = getRootGroup().findConnection(componentId);
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 440fd79..d95a220 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -29,7 +29,6 @@ import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
@@ -95,8 +94,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
@Override
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
- final Connectable connectable = rootGroup.findLocalConnectable(componentId);
+ final Connectable connectable = flowController.findLocalConnectable(componentId);
if (connectable == null) {
throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index e77be2a..edfa355 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -16,31 +16,6 @@
*/
package org.apache.nifi.groups;
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -145,6 +120,31 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
public final class StandardProcessGroup implements ProcessGroup {
private final String id;
@@ -1696,80 +1696,34 @@ public final class StandardProcessGroup implements ProcessGroup {
return allNodes;
}
- @Override
- public Connectable findLocalConnectable(final String identifier) {
- return findLocalConnectable(identifier, this);
- }
-
- private static Connectable findLocalConnectable(final String identifier, final ProcessGroup group) {
- final ProcessorNode procNode = group.getProcessor(identifier);
- if (procNode != null) {
- return procNode;
- }
-
- final Port inPort = group.getInputPort(identifier);
- if (inPort != null) {
- return inPort;
- }
-
- final Port outPort = group.getOutputPort(identifier);
- if (outPort != null) {
- return outPort;
- }
-
- final Funnel funnel = group.getFunnel(identifier);
- if (funnel != null) {
- return funnel;
- }
-
- for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) {
- final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier);
- if (remoteInputPort != null) {
- return remoteInputPort;
- }
-
- final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier);
- if (remoteOutputPort != null) {
- return remoteOutputPort;
- }
- }
-
- for (final ProcessGroup childGroup : group.getProcessGroups()) {
- final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup);
- if (childGroupConnectable != null) {
- return childGroupConnectable;
- }
- }
-
- return null;
- }
@Override
public RemoteGroupPort findRemoteGroupPort(final String identifier) {
- return findRemoteGroupPort(identifier, this);
- }
+ readLock.lock();
+ try {
+ for (final RemoteProcessGroup remoteGroup : remoteGroups.values()) {
+ final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
+ if (remoteInPort != null) {
+ return remoteInPort;
+ }
- private static RemoteGroupPort findRemoteGroupPort(final String identifier, final ProcessGroup group) {
- for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
- final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
- if (remoteInPort != null) {
- return remoteInPort;
+ final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier);
+ if (remoteOutPort != null) {
+ return remoteOutPort;
+ }
}
- final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier);
- if (remoteOutPort != null) {
- return remoteOutPort;
+ for (final ProcessGroup childGroup : processGroups.values()) {
+ final RemoteGroupPort childGroupRemoteGroupPort = childGroup.findRemoteGroupPort(identifier);
+ if (childGroupRemoteGroupPort != null) {
+ return childGroupRemoteGroupPort;
+ }
}
- }
- for (final ProcessGroup childGroup : group.getProcessGroups()) {
- final RemoteGroupPort childGroupRemoteGroupPort = findRemoteGroupPort(identifier, childGroup);
- if (childGroupRemoteGroupPort != null) {
- return childGroupRemoteGroupPort;
- }
+ return null;
+ } finally {
+ readLock.unlock();
}
-
- return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 374d02b..ec2caef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -528,11 +528,6 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public Connectable findLocalConnectable(final String identifier) {
- return null;
- }
-
- @Override
public RemoteGroupPort findRemoteGroupPort(String identifier) {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
index bbdd9e0..88d8c46 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
@@ -732,9 +732,7 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
@Override
public Authorizable getLocalConnectable(String id) {
- final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
- final Connectable connectable = group.findLocalConnectable(id);
-
+ final Connectable connectable = controllerFacade.findLocalConnectable(id);
if (connectable == null) {
throw new ResourceNotFoundException("Unable to find component with id " + id);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 1cbc5e2..1d08081 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -166,6 +166,10 @@ public class ControllerFacade implements Authorizable {
}
}
+ public Connectable findLocalConnectable(String componentId) {
+ return flowController.findLocalConnectable(componentId);
+ }
+
public ControllerServiceProvider getControllerServiceProvider() {
return flowController;
}
@@ -1527,7 +1531,7 @@ public class ControllerFacade implements Authorizable {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
- final Connectable connectable = root.findLocalConnectable(dto.getComponentId());
+ final Connectable connectable = findLocalConnectable(dto.getComponentId());
if (connectable != null) {
dto.setGroupId(connectable.getProcessGroup().getIdentifier());
http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index cce3390..06aab80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -43,6 +43,7 @@ import org.apache.nifi.web.dao.ProcessGroupDAO;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -104,13 +105,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
+ final Set<ProcessGroup> validGroups = new HashSet<>();
+ validGroups.add(group);
+ validGroups.addAll(group.findAllProcessGroups());
+
for (final String componentId : componentIds) {
- final Connectable connectable = group.findLocalConnectable(componentId);
+ final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (connectable == null) {
- final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
- if (remotePort == null) {
- throw new ResourceNotFoundException("Unable to find component with id " + componentId);
- }
+ throw new ResourceNotFoundException("Unable to find component with id " + componentId);
+ }
+
+ if (connectable instanceof RemoteGroupPort) {
+ final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
if (ScheduledState.RUNNING.equals(state)) {
remotePort.verifyCanStart();
@@ -134,8 +140,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
public void verifyEnableComponents(String groupId, ScheduledState state, Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
+ final Set<ProcessGroup> validGroups = new HashSet<>();
+ validGroups.add(group);
+ validGroups.addAll(group.findAllProcessGroups());
+
for (final String componentId : componentIds) {
- final Connectable connectable = group.findLocalConnectable(componentId);
+ final Connectable connectable = findConnectable(componentId, groupId, validGroups);
if (ScheduledState.STOPPED.equals(state)) {
connectable.verifyCanEnable();
} else if (ScheduledState.DISABLED.equals(state)) {
@@ -159,14 +169,37 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
}
+ private Connectable findConnectable(final String componentId, final String groupId, final Set<ProcessGroup> validProcessGroups) {
+ // Look up the component by ID via the FlowController and ensure that it belongs to the requested group or one of its descendants.
+ // We do this rather than recursively searching the Process Group hierarchy (as the removed ProcessGroup.findLocalConnectable
+ // did) because, for a component buried several layers of Process Groups deep, the recursive search is considerably more
+ // expensive due to all of the Read Locks that must be obtained while descending through the group's descendant groups.
+ final Connectable connectable = flowController.findLocalConnectable(componentId);
+ if (connectable == null) {
+ throw new ResourceNotFoundException("Could not find Component with ID " + componentId);
+ }
+
+ final ProcessGroup connectableGroup = connectable.getProcessGroup();
+ if (!validProcessGroups.contains(connectableGroup)) {
+ throw new ResourceNotFoundException("Component with ID " + componentId + " does not belong to Process Group " + groupId + " or any of its descendent groups");
+ }
+
+ return connectable;
+ }
+
@Override
public Future<Void> scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+ final Set<ProcessGroup> validGroups = new HashSet<>();
+ validGroups.add(group);
+ validGroups.addAll(group.findAllProcessGroups());
+
for (final String componentId : componentIds) {
- final Connectable connectable = group.findLocalConnectable(componentId);
+ final Connectable connectable = findConnectable(componentId, groupId, validGroups);
+
if (ScheduledState.RUNNING.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
@@ -181,7 +214,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
break;
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
- final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+ final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
break;
}
@@ -199,7 +232,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
break;
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
- final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+ final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
remotePort.getRemoteProcessGroup().stopTransmitting(remotePort);
break;
}
@@ -213,8 +246,13 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
+ final Set<ProcessGroup> validGroups = new HashSet<>();
+ validGroups.add(group);
+ validGroups.addAll(group.findAllProcessGroups());
+
for (final String componentId : componentIds) {
- final Connectable connectable = group.findLocalConnectable(componentId);
+ final Connectable connectable = findConnectable(componentId, groupId, validGroups);
+
if (ScheduledState.STOPPED.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR: