You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by je...@apache.org on 2018/08/01 23:35:11 UTC

nifi git commit: NIFI-5480: Use FlowController's maps of components in order to look up component by ID rather than iterating recursively through all Process Groups to find the component

Repository: nifi
Updated Branches:
  refs/heads/master b4894c557 -> 4cca9bef7


NIFI-5480: Use FlowController's maps of components in order to look up component by ID rather than iterating recursively through all Process Groups to find the component

This closes #2932

Signed-off-by: Jeremy Dyer <je...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4cca9bef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4cca9bef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4cca9bef

Branch: refs/heads/master
Commit: 4cca9bef7c9bcad912cb7df6f3976715d09a6daf
Parents: b4894c5
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Aug 1 17:49:51 2018 -0400
Committer: Jeremy Dyer <je...@apache.org>
Committed: Wed Aug 1 19:33:56 2018 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/groups/ProcessGroup.java    |   8 --
 .../apache/nifi/controller/FlowController.java  |  33 ++++-
 .../reporting/StandardReportingContext.java     |   4 +-
 .../nifi/groups/StandardProcessGroup.java       | 134 ++++++-------------
 .../service/mock/MockProcessGroup.java          |   5 -
 .../StandardAuthorizableLookup.java             |   4 +-
 .../nifi/web/controller/ControllerFacade.java   |   6 +-
 .../web/dao/impl/StandardProcessGroupDAO.java   |  58 ++++++--
 8 files changed, 130 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index e9c4d87..01451df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -771,14 +771,6 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
     void remove(final Snippet snippet);
 
     /**
-     * @param identifier of connectable
-     * @return the Connectable with the given ID, if it exists; otherwise
-     * returns null. This performs a recursive search of all ProcessGroups'
-     * input ports, output ports, funnels, processors
-     */
-    Connectable findLocalConnectable(String identifier);
-
-    /**
      * @param identifier of remote group port
      * @return the RemoteGroupPort with the given ID, if it exists; otherwise
      * returns null.

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index e7623e5..b83749c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2697,6 +2697,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         allProcessors.remove(identifier);
     }
 
+    public Connectable findLocalConnectable(final String id) {
+        final ProcessorNode procNode = getProcessorNode(id);
+        if (procNode != null) {
+            return procNode;
+        }
+
+        final Port inPort = getInputPort(id);
+        if (inPort != null) {
+            return inPort;
+        }
+
+        final Port outPort = getOutputPort(id);
+        if (outPort != null) {
+            return outPort;
+        }
+
+        final Funnel funnel = getFunnel(id);
+        if (funnel != null) {
+            return funnel;
+        }
+
+        final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(id);
+        if (remoteGroupPort != null) {
+            return remoteGroupPort;
+        }
+
+        return null;
+    }
+
     public ProcessorNode getProcessorNode(final String id) {
         return allProcessors.get(id);
     }
@@ -4933,7 +4962,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             authorizable = new DataAuthorizable(getRootGroup());
         } else {
             // check if the component is a connectable, this should be the case most often
-            final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
+            final Connectable connectable = findLocalConnectable(componentId);
             if (connectable == null) {
                 // if the component id is not a connectable then consider a connection
                 final Connection connection = getRootGroup().findConnection(componentId);
@@ -4980,7 +5009,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             authorizable = new ProvenanceDataAuthorizable(getRootGroup());
         } else {
             // check if the component is a connectable, this should be the case most often
-            final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
+            final Connectable connectable = findLocalConnectable(componentId);
             if (connectable == null) {
                 // if the component id is not a connectable then consider a connection
                 final Connection connection = getRootGroup().findConnection(componentId);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 440fd79..d95a220 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -29,7 +29,6 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -95,8 +94,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
 
     @Override
     public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
-        final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
-        final Connectable connectable = rootGroup.findLocalConnectable(componentId);
+        final Connectable connectable = flowController.findLocalConnectable(componentId);
         if (connectable == null) {
             throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index e77be2a..edfa355 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -16,31 +16,6 @@
  */
 package org.apache.nifi.groups;
 
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -145,6 +120,31 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
 public final class StandardProcessGroup implements ProcessGroup {
 
     private final String id;
@@ -1696,80 +1696,34 @@ public final class StandardProcessGroup implements ProcessGroup {
         return allNodes;
     }
 
-    @Override
-    public Connectable findLocalConnectable(final String identifier) {
-        return findLocalConnectable(identifier, this);
-    }
-
-    private static Connectable findLocalConnectable(final String identifier, final ProcessGroup group) {
-        final ProcessorNode procNode = group.getProcessor(identifier);
-        if (procNode != null) {
-            return procNode;
-        }
-
-        final Port inPort = group.getInputPort(identifier);
-        if (inPort != null) {
-            return inPort;
-        }
-
-        final Port outPort = group.getOutputPort(identifier);
-        if (outPort != null) {
-            return outPort;
-        }
-
-        final Funnel funnel = group.getFunnel(identifier);
-        if (funnel != null) {
-            return funnel;
-        }
-
-        for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) {
-            final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier);
-            if (remoteInputPort != null) {
-                return remoteInputPort;
-            }
-
-            final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier);
-            if (remoteOutputPort != null) {
-                return remoteOutputPort;
-            }
-        }
-
-        for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup);
-            if (childGroupConnectable != null) {
-                return childGroupConnectable;
-            }
-        }
-
-        return null;
-    }
 
     @Override
     public RemoteGroupPort findRemoteGroupPort(final String identifier) {
-        return findRemoteGroupPort(identifier, this);
-    }
+        readLock.lock();
+        try {
+            for (final RemoteProcessGroup remoteGroup : remoteGroups.values()) {
+                final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
+                if (remoteInPort != null) {
+                    return remoteInPort;
+                }
 
-    private static RemoteGroupPort findRemoteGroupPort(final String identifier, final ProcessGroup group) {
-        for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
-            final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier);
-            if (remoteInPort != null) {
-                return remoteInPort;
+                final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier);
+                if (remoteOutPort != null) {
+                    return remoteOutPort;
+                }
             }
 
-            final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier);
-            if (remoteOutPort != null) {
-                return remoteOutPort;
+            for (final ProcessGroup childGroup : processGroups.values()) {
+                final RemoteGroupPort childGroupRemoteGroupPort = childGroup.findRemoteGroupPort(identifier);
+                if (childGroupRemoteGroupPort != null) {
+                    return childGroupRemoteGroupPort;
+                }
             }
-        }
 
-        for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            final RemoteGroupPort childGroupRemoteGroupPort = findRemoteGroupPort(identifier, childGroup);
-            if (childGroupRemoteGroupPort != null) {
-                return childGroupRemoteGroupPort;
-            }
+            return null;
+        } finally {
+            readLock.unlock();
         }
-
-        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 374d02b..ec2caef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -528,11 +528,6 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public Connectable findLocalConnectable(final String identifier) {
-        return null;
-    }
-
-    @Override
     public RemoteGroupPort findRemoteGroupPort(String identifier) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
index bbdd9e0..88d8c46 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java
@@ -732,9 +732,7 @@ class StandardAuthorizableLookup implements AuthorizableLookup {
 
     @Override
     public Authorizable getLocalConnectable(String id) {
-        final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
-        final Connectable connectable = group.findLocalConnectable(id);
-
+        final Connectable connectable = controllerFacade.findLocalConnectable(id);
         if (connectable == null) {
             throw new ResourceNotFoundException("Unable to find component with id " + id);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 1cbc5e2..1d08081 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -166,6 +166,10 @@ public class ControllerFacade implements Authorizable {
         }
     }
 
+    public Connectable findLocalConnectable(String componentId) {
+        return flowController.findLocalConnectable(componentId);
+    }
+
     public ControllerServiceProvider getControllerServiceProvider() {
         return flowController;
     }
@@ -1527,7 +1531,7 @@ public class ControllerFacade implements Authorizable {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
         final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
 
-        final Connectable connectable = root.findLocalConnectable(dto.getComponentId());
+        final Connectable connectable = findLocalConnectable(dto.getComponentId());
         if (connectable != null) {
             dto.setGroupId(connectable.getProcessGroup().getIdentifier());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4cca9bef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index cce3390..06aab80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -43,6 +43,7 @@ import org.apache.nifi.web.dao.ProcessGroupDAO;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -104,13 +105,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
+        final Set<ProcessGroup> validGroups = new HashSet<>();
+        validGroups.add(group);
+        validGroups.addAll(group.findAllProcessGroups());
+
         for (final String componentId : componentIds) {
-            final Connectable connectable = group.findLocalConnectable(componentId);
+            final Connectable connectable = findConnectable(componentId, groupId, validGroups);
             if (connectable == null) {
-                final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
-                if (remotePort == null) {
-                    throw new ResourceNotFoundException("Unable to find component with id " + componentId);
-                }
+                throw new ResourceNotFoundException("Unable to find component with id " + componentId);
+            }
+
+            if (connectable  instanceof RemoteGroupPort) {
+                final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
 
                 if (ScheduledState.RUNNING.equals(state)) {
                     remotePort.verifyCanStart();
@@ -134,8 +140,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     public void verifyEnableComponents(String groupId, ScheduledState state, Set<String> componentIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
+        final Set<ProcessGroup> validGroups = new HashSet<>();
+        validGroups.add(group);
+        validGroups.addAll(group.findAllProcessGroups());
+
         for (final String componentId : componentIds) {
-            final Connectable connectable = group.findLocalConnectable(componentId);
+            final Connectable connectable = findConnectable(componentId, groupId, validGroups);
             if (ScheduledState.STOPPED.equals(state)) {
                 connectable.verifyCanEnable();
             } else if (ScheduledState.DISABLED.equals(state)) {
@@ -159,14 +169,37 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
         }
     }
 
+    private Connectable findConnectable(final String componentId, final String groupId, final Set<ProcessGroup> validProcessGroups) {
+        // Get the component with the given ID and ensure that it belongs to the group that we are looking for.
+        // We do this, rather than calling ProcessGroup.findLocalConnectable because for any component that is buried several
+        // layers of Process Groups deep, that method becomes quite a bit more expensive than this method, due to all of the
+        // Read Locks that must be obtained while recursing through the Process Group's descendant groups.
+        final Connectable connectable = flowController.findLocalConnectable(componentId);
+        if (connectable == null) {
+            throw new ResourceNotFoundException("Could not find Component with ID " + componentId);
+        }
+
+        final ProcessGroup connectableGroup = connectable.getProcessGroup();
+        if (!validProcessGroups.contains(connectableGroup)) {
+            throw new ResourceNotFoundException("Component with ID " + componentId + " does not belong to Process Group " + groupId + " or any of its descendent groups");
+        }
+
+        return connectable;
+    }
+
     @Override
     public Future<Void> scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
         CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
 
+        final Set<ProcessGroup> validGroups = new HashSet<>();
+        validGroups.add(group);
+        validGroups.addAll(group.findAllProcessGroups());
+
         for (final String componentId : componentIds) {
-            final Connectable connectable = group.findLocalConnectable(componentId);
+            final Connectable connectable = findConnectable(componentId, groupId, validGroups);
+
             if (ScheduledState.RUNNING.equals(state)) {
                 switch (connectable.getConnectableType()) {
                     case PROCESSOR:
@@ -181,7 +214,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
                         break;
                     case REMOTE_INPUT_PORT:
                     case REMOTE_OUTPUT_PORT:
-                        final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+                        final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
                         remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
                         break;
                 }
@@ -199,7 +232,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
                         break;
                     case REMOTE_INPUT_PORT:
                     case REMOTE_OUTPUT_PORT:
-                        final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+                        final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
                         remotePort.getRemoteProcessGroup().stopTransmitting(remotePort);
                         break;
                 }
@@ -213,8 +246,13 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
 
+        final Set<ProcessGroup> validGroups = new HashSet<>();
+        validGroups.add(group);
+        validGroups.addAll(group.findAllProcessGroups());
+
         for (final String componentId : componentIds) {
-            final Connectable connectable = group.findLocalConnectable(componentId);
+            final Connectable connectable = findConnectable(componentId, groupId, validGroups);
+
             if (ScheduledState.STOPPED.equals(state)) {
                 switch (connectable.getConnectableType()) {
                     case PROCESSOR: