You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:51 UTC

[35/47] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 0000000,8cff5dd..856ccc1
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@@ -1,0 -1,2019 +1,2020 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.groups;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.annotation.lifecycle.OnShutdown;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.ConnectableType;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.LocalPort;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.connectable.Position;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.Snippet;
+ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.logging.LogRepositoryFactory;
+ import org.apache.nifi.nar.NarCloseable;
 -import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
 -import org.apache.nifi.processor.annotation.OnRemoved;
 -import org.apache.nifi.processor.annotation.OnShutdown;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.builder.HashCodeBuilder;
+ import org.apache.commons.lang3.builder.ToStringBuilder;
+ import org.apache.commons.lang3.builder.ToStringStyle;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public final class StandardProcessGroup implements ProcessGroup {
+ 
+     // Identity and display attributes. The AtomicReference-backed values are read
+     // and written by the accessors below without taking rwLock.
+     private final String id;
+     private final AtomicReference<ProcessGroup> parent;
+     private final AtomicReference<String> name;
+     private final AtomicReference<Position> position;
+     private final AtomicReference<String> comments;
+ 
+     // Collaborators handed in through the constructor.
+     private final ProcessScheduler scheduler;
+     private final ControllerServiceProvider controllerServiceProvider;
+ 
+     // Component registries. The ones used by the methods visible here are keyed by
+     // component identifier, read under readLock and mutated under writeLock.
+     private final Map<String, Port> inputPorts = new HashMap<>();
+     private final Map<String, Port> outputPorts = new HashMap<>();
+     private final Map<String, Connection> connections = new HashMap<>();
+     private final Map<String, ProcessGroup> processGroups = new HashMap<>();
+     private final Map<String, Label> labels = new HashMap<>();
+     private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
+     private final Map<String, ProcessorNode> processors = new HashMap<>();
+     private final Map<String, Funnel> funnels = new HashMap<>();
+     // Passed to the StandardProcessContext built in removeProcessor.
+     private final StringEncryptor encryptor;
+ 
+     // A single read/write lock; readLock and writeLock are its two views.
+     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+     private final Lock readLock = rwLock.readLock();
+     private final Lock writeLock = rwLock.writeLock();
+ 
+     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
+ 
+     /**
+      * Creates an empty process group with the given identifier.
+      * <p>
+      * The name and parent start out unset ({@code null}), the comments start as the
+      * empty string and the position starts at (0, 0).
+      *
+      * @param id the identifier reported by {@link #getIdentifier()}
+      * @param serviceProvider the controller service provider retained by this group
+      * @param scheduler the scheduler retained by this group
+      * @param nifiProps not referenced by this constructor
+      * @param encryptor the encryptor retained by this group
+      */
+     public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor) {
+         // Collaborators supplied by the caller.
+         this.id = id;
+         this.scheduler = scheduler;
+         this.controllerServiceProvider = serviceProvider;
+         this.encryptor = encryptor;
+ 
+         // Mutable attributes, each held in its own AtomicReference.
+         this.parent = new AtomicReference<>();
+         this.name = new AtomicReference<>();
+         this.comments = new AtomicReference<>("");
+         this.position = new AtomicReference<>(new Position(0D, 0D));
+     }
+ 
+     /**
+      * @return the group currently recorded as this group's parent, or {@code null}
+      *         when none has been set
+      */
+     @Override
+     public ProcessGroup getParent() {
+         final ProcessGroup currentParent = parent.get();
+         return currentParent;
+     }
+ 
+     /**
+      * Records the given group as this group's parent.
+      *
+      * @param newParent the new parent; stored as-is, with no validation
+      */
+     @Override
+     public void setParent(final ProcessGroup newParent) {
+         this.parent.set(newParent);
+     }
+ 
+     /**
+      * @return the identifier supplied when this group was constructed
+      */
+     @Override
+     public String getIdentifier() {
+         return this.id;
+     }
+ 
+     /**
+      * @return the current name, or {@code null} if no name has been set yet
+      */
+     @Override
+     public String getName() {
+         final String currentName = name.get();
+         return currentName;
+     }
+ 
+     /**
+      * Replaces the name of this group.
+      *
+      * @param name the new name
+      * @throws IllegalArgumentException if {@code StringUtils.isBlank(name)} is true
+      */
+     @Override
+     public void setName(final String name) {
+         final boolean blank = StringUtils.isBlank(name);
+         if (blank) {
+             throw new IllegalArgumentException("The name cannot be blank.");
+         }
+ 
+         this.name.set(name);
+     }
+ 
+     /**
+      * Replaces the stored position.
+      *
+      * @param position the new position; stored as-is, with no validation
+      */
+     @Override
+     public void setPosition(final Position position) {
+         final AtomicReference<Position> holder = this.position;
+         holder.set(position);
+     }
+ 
+     /**
+      * @return the stored position; (0, 0) until {@code setPosition} replaces it
+      */
+     @Override
+     public Position getPosition() {
+         final Position currentPosition = position.get();
+         return currentPosition;
+     }
+ 
+     /**
+      * @return the stored comments; the empty string until {@code setComments} replaces it
+      */
+     @Override
+     public String getComments() {
+         final String currentComments = comments.get();
+         return currentComments;
+     }
+ 
+     /**
+      * Replaces the stored comments.
+      *
+      * @param comments the new comments; stored as-is, with no validation
+      */
+     @Override
+     public void setComments(final String comments) {
+         final AtomicReference<String> holder = this.comments;
+         holder.set(comments);
+     }
+ 
+     /**
+      * Tallies the components of this group, and of its child groups, by state.
+      * <p>
+      * Each processor, Input Port and Output Port held directly by this group is placed in
+      * exactly one bucket, checked in this order: disabled (scheduled state equals
+      * {@code DISABLED}), running, invalid, otherwise stopped. The running, stopped,
+      * invalid and disabled totals of every child group are then added. Finally, for each
+      * group returned by {@code findAllRemoteProcessGroups()}, input ports with an incoming
+      * connection and output ports with at least one connection are counted as active when
+      * running and inactive otherwise, and a non-null authorization issue adds one to the
+      * invalid total. The whole computation runs while holding the read lock.
+      * <p>
+      * NOTE(review): child-group totals are added and {@code findAllRemoteProcessGroups()}
+      * is iterated as well; if that helper also descends into child groups, authorization
+      * issues found there may be counted twice. Confirm against its implementation.
+      *
+      * @return the counts; the input and output port counts cover only this group's own ports
+      */
+     @Override
+     public ProcessGroupCounts getCounts() {
+         int running = 0;
+         int stopped = 0;
+         int invalid = 0;
+         int disabled = 0;
+         int activeRemotePorts = 0;
+         int inactiveRemotePorts = 0;
+         int inputPortCount = 0;
+         int outputPortCount = 0;
+ 
+         readLock.lock();
+         try {
+             // Processors held directly by this group.
+             for (final ProcessorNode processor : processors.values()) {
+                 if (ScheduledState.DISABLED.equals(processor.getScheduledState())) {
+                     disabled++;
+                     continue;
+                 }
+                 if (processor.isRunning()) {
+                     running++;
+                     continue;
+                 }
+                 if (processor.isValid()) {
+                     stopped++;
+                 } else {
+                     invalid++;
+                 }
+             }
+ 
+             // Input ports held directly by this group.
+             inputPortCount = inputPorts.size();
+             for (final Port inputPort : inputPorts.values()) {
+                 if (ScheduledState.DISABLED.equals(inputPort.getScheduledState())) {
+                     disabled++;
+                     continue;
+                 }
+                 if (inputPort.isRunning()) {
+                     running++;
+                     continue;
+                 }
+                 if (inputPort.isValid()) {
+                     stopped++;
+                 } else {
+                     invalid++;
+                 }
+             }
+ 
+             // Output ports held directly by this group.
+             outputPortCount = outputPorts.size();
+             for (final Port outputPort : outputPorts.values()) {
+                 if (ScheduledState.DISABLED.equals(outputPort.getScheduledState())) {
+                     disabled++;
+                     continue;
+                 }
+                 if (outputPort.isRunning()) {
+                     running++;
+                     continue;
+                 }
+                 if (outputPort.isValid()) {
+                     stopped++;
+                 } else {
+                     invalid++;
+                 }
+             }
+ 
+             // Fold in the state totals reported by each child group.
+             for (final ProcessGroup child : processGroups.values()) {
+                 final ProcessGroupCounts fromChild = child.getCounts();
+                 running += fromChild.getRunningCount();
+                 stopped += fromChild.getStoppedCount();
+                 invalid += fromChild.getInvalidCount();
+                 disabled += fromChild.getDisabledCount();
+             }
+ 
+             for (final RemoteProcessGroup remoteGroup : findAllRemoteProcessGroups()) {
+                 // Input ports count only when they have an incoming connection.
+                 for (final Port remoteInput : remoteGroup.getInputPorts()) {
+                     if (!remoteInput.hasIncomingConnection()) {
+                         continue;
+                     }
+                     if (remoteInput.isRunning()) {
+                         activeRemotePorts++;
+                     } else {
+                         inactiveRemotePorts++;
+                     }
+                 }
+ 
+                 // Output ports count only when they have an outgoing connection.
+                 for (final Port remoteOutput : remoteGroup.getOutputPorts()) {
+                     if (remoteOutput.getConnections().isEmpty()) {
+                         continue;
+                     }
+                     if (remoteOutput.isRunning()) {
+                         activeRemotePorts++;
+                     } else {
+                         inactiveRemotePorts++;
+                     }
+                 }
+ 
+                 if (remoteGroup.getAuthorizationIssue() != null) {
+                     invalid++;
+                 }
+             }
+         } finally {
+             readLock.unlock();
+         }
+ 
+         return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped,
+                 invalid, disabled, activeRemotePorts, inactiveRemotePorts);
+     }
+ 
+     /**
+      * @return {@code true} when no parent is currently recorded for this group
+      */
+     @Override
+     public boolean isRootGroup() {
+         final ProcessGroup currentParent = parent.get();
+         return currentParent == null;
+     }
+ 
+     /**
+      * Starts the components of this group and then, recursively, of its child groups.
+      * <p>
+      * Processors that are neither running nor disabled are started; a failure to start one
+      * processor is logged and does not stop the remaining work. Input ports, output ports
+      * and funnels are started unless their scheduled state is {@code DISABLED}. Runs while
+      * holding the read lock.
+      */
+     @Override
+     public void startProcessing() {
+         readLock.lock();
+         try {
+             for (final ProcessorNode processor : processors.values()) {
+                 try {
+                     final boolean startable = !processor.isRunning() && processor.getScheduledState() != ScheduledState.DISABLED;
+                     if (startable) {
+                         startProcessor(processor);
+                     }
+                 } catch (final Throwable failure) {
+                     LOG.error("Unable to start {} due to {}", new Object[]{processor, failure});
+                 }
+             }
+ 
+             for (final Port port : getInputPorts()) {
+                 if (port.getScheduledState() == ScheduledState.DISABLED) {
+                     continue;
+                 }
+                 startInputPort(port);
+             }
+ 
+             for (final Port port : getOutputPorts()) {
+                 if (port.getScheduledState() == ScheduledState.DISABLED) {
+                     continue;
+                 }
+                 startOutputPort(port);
+             }
+ 
+             for (final Funnel candidate : getFunnels()) {
+                 if (candidate.getScheduledState() == ScheduledState.DISABLED) {
+                     continue;
+                 }
+                 startFunnel(candidate);
+             }
+ 
+             // Descend into the child groups last.
+             for (final ProcessGroup child : processGroups.values()) {
+                 child.startProcessing();
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Stops the components of this group and then, recursively, of its child groups.
+      * <p>
+      * Running processors are stopped; a failure to stop one processor is logged and does
+      * not stop the remaining work. Input and output ports are stopped only when their
+      * scheduled state is {@code RUNNING}. Runs while holding the read lock.
+      */
+     @Override
+     public void stopProcessing() {
+         readLock.lock();
+         try {
+             for (final ProcessorNode processor : processors.values()) {
+                 try {
+                     if (!processor.isRunning()) {
+                         continue;
+                     }
+                     stopProcessor(processor);
+                 } catch (final Throwable failure) {
+                     LOG.error("Unable to stop {} due to {}", new Object[]{processor, failure});
+                 }
+             }
+ 
+             for (final Port port : getInputPorts()) {
+                 final boolean portRunning = port.getScheduledState() == ScheduledState.RUNNING;
+                 if (portRunning) {
+                     stopInputPort(port);
+                 }
+             }
+ 
+             for (final Port port : getOutputPorts()) {
+                 final boolean portRunning = port.getScheduledState() == ScheduledState.RUNNING;
+                 if (portRunning) {
+                     stopOutputPort(port);
+                 }
+             }
+ 
+             // Descend into the child groups last.
+             for (final ProcessGroup child : processGroups.values()) {
+                 child.stopProcessing();
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Shuts down the contents of the given group and, recursively, of its child groups.
+      * <p>
+      * For every processor node of the group, the shutdown-annotated methods of the
+      * underlying processor are invoked through {@code ReflectionUtils}; both the current
+      * {@code OnShutdown} annotation and the legacy
+      * {@code org.apache.nifi.processor.annotation.OnShutdown} are passed (presumably the
+      * legacy one is the reason for the deprecation suppression; confirm). Every remote
+      * process group of the group is then shut down.
+      *
+      * @param procGroup the group whose contents are shut down
+      */
++    @SuppressWarnings("deprecation")
+     private void shutdown(final ProcessGroup procGroup) {
+         for (final ProcessorNode node : procGroup.getProcessors()) {
+             // The NarCloseable is held only for the duration of the reflective call.
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor());
++                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor());
+             }
+         }
+ 
+         for ( final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups() ) {
+             rpg.shutdown();
+         }
+         
+         // Recursively shutdown child groups.
+         for (final ProcessGroup group : procGroup.getProcessGroups()) {
+             shutdown(group);
+         }
+     }
+ 
+     /**
+      * Shuts down this group and everything beneath it while holding the read lock.
+      */
+     @Override
+     public void shutdown() {
+         final Lock lock = readLock;
+         lock.lock();
+         try {
+             shutdown(this);
+         } finally {
+             lock.unlock();
+         }
+     }
+ 
+     /**
+      * Registers an Input Port with this group.
+      * <p>
+      * The root group accepts only {@code RootGroupPort} instances; any other group accepts
+      * only {@code LocalPort} instances. On success the port's process group is set to this
+      * group and the port is stored under its identifier.
+      *
+      * @param port the port to add
+      * @throws NullPointerException if {@code port} is null
+      * @throws IllegalArgumentException if the port's type does not fit this group
+      * @throws IllegalStateException if an Input Port with the same identifier or the same
+      *         name is already registered
+      */
+     @Override
+     public void addInputPort(final Port port) {
+         // Reject null up front: the type checks below dereference the port.
+         requireNonNull(port, "port");
+ 
+         if (isRootGroup()) {
+             if (!(port instanceof RootGroupPort)) {
+                 throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group");
+             }
+         } else if (!(port instanceof LocalPort)) {
+             throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to a non-root group");
+         }
+ 
+         writeLock.lock();
+         try {
+             final String portId = port.getIdentifier();
+             if (inputPorts.containsKey(portId)) {
+                 throw new IllegalStateException("Input Port with ID " + portId + " already exists");
+             }
+ 
+             if (getInputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("Input Port with name " + port.getName() + " already exists");
+             }
+ 
+             port.setProcessGroup(this);
+             inputPorts.put(portId, port);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Removes an Input Port, together with its connections, from this group.
+      * <p>
+      * While holding the write lock: the port and each of its connections are verified as
+      * deletable, the port is stopped if it is running, each of its connections is removed,
+      * and finally the port is dropped from the registry.
+      *
+      * @param port the port to remove
+      * @throws NullPointerException if {@code port} is null
+      * @throws IllegalStateException if the port is not registered as an Input Port of this
+      *         group
+      */
+     @Override
+     public void removeInputPort(final Port port) {
+         writeLock.lock();
+         try {
+             final String portId = requireNonNull(port).getIdentifier();
+             if (inputPorts.get(portId) == null) {
+                 throw new IllegalStateException(port + " is not an Input Port of this Process Group");
+             }
+ 
+             // All verification happens before anything is stopped or removed.
+             port.verifyCanDelete();
+             for (final Connection attached : port.getConnections()) {
+                 attached.verifyCanDelete();
+             }
+ 
+             if (port.isRunning()) {
+                 stopInputPort(port);
+             }
+ 
+             // Iterate over a copy to avoid a concurrent modification.
+             final Set<Connection> attachedConnections = new HashSet<>(port.getConnections());
+             for (final Connection attached : attachedConnections) {
+                 removeConnection(attached);
+             }
+ 
+             if (inputPorts.remove(portId) == null) {
+                 throw new IllegalStateException(port + " is not an Input Port of this Process Group");
+             }
+ 
+             LOG.info("Input Port {} removed from flow", port);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up an Input Port of this group by identifier while holding the read lock.
+      *
+      * @param id the port identifier
+      * @return the matching port, or {@code null} if none is registered under that identifier
+      * @throws NullPointerException if {@code id} is null
+      */
+     @Override
+     public Port getInputPort(final String id) {
+         readLock.lock();
+         try {
+             final String portId = Objects.requireNonNull(id);
+             return inputPorts.get(portId);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set holding the Input Ports registered at the time of the call; later
+      *         changes to this group are not reflected in it
+      */
+     @Override
+     public Set<Port> getInputPorts() {
+         readLock.lock();
+         try {
+             final Set<Port> snapshot = new HashSet<>(inputPorts.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Registers an Output Port with this group.
+      * <p>
+      * The root group accepts only {@code RootGroupPort} instances; any other group accepts
+      * only {@code LocalPort} instances. On success the port's process group is set to this
+      * group and the port is stored under its identifier.
+      *
+      * @param port the port to add
+      * @throws NullPointerException if {@code port} is null
+      * @throws IllegalArgumentException if the port's type does not fit this group
+      * @throws IllegalStateException if an Output Port with the same identifier or the same
+      *         name is already registered
+      */
+     @Override
+     public void addOutputPort(final Port port) {
+         // Reject null up front: the type checks below dereference the port.
+         requireNonNull(port, "port");
+ 
+         if (isRootGroup()) {
+             if (!(port instanceof RootGroupPort)) {
+                 throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to the Root Group");
+             }
+         } else if (!(port instanceof LocalPort)) {
+             throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to a non-root group");
+         }
+ 
+         writeLock.lock();
+         try {
+             final String portId = port.getIdentifier();
+             if (outputPorts.containsKey(portId)) {
+                 throw new IllegalStateException("Output Port with ID " + portId + " already exists");
+             }
+ 
+             if (getOutputPortByName(port.getName()) != null) {
+                 throw new IllegalStateException("Output Port with Name " + port.getName() + " already exists");
+             }
+ 
+             port.setProcessGroup(this);
+             outputPorts.put(portId, port);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Removes an Output Port from this group.
+      * <p>
+      * While holding the write lock: the port must be registered with this group, must pass
+      * {@code verifyCanDelete()} and must have no connections. Only once all of these checks
+      * pass is the port stopped (if it is running) and dropped from the registry, so a
+      * rejected removal leaves the port untouched.
+      *
+      * @param port the port to remove
+      * @throws NullPointerException if {@code port} is null
+      * @throws IllegalStateException if the port is not an Output Port of this group or
+      *         still has connections
+      */
+     @Override
+     public void removeOutputPort(final Port port) {
+         writeLock.lock();
+         try {
+             final Port toRemove = outputPorts.get(requireNonNull(port).getIdentifier());
+             // Check membership before dereferencing the lookup result, so an unknown port
+             // is reported with a descriptive exception.
+             if (toRemove == null) {
+                 throw new IllegalStateException(port + " is not an Output Port of this Process Group");
+             }
+ 
+             toRemove.verifyCanDelete();
+             if (!toRemove.getConnections().isEmpty()) {
+                 throw new IllegalStateException(port + " cannot be removed until its connections are removed");
+             }
+ 
+             if (port.isRunning()) {
+                 stopOutputPort(port);
+             }
+ 
+             outputPorts.remove(port.getIdentifier());
+             LOG.info("Output Port {} removed from flow", port);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up an Output Port of this group by identifier while holding the read lock.
+      *
+      * @param id the port identifier
+      * @return the matching port, or {@code null} if none is registered under that identifier
+      * @throws NullPointerException if {@code id} is null
+      */
+     @Override
+     public Port getOutputPort(final String id) {
+         readLock.lock();
+         try {
+             final String portId = Objects.requireNonNull(id);
+             return outputPorts.get(portId);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set holding the Output Ports registered at the time of the call; later
+      *         changes to this group are not reflected in it
+      */
+     @Override
+     public Set<Port> getOutputPorts() {
+         readLock.lock();
+         try {
+             final Set<Port> snapshot = new HashSet<>(outputPorts.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Adds a child group to this group.
+      * <p>
+      * While holding the write lock, the child's parent is set to this group and the child
+      * is stored under its identifier.
+      *
+      * @param group the child group to add
+      * @throws NullPointerException if {@code group} is null
+      * @throws IllegalArgumentException if the child's name is null or empty
+      */
+     @Override
+     public void addProcessGroup(final ProcessGroup group) {
+         // Reject null up front: the name check below dereferences the group.
+         requireNonNull(group, "group");
+ 
+         if (StringUtils.isEmpty(group.getName())) {
+             throw new IllegalArgumentException("Process Group's name must be specified");
+         }
+ 
+         writeLock.lock();
+         try {
+             group.setParent(this);
+             processGroups.put(group.getIdentifier(), group);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a direct child group by identifier while holding the read lock.
+      *
+      * @param id the child group identifier
+      * @return the matching child group, or {@code null} if none is registered under that
+      *         identifier
+      */
+     @Override
+     public ProcessGroup getProcessGroup(final String id) {
+         readLock.lock();
+         try {
+             final ProcessGroup child = processGroups.get(id);
+             return child;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set holding the direct child groups registered at the time of the
+      *         call; later changes to this group are not reflected in it
+      */
+     @Override
+     public Set<ProcessGroup> getProcessGroups() {
+         readLock.lock();
+         try {
+             final Set<ProcessGroup> snapshot = new HashSet<>(processGroups.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Removes an empty child group from this group.
+      * <p>
+      * Emptiness is checked once before the write lock is taken and again, on the
+      * registered instance, while the lock is held.
+      *
+      * @param group the child group to remove
+      * @throws NullPointerException if {@code group} is null
+      * @throws IllegalStateException if the group is not empty or is not a child of this
+      *         group
+      */
+     @Override
+     public void removeProcessGroup(final ProcessGroup group) {
+         final boolean empty = requireNonNull(group).isEmpty();
+         if (!empty) {
+             throw new IllegalStateException("Cannot remove " + group + " because it is not empty");
+         }
+ 
+         writeLock.lock();
+         try {
+             final String groupId = group.getIdentifier();
+             final ProcessGroup registered = processGroups.get(groupId);
+             if (registered == null) {
+                 throw new IllegalStateException(group + " is not a member of this Process Group");
+             }
+             verifyCanRemove(registered);
+ 
+             processGroups.remove(groupId);
+ 
+             LOG.info("{} removed from flow", group);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Ensures that the given child group may be removed.
+      *
+      * @param childGroup the group about to be removed
+      * @throws IllegalStateException if the group reports that it is not empty
+      */
+     private void verifyCanRemove(final ProcessGroup childGroup) {
+         if (childGroup.isEmpty()) {
+             return;
+         }
+ 
+         throw new IllegalStateException("Cannot remove ProcessGroup because it is not empty");
+     }
+ 
+     /**
+      * Registers a remote process group with this group.
+      * <p>
+      * While holding the write lock, the remote group's process group is set to this group
+      * and the remote group is stored under its identifier.
+      *
+      * @param remoteGroup the remote process group to add
+      * @throws NullPointerException if {@code remoteGroup} is null
+      * @throws IllegalStateException if a remote process group with the same identifier is
+      *         already registered
+      */
+     @Override
+     public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
+         writeLock.lock();
+         try {
+             final String remoteGroupId = requireNonNull(remoteGroup).getIdentifier();
+             final boolean alreadyRegistered = remoteGroups.containsKey(remoteGroupId);
+             if (alreadyRegistered) {
+                 throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroupId);
+             }
+ 
+             remoteGroup.setProcessGroup(this);
+             remoteGroups.put(remoteGroupId, remoteGroup);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set holding the remote process groups registered directly with this
+      *         group at the time of the call; later changes are not reflected in it
+      */
+     @Override
+     public Set<RemoteProcessGroup> getRemoteProcessGroups() {
+         readLock.lock();
+         try {
+             final Set<RemoteProcessGroup> snapshot = new HashSet<>(remoteGroups.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Removes a remote process group, and the connections of its output ports, from this
+      * group.
+      * <p>
+      * While holding the write lock: the registered remote group and every connection of
+      * its output ports are verified as deletable; those connections are then removed; the
+      * remote group's {@code onRemove()} callback is invoked, with any exception it throws
+      * logged as a warning and otherwise ignored; finally the remote group is dropped from
+      * the registry.
+      *
+      * @param remoteProcessGroup the remote process group to remove
+      * @throws NullPointerException if {@code remoteProcessGroup} is null
+      * @throws IllegalStateException if it is not registered with this group
+      */
+     @Override
+     public void removeRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) {
+         final String remoteGroupId = requireNonNull(remoteProcessGroup).getIdentifier();
+ 
+         writeLock.lock();
+         try {
+             final RemoteProcessGroup registered = remoteGroups.get(remoteGroupId);
+             if (registered == null) {
+                 throw new IllegalStateException(remoteProcessGroup + " is not a member of this Process Group");
+             }
+ 
+             // First pass: verify everything before any connection is removed.
+             registered.verifyCanDelete();
+             for (final RemoteGroupPort outputPort : registered.getOutputPorts()) {
+                 for (final Connection outgoing : outputPort.getConnections()) {
+                     outgoing.verifyCanDelete();
+                 }
+             }
+ 
+             // Second pass: remove the connections.
+             for (final RemoteGroupPort outputPort : registered.getOutputPorts()) {
+                 // Iterate over a copy to avoid a concurrent modification.
+                 final Set<Connection> outgoingConnections = new HashSet<>(outputPort.getConnections());
+                 for (final Connection outgoing : outgoingConnections) {
+                     removeConnection(outgoing);
+                 }
+             }
+ 
+             try {
+                 registered.onRemove();
+             } catch (final Exception cleanupFailure) {
+                 LOG.warn("Failed to clean up resources for {} due to {}", registered, cleanupFailure);
+             }
+ 
+             remoteGroups.remove(remoteGroupId);
+             LOG.info("{} removed from flow", remoteProcessGroup);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Registers a processor with this group.
+      * <p>
+      * While holding the write lock, the processor's process group is set to this group
+      * and the processor is stored under its identifier.
+      *
+      * @param processor the processor node to add
+      * @throws NullPointerException if {@code processor} is null
+      * @throws IllegalStateException if a processor is already registered under the same
+      *         identifier
+      */
+     @Override
+     public void addProcessor(final ProcessorNode processor) {
+         writeLock.lock();
+         try {
+             final String processorId = requireNonNull(processor).getIdentifier();
+             if (processors.get(processorId) != null) {
+                 throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
+             }
+ 
+             processor.setProcessGroup(this);
+             processors.put(processorId, processor);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Removes a processor, together with its connections, from this group.
+      * <p>
+      * While holding the write lock: the processor and each of its connections are verified
+      * as deletable; the removal-annotated methods of the underlying processor are invoked
+      * through {@code ReflectionUtils} with a freshly built {@code StandardProcessContext}
+      * (both the current {@code OnRemoved} annotation and the legacy
+      * {@code org.apache.nifi.processor.annotation.OnRemoved} are passed); the processor is
+      * dropped from the registry; all observers of its log repository are removed; and
+      * finally each of its connections is removed.
+      *
+      * @param processor the processor node to remove
+      * @throws NullPointerException if {@code processor} is null
+      * @throws IllegalStateException if the processor is not registered with this group
+      * @throws ProcessorLifeCycleException if the removal callbacks step throws an exception
+      */
++    @SuppressWarnings("deprecation")
+     @Override
+     public void removeProcessor(final ProcessorNode processor) {
+         final String id = requireNonNull(processor).getIdentifier();
+         writeLock.lock();
+         try {
+             if (!processors.containsKey(id)) {
+                 throw new IllegalStateException(processor + " is not a member of this Process Group");
+             }
+ 
+             // Verify everything before any callback runs or anything is removed.
+             processor.verifyCanDelete();
+             for (final Connection conn : processor.getConnections()) {
+                 conn.verifyCanDelete();
+             }
+ 
+             // The NarCloseable is held only for the duration of the reflective call.
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                 final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor);
 -                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
++                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext);
+             } catch (final Exception e) {
+                 throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
+             }
+ 
+             processors.remove(id);
+             LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
+ 
+             // must copy to avoid a concurrent modification
+             final Set<Connection> copy = new HashSet<>(processor.getConnections());
+             for (final Connection conn : copy) {
+                 removeConnection(conn);
+             }
+ 
+             LOG.info("{} removed from flow", processor);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set holding the processors registered directly with this group at the
+      *         time of the call; later changes are not reflected in it
+      */
+     @Override
+     public Set<ProcessorNode> getProcessors() {
+         readLock.lock();
+         try {
+             final Set<ProcessorNode> snapshot = new LinkedHashSet<>(processors.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a processor of this group by identifier while holding the read lock.
+      *
+      * @param id the processor identifier
+      * @return the matching processor node, or {@code null} if none is registered under
+      *         that identifier
+      * @throws NullPointerException if {@code id} is null
+      */
+     @Override
+     public ProcessorNode getProcessor(final String id) {
+         readLock.lock();
+         try {
+             final String processorId = Objects.requireNonNull(id);
+             return processors.get(processorId);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Tells whether the connectable is an Input Port that {@code findInputPort} can
+      * resolve by identifier.
+      *
+      * @param connectable the component to test
+      * @return {@code true} only if its connectable type is {@code INPUT_PORT} and the
+      *         lookup by its identifier yields a non-null result
+      */
+     private boolean isInputPort(final Connectable connectable) {
+         final boolean typedAsInputPort = connectable.getConnectableType() == ConnectableType.INPUT_PORT;
+         return typedAsInputPort && findInputPort(connectable.getIdentifier()) != null;
+     }
+ 
+     /**
+      * Tells whether the connectable is an Output Port that {@code findOutputPort} can
+      * resolve by identifier.
+      *
+      * @param connectable the component to test
+      * @return {@code true} only if its connectable type is {@code OUTPUT_PORT} and the
+      *         lookup by its identifier yields a non-null result
+      */
+     private boolean isOutputPort(final Connectable connectable) {
+         final boolean typedAsOutputPort = connectable.getConnectableType() == ConnectableType.OUTPUT_PORT;
+         return typedAsOutputPort && findOutputPort(connectable.getIdentifier()) != null;
+     }
+ 
+     /**
+      * Stores the connection under its identifier while holding the write lock.
+      * <p>
+      * Unlike {@code addConnection}, no validation is performed and neither the connection
+      * nor its endpoints are modified.
+      *
+      * @param connection the connection to record
+      */
+     @Override
+     public void inheritConnection(final Connection connection) {
+         writeLock.lock();
+         try {
+             final String connectionId = connection.getIdentifier();
+             connections.put(connectionId, connection);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Validates a connection against the group layout and registers it with this group.
+      * <p>
+      * While holding the write lock, the connection's source and destination are checked
+      * against this group and its direct child groups (see the inline comments for the
+      * individual rules). On success the connection's process group is set to this group,
+      * the connection is added to its source and, unless it is a self-loop, to its
+      * destination, and it is stored under its identifier.
+      *
+      * @param connection the connection to add
+      * @throws NullPointerException if {@code connection} is null
+      * @throws IllegalStateException if a connection with the same identifier already
+      *         exists, or if the source/destination placement violates one of the rules
+      */
+     @Override
+     public void addConnection(final Connection connection) {
+         writeLock.lock();
+         try {
+             final String id = requireNonNull(connection).getIdentifier();
+             final Connection existingConnection = connections.get(id);
+             if (existingConnection != null) {
+                 throw new IllegalStateException("Connection already exists with ID " + id);
+             }
+ 
+             final Connectable source = connection.getSource();
+             final Connectable destination = connection.getDestination();
+             final ProcessGroup sourceGroup = source.getProcessGroup();
+             final ProcessGroup destinationGroup = destination.getProcessGroup();
+ 
+             // validate that the connection is valid with respect to the source & destination groups
+             if (isInputPort(source)) { // if source is an input port, its destination must be in the same group unless it's an input port
+                 if (isInputPort(destination)) { // if destination is input port, it must be in a child group.
+                     if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+                         throw new IllegalStateException("Cannot add Connection to Process Group because destination is an Input Port that does not belong to a child Process Group");
+                     }
+                 } else if (sourceGroup != this || destinationGroup != this) {
+                     throw new IllegalStateException("Cannot add Connection to Process Group because source and destination are not both in this Process Group");
+                 }
+             } else if (isOutputPort(source)) { // if source is an output port, its group must be a child of this group, and its destination must be in this group (processor/output port) or a child group (input port)
+                 if (!processGroups.containsKey(sourceGroup.getIdentifier())) {
+                     throw new IllegalStateException("Cannot add Connection to Process Group because source is an Output Port that does not belong to a child Process Group");
+                 }
+ 
+                 if (isInputPort(destination)) {
+                     if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+                         throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group");
+                     }
+                 } else {
+                     if (destinationGroup != this) {
+                         throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group");
+                     }
+                 }
+             } else { // source is not a port
+                 if (sourceGroup != this) {
+                     throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group");
+                 }
+ 
+                 if (isOutputPort(destination)) {
+                     if (destinationGroup != this) {
+                         throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Output Port but does not belong to this Process Group");
+                     }
+                 } else if (isInputPort(destination)) {
+                     if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+                         throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port but the Input Port does not belong to a child Process Group");
+                     }
+                 } else if (destinationGroup != this) {
+                     throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination + " because they are in different Process Groups and neither is an Input Port or Output Port");
+                 }
+             }
+ 
+             // All checks passed: wire the connection into this group and its endpoints.
+             connection.setProcessGroup(this);
+             source.addConnection(connection);
+             if (source != destination) {  // don't call addConnection twice if it's a self-looping connection.
+                 destination.addConnection(connection);
+             }
+             connections.put(connection.getIdentifier(), connection);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public Connectable getConnectable(final String id) {
+         readLock.lock();
+         try {
+             final ProcessorNode node = processors.get(id);
+             if (node != null) {
+                 return node;
+             }
+ 
+             final Port inputPort = inputPorts.get(id);
+             if (inputPort != null) {
+                 return inputPort;
+             }
+ 
+             final Port outputPort = outputPorts.get(id);
+             if (outputPort != null) {
+                 return outputPort;
+             }
+ 
+             final Funnel funnel = funnels.get(id);
+             if (funnel != null) {
+                 return funnel;
+             }
+ 
+             return null;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Detaches a connection from its source and destination and drops it from this group.
+      *
+      * @param connectionToRemove the connection to remove; must not be {@code null}
+      * @throws IllegalStateException if no connection with the same identifier is registered in this group
+      */
+     @Override
+     public void removeConnection(final Connection connectionToRemove) {
+         writeLock.lock();
+         try {
+             // the connection must be registered with this group before it can be removed
+             final String connectionId = requireNonNull(connectionToRemove).getIdentifier();
+             final Connection registered = connections.get(connectionId);
+             if (registered == null) {
+                 throw new IllegalStateException(connectionToRemove + " is not a member of this Process Group");
+             }
+ 
+             connectionToRemove.verifyCanDelete();
+ 
+             final Connectable upstream = connectionToRemove.getSource();
+             final Connectable downstream = connectionToRemove.getDestination();
+ 
+             // unhook both endpoints; a self-loop has a single endpoint, so it is only notified once
+             upstream.removeConnection(registered);
+             if (upstream != downstream) {
+                 downstream.removeConnection(registered);
+             }
+ 
+             // finally forget the connection in this group's own registry
+             connections.remove(registered.getIdentifier());
+             LOG.info("{} removed from flow", registered);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set containing the connections currently registered in this group;
+      *         changes to the returned set do not affect the group
+      */
+     @Override
+     public Set<Connection> getConnections() {
+         readLock.lock();
+         try {
+             final Set<Connection> snapshot = new HashSet<>(connections.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a connection registered directly in this group.
+      *
+      * @param id identifier of the connection; must not be {@code null}
+      * @return the matching connection, or {@code null} if this group has none with that identifier
+      */
+     @Override
+     public Connection getConnection(final String id) {
+         readLock.lock();
+         try {
+             final String connectionId = Objects.requireNonNull(id);
+             return connections.get(connectionId);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Collects the connections of this group and of every group nested beneath it.
+      *
+      * @return a new list holding this group's connections followed by those of its descendant groups
+      */
+     @Override
+     public List<Connection> findAllConnections() {
+         return findAllConnections(this);
+     }
+ 
+     // Gathers the connections of the given group, then recurses into each of its child groups.
+     private List<Connection> findAllConnections(final ProcessGroup group) {
+         final List<Connection> collected = new ArrayList<>();
+         collected.addAll(group.getConnections());
+         for (final ProcessGroup nested : group.getProcessGroups()) {
+             final List<Connection> nestedConnections = findAllConnections(nested);
+             collected.addAll(nestedConnections);
+         }
+         return collected;
+     }
+ 
+     /**
+      * Registers a label with this group and makes this group the label's owner.
+      *
+      * @param label the label to add; must not be {@code null}
+      * @throws IllegalStateException if a label with the same identifier is already registered
+      */
+     @Override
+     public void addLabel(final Label label) {
+         writeLock.lock();
+         try {
+             final String labelId = requireNonNull(label).getIdentifier();
+             final Label alreadyPresent = labels.get(labelId);
+             if (alreadyPresent != null) {
+                 throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
+             }
+ 
+             label.setProcessGroup(this);
+             labels.put(label.getIdentifier(), label);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Unregisters a label from this group.
+      *
+      * @param label the label to remove; must not be {@code null}
+      * @throws IllegalStateException if no label with that identifier was registered in this group
+      */
+     @Override
+     public void removeLabel(final Label label) {
+         writeLock.lock();
+         try {
+             final String labelId = requireNonNull(label).getIdentifier();
+             final Label previous = labels.remove(labelId);
+             if (previous == null) {
+                 throw new IllegalStateException(label + " is not a member of this Process Group.");
+             }
+ 
+             LOG.info("Label with ID {} removed from flow", label.getIdentifier());
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set containing the labels currently registered in this group;
+      *         changes to the returned set do not affect the group
+      */
+     @Override
+     public Set<Label> getLabels() {
+         readLock.lock();
+         try {
+             final Set<Label> snapshot = new HashSet<>(labels.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a label registered directly in this group.
+      *
+      * @param id identifier of the label
+      * @return the matching label, or {@code null} if this group has none with that identifier
+      */
+     @Override
+     public Label getLabel(final String id) {
+         readLock.lock();
+         try {
+             final Label match = labels.get(id);
+             return match;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Reports whether this group holds no components at all.
+      * <p>
+      * Funnels are included in the check: they are registered in this group just like ports,
+      * processors and labels, so a group that still contains a funnel is not empty.
+      *
+      * @return {@code true} only if this group has no input ports, output ports, connections,
+      *         child process groups, labels, processors, remote process groups or funnels
+      */
+     @Override
+     public boolean isEmpty() {
+         readLock.lock();
+         try {
+             return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty()
+                     && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty()
+                     && funnels.isEmpty();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a remote process group registered directly in this group.
+      *
+      * @param id identifier of the remote process group; must not be {@code null}
+      * @return the matching remote process group, or {@code null} if this group has none with that identifier
+      */
+     @Override
+     public RemoteProcessGroup getRemoteProcessGroup(final String id) {
+         readLock.lock();
+         try {
+             final String remoteGroupId = Objects.requireNonNull(id);
+             return remoteGroups.get(remoteGroupId);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to start a processor of this group. Does nothing if the processor is already running.
+      *
+      * @param processor the processor to start
+      * @throws IllegalStateException if the processor is not registered in this group or is disabled
+      */
+     @Override
+     public void startProcessor(final ProcessorNode processor) {
+         readLock.lock();
+         try {
+             final ProcessorNode registered = getProcessor(processor.getIdentifier());
+             if (registered == null) {
+                 throw new IllegalStateException("Processor is not a member of this Process Group");
+             }
+ 
+             final ScheduledState currentState = processor.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("Processor is disabled");
+             }
+ 
+             // an already-running processor needs no further action
+             if (currentState != ScheduledState.RUNNING) {
+                 scheduler.startProcessor(processor);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to start an input port of this group. Does nothing if the port is already running.
+      *
+      * @param port the input port to start
+      * @throws IllegalStateException if the port is not an input port of this group or is disabled
+      */
+     @Override
+     public void startInputPort(final Port port) {
+         readLock.lock();
+         try {
+             final Port registered = getInputPort(port.getIdentifier());
+             if (registered == null) {
+                 throw new IllegalStateException(port + " is not a member of this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("InputPort " + port + " is disabled");
+             }
+ 
+             // an already-running port needs no further action
+             if (currentState != ScheduledState.RUNNING) {
+                 scheduler.startPort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to start an output port of this group. Does nothing if the port is already running.
+      *
+      * @param port the output port to start
+      * @throws IllegalStateException if the port is not an output port of this group or is disabled
+      */
+     @Override
+     public void startOutputPort(final Port port) {
+         readLock.lock();
+         try {
+             final Port registered = getOutputPort(port.getIdentifier());
+             if (registered == null) {
+                 throw new IllegalStateException("Port is not a member of this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("OutputPort is disabled");
+             }
+ 
+             // an already-running port needs no further action
+             if (currentState != ScheduledState.RUNNING) {
+                 scheduler.startPort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to start a funnel of this group. Does nothing if the funnel is already running.
+      *
+      * @param funnel the funnel to start
+      * @throws IllegalStateException if the funnel is not registered in this group or is disabled
+      */
+     @Override
+     public void startFunnel(final Funnel funnel) {
+         readLock.lock();
+         try {
+             final Funnel registered = getFunnel(funnel.getIdentifier());
+             if (registered == null) {
+                 throw new IllegalStateException("Funnel is not a member of this Process Group");
+             }
+ 
+             final ScheduledState currentState = funnel.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("Funnel is disabled");
+             }
+ 
+             // an already-running funnel needs no further action
+             if (currentState != ScheduledState.RUNNING) {
+                 scheduler.startFunnel(funnel);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to stop a processor of this group. Does nothing if the processor is already stopped.
+      *
+      * @param processor the processor to stop
+      * @throws IllegalStateException if the processor is not registered in this group or is disabled
+      */
+     @Override
+     public void stopProcessor(final ProcessorNode processor) {
+         readLock.lock();
+         try {
+             final boolean member = processors.containsKey(processor.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No processor with ID " + processor.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = processor.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("Processor is disabled");
+             }
+ 
+             // an already-stopped processor needs no further action
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.stopProcessor(processor);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to stop an input port of this group. Does nothing if the port is already stopped.
+      *
+      * @param port the input port to stop
+      * @throws IllegalStateException if the port is not an input port of this group or is disabled
+      */
+     @Override
+     public void stopInputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = inputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("InputPort is disabled");
+             }
+ 
+             // an already-stopped port needs no further action
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.stopPort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to stop an output port of this group. Does nothing if the port is already stopped.
+      *
+      * @param port the output port to stop
+      * @throws IllegalStateException if the port is not an output port of this group or is disabled
+      */
+     @Override
+     public void stopOutputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = outputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Output Port with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("OutputPort is disabled");
+             }
+ 
+             // an already-stopped port needs no further action
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.stopPort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to stop a funnel of this group. Does nothing if the funnel is already stopped.
+      *
+      * @param funnel the funnel to stop
+      * @throws IllegalStateException if the funnel is not registered in this group or is disabled
+      */
+     @Override
+     public void stopFunnel(final Funnel funnel) {
+         readLock.lock();
+         try {
+             final boolean member = funnels.containsKey(funnel.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = funnel.getScheduledState();
+             if (currentState == ScheduledState.DISABLED) {
+                 throw new IllegalStateException("Funnel is disabled");
+             }
+ 
+             // an already-stopped funnel needs no further action
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.stopFunnel(funnel);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to enable a funnel of this group. Does nothing if the funnel is in the STOPPED state.
+      *
+      * @param funnel the funnel to enable
+      * @throws IllegalStateException if the funnel is not registered in this group or is currently running
+      */
+     @Override
+     public void enableFunnel(final Funnel funnel) {
+         readLock.lock();
+         try {
+             final boolean member = funnels.containsKey(funnel.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = funnel.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("Funnel is currently running");
+             }
+ 
+             // a funnel in the STOPPED state is left untouched
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.enableFunnel(funnel);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to enable an input port of this group. Does nothing if the port is in the STOPPED state.
+      *
+      * @param port the input port to enable
+      * @throws IllegalStateException if the port is not an input port of this group or is currently running
+      */
+     @Override
+     public void enableInputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = inputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("InputPort is currently running");
+             }
+ 
+             // a port in the STOPPED state is left untouched
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.enablePort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to enable an output port of this group. Does nothing if the port is in the STOPPED state.
+      *
+      * @param port the output port to enable
+      * @throws IllegalStateException if the port is not an output port of this group or is currently running
+      */
+     @Override
+     public void enableOutputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = outputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Output Port with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("OutputPort is currently running");
+             }
+ 
+             // a port in the STOPPED state is left untouched
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.enablePort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to enable a processor of this group. Does nothing if the processor is in the STOPPED state.
+      *
+      * @param processor the processor to enable
+      * @throws IllegalStateException if the processor is not registered in this group or is currently running
+      */
+     @Override
+     public void enableProcessor(final ProcessorNode processor) {
+         readLock.lock();
+         try {
+             final boolean member = processors.containsKey(processor.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = processor.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("Processor is currently running");
+             }
+ 
+             // a processor in the STOPPED state is left untouched
+             if (currentState != ScheduledState.STOPPED) {
+                 scheduler.enableProcessor(processor);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to disable a funnel of this group. Does nothing if the funnel is already disabled.
+      *
+      * @param funnel the funnel to disable
+      * @throws IllegalStateException if the funnel is not registered in this group or is currently running
+      */
+     @Override
+     public void disableFunnel(final Funnel funnel) {
+         readLock.lock();
+         try {
+             final boolean member = funnels.containsKey(funnel.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = funnel.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("Funnel is currently running");
+             }
+ 
+             // an already-disabled funnel needs no further action
+             if (currentState != ScheduledState.DISABLED) {
+                 scheduler.disableFunnel(funnel);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to disable an input port of this group. Does nothing if the port is already disabled.
+      *
+      * @param port the input port to disable
+      * @throws IllegalStateException if the port is not an input port of this group or is currently running
+      */
+     @Override
+     public void disableInputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = inputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No InputPort with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("InputPort is currently running");
+             }
+ 
+             // an already-disabled port needs no further action
+             if (currentState != ScheduledState.DISABLED) {
+                 scheduler.disablePort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to disable an output port of this group. Does nothing if the port is already disabled.
+      *
+      * @param port the output port to disable
+      * @throws IllegalStateException if the port is not an output port of this group or is currently running
+      */
+     @Override
+     public void disableOutputPort(final Port port) {
+         readLock.lock();
+         try {
+             final boolean member = outputPorts.containsKey(port.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No OutputPort with ID " + port.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = port.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("OutputPort is currently running");
+             }
+ 
+             // an already-disabled port needs no further action
+             if (currentState != ScheduledState.DISABLED) {
+                 scheduler.disablePort(port);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Asks the scheduler to disable a processor of this group. Does nothing if the processor is already disabled.
+      *
+      * @param processor the processor to disable
+      * @throws IllegalStateException if the processor is not registered in this group or is currently running
+      */
+     @Override
+     public void disableProcessor(final ProcessorNode processor) {
+         readLock.lock();
+         try {
+             final boolean member = processors.containsKey(processor.getIdentifier());
+             if (!member) {
+                 throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group");
+             }
+ 
+             final ScheduledState currentState = processor.getScheduledState();
+             if (currentState == ScheduledState.RUNNING) {
+                 throw new IllegalStateException("Processor is currently running");
+             }
+ 
+             // an already-disabled processor needs no further action
+             if (currentState != ScheduledState.DISABLED) {
+                 scheduler.disableProcessor(processor);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Two StandardProcessGroup instances are equal when their identifiers are equal.
+      *
+      * @param obj the object to compare with
+      * @return {@code true} if {@code obj} is a StandardProcessGroup with the same identifier
+      */
+     @Override
+     public boolean equals(final Object obj) {
+         if (!(obj instanceof StandardProcessGroup)) {
+             return false;
+         }
+ 
+         final StandardProcessGroup that = (StandardProcessGroup) obj;
+         return getIdentifier().equals(that.getIdentifier());
+     }
+ 
+     /**
+      * @return a hash code derived solely from this group's identifier
+      */
+     @Override
+     public int hashCode() {
+         final HashCodeBuilder builder = new HashCodeBuilder();
+         builder.append(getIdentifier());
+         return builder.toHashCode();
+     }
+ 
+     /**
+      * @return a short-prefix style description of this group that includes its name
+      */
+     @Override
+     public String toString() {
+         final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+         builder.append("name", name);
+         return builder.toString();
+     }
+ 
+     /**
+      * Searches this group and all groups nested beneath it for the group with the given identifier.
+      *
+      * @param id identifier of the group to find; must not be {@code null}
+      * @return this group if its identifier matches, otherwise the first matching descendant, or {@code null} if none matches
+      */
+     @Override
+     public ProcessGroup findProcessGroup(final String id) {
+         final String groupId = requireNonNull(id);
+         return findProcessGroup(groupId, this);
+     }
+ 
+     // Depth-first search: checks the starting group itself, then each child subtree in turn.
+     private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) {
+         final boolean isTarget = id.equals(start.getIdentifier());
+         if (isTarget) {
+             return start;
+         }
+ 
+         ProcessGroup found = null;
+         for (final ProcessGroup child : start.getProcessGroups()) {
+             found = findProcessGroup(id, child);
+             if (found != null) {
+                 break;
+             }
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * Collects the remote process groups of this group and of every group nested beneath it.
+      *
+      * @return a new list holding this group's remote process groups followed by those of its descendant groups
+      */
+     @Override
+     public List<RemoteProcessGroup> findAllRemoteProcessGroups() {
+         return findAllRemoteProcessGroups(this);
+     }
+ 
+     // Gathers the remote process groups of the given group, then recurses into each of its child groups.
+     private List<RemoteProcessGroup> findAllRemoteProcessGroups(final ProcessGroup start) {
+         final List<RemoteProcessGroup> collected = new ArrayList<>();
+         collected.addAll(start.getRemoteProcessGroups());
+         for (final ProcessGroup nested : start.getProcessGroups()) {
+             final List<RemoteProcessGroup> nestedRemoteGroups = findAllRemoteProcessGroups(nested);
+             collected.addAll(nestedRemoteGroups);
+         }
+         return collected;
+     }
+ 
+     /**
+      * Searches this group and all groups nested beneath it for a remote process group with the given identifier.
+      *
+      * @param id identifier of the remote process group; must not be {@code null}
+      * @return the first match found, or {@code null} if no group in the hierarchy holds one
+      */
+     @Override
+     public RemoteProcessGroup findRemoteProcessGroup(final String id) {
+         final String remoteGroupId = requireNonNull(id);
+         return findRemoteProcessGroup(remoteGroupId, this);
+     }
+ 
+     // Depth-first search: asks the starting group first, then each child subtree in turn.
+     private RemoteProcessGroup findRemoteProcessGroup(final String id, final ProcessGroup start) {
+         final RemoteProcessGroup direct = start.getRemoteProcessGroup(id);
+         if (direct != null) {
+             return direct;
+         }
+ 
+         RemoteProcessGroup found = null;
+         for (final ProcessGroup child : start.getProcessGroups()) {
+             found = findRemoteProcessGroup(id, child);
+             if (found != null) {
+                 break;
+             }
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * Searches this group and all groups nested beneath it for a processor with the given identifier.
+      *
+      * @param id identifier of the processor
+      * @return the first match found, or {@code null} if no group in the hierarchy holds one
+      */
+     @Override
+     public ProcessorNode findProcessor(final String id) {
+         return findProcessor(id, this);
+     }
+ 
+     // Depth-first search: asks the starting group first, then each child subtree in turn.
+     private ProcessorNode findProcessor(final String id, final ProcessGroup start) {
+         final ProcessorNode direct = start.getProcessor(id);
+         if (direct != null) {
+             return direct;
+         }
+ 
+         ProcessorNode found = null;
+         for (final ProcessGroup child : start.getProcessGroups()) {
+             found = findProcessor(id, child);
+             if (found != null) {
+                 break;
+             }
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * Collects the processors of this group and of every group nested beneath it.
+      *
+      * @return a new list holding this group's processors followed by those of its descendant groups
+      */
+     @Override
+     public List<ProcessorNode> findAllProcessors() {
+         return findAllProcessors(this);
+     }
+ 
+     // Gathers the processors of the given group, then recurses into each of its child groups.
+     private List<ProcessorNode> findAllProcessors(final ProcessGroup start) {
+         final List<ProcessorNode> collected = new ArrayList<>();
+         collected.addAll(start.getProcessors());
+         for (final ProcessGroup nested : start.getProcessGroups()) {
+             final List<ProcessorNode> nestedProcessors = findAllProcessors(nested);
+             collected.addAll(nestedProcessors);
+         }
+         return collected;
+     }
+ 
+     /**
+      * Searches this group and all groups nested beneath it for a connectable component with the given identifier.
+      *
+      * @param identifier identifier of the component to find
+      * @return the first match found, or {@code null} if nothing in the hierarchy matches
+      */
+     public Connectable findConnectable(final String identifier) {
+         return findConnectable(identifier, this);
+     }
+ 
+     // Lookup order within a group: processor, input port, output port, funnel, then the input and
+     // output ports of each remote process group; only afterwards are the child groups searched.
+     private static Connectable findConnectable(final String identifier, final ProcessGroup group) {
+         Connectable found = group.getProcessor(identifier);
+         if (found != null) {
+             return found;
+         }
+ 
+         found = group.getInputPort(identifier);
+         if (found != null) {
+             return found;
+         }
+ 
+         found = group.getOutputPort(identifier);
+         if (found != null) {
+             return found;
+         }
+ 
+         found = group.getFunnel(identifier);
+         if (found != null) {
+             return found;
+         }
+ 
+         for (final RemoteProcessGroup remote : group.getRemoteProcessGroups()) {
+             Connectable remotePort = remote.getInputPort(identifier);
+             if (remotePort == null) {
+                 remotePort = remote.getOutputPort(identifier);
+             }
+             if (remotePort != null) {
+                 return remotePort;
+             }
+         }
+ 
+         for (final ProcessGroup child : group.getProcessGroups()) {
+             final Connectable nestedMatch = findConnectable(identifier, child);
+             if (nestedMatch != null) {
+                 return nestedMatch;
+             }
+         }
+ 
+         return null;
+     }
+ 
+     /**
+      * Collects the labels of this group and of every group nested beneath it.
+      *
+      * @return a new list holding this group's labels followed by those of its descendant groups
+      */
+     @Override
+     public List<Label> findAllLabels() {
+         return findAllLabels(this);
+     }
+ 
+     // Gathers the labels of the given group, then recurses into each of its child groups.
+     private List<Label> findAllLabels(final ProcessGroup start) {
+         final List<Label> collected = new ArrayList<>();
+         collected.addAll(start.getLabels());
+         for (final ProcessGroup nested : start.getProcessGroups()) {
+             final List<Label> nestedLabels = findAllLabels(nested);
+             collected.addAll(nestedLabels);
+         }
+         return collected;
+     }
+ 
+     /**
+      * Searches this group and, recursively, its child groups for an input port with the given identifier.
+      *
+      * @param id identifier of the input port
+      * @return the first matching input port, or {@code null} if none is found
+      */
+     @Override
+     public Port findInputPort(final String id) {
+         final PortRetriever inputPortLookup = new InputPortRetriever();
+         return findPort(id, this, inputPortLookup);
+     }
+ 
+     /**
+      * Searches this group and, recursively, its child groups for an output port with the given identifier.
+      *
+      * @param id identifier of the output port
+      * @return the first matching output port, or {@code null} if none is found
+      */
+     @Override
+     public Port findOutputPort(final String id) {
+         final PortRetriever outputPortLookup = new OutputPortRetriever();
+         return findPort(id, this, outputPortLookup);
+     }
+ 
+     /**
+      * Finds an input port of this group by name. Only this group's own input ports are examined.
+      *
+      * @param name the port name to match
+      * @return the first input port whose name equals {@code name}, or {@code null} if there is none
+      */
+     @Override
+     public Port getInputPortByName(final String name) {
+         final PortRetriever inputPortLookup = new InputPortRetriever();
+         return getPortByName(name, this, inputPortLookup);
+     }
+ 
+     /**
+      * Finds an output port of this group by name. Only this group's own output ports are examined.
+      *
+      * @param name the port name to match
+      * @return the first output port whose name equals {@code name}, or {@code null} if there is none
+      */
+     @Override
+     public Port getOutputPortByName(final String name) {
+         final PortRetriever outputPortLookup = new OutputPortRetriever();
+         return getPortByName(name, this, outputPortLookup);
+     }
+ 
+     /**
+      * Strategy for reading ports of one kind from a process group, so the same search code can
+      * serve both input and output ports.
+      */
+     private interface PortRetriever {
+ 
+         /**
+          * @param group the group to read from
+          * @return the ports of this retriever's kind held by the given group
+          */
+         Set<Port> getPorts(ProcessGroup group);
+ 
+         /**
+          * @param group the group to read from
+          * @param id identifier of the port
+          * @return the port of this retriever's kind with that identifier, as reported by the given group
+          */
+         Port getPort(ProcessGroup group, String id);
+     }
+ 
+     /**
+      * PortRetriever that reads a group's input ports.
+      */
+     private static class InputPortRetriever implements PortRetriever {
+ 
+         // Delegates to the group's single-port lookup for input ports.
+         @Override
+         public Port getPort(final ProcessGroup group, final String id) {
+             return group.getInputPort(id);
+         }
+ 
+         // Delegates to the group's accessor for all of its input ports.
+         @Override
+         public Set<Port> getPorts(final ProcessGroup group) {
+             return group.getInputPorts();
+         }
+     }
+ 
+     /**
+      * PortRetriever that reads a group's output ports.
+      */
+     private static class OutputPortRetriever implements PortRetriever {
+ 
+         // Delegates to the group's single-port lookup for output ports.
+         @Override
+         public Port getPort(final ProcessGroup group, final String id) {
+             return group.getOutputPort(id);
+         }
+ 
+         // Delegates to the group's accessor for all of its output ports.
+         @Override
+         public Set<Port> getPorts(final ProcessGroup group) {
+             return group.getOutputPorts();
+         }
+     }
+ 
+     private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) {
+         Port port = retriever.getPort(group, id);
+         if (port != null) {
+             return port;
+         }
+ 
+         for (final ProcessGroup childGroup : group.getProcessGroups()) {
+             port = findPort(id, childGroup, retriever);
+             if (port != null) {
+                 return port;
+             }
+         }
+ 
+         return null;
+     }
+ 
+     private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
+         for (final Port port : retriever.getPorts(group)) {
+             if (port.getName().equals(name)) {
+                 return port;
+             }
+         }
+ 
+         return null;
+     }
+ 
+     /**
+      * Registers a funnel with this group and makes this group the funnel's owner.
+      *
+      * @param funnel the funnel to add; must not be {@code null}
+      * @throws IllegalStateException if a funnel with the same identifier is already registered
+      */
+     @Override
+     public void addFunnel(final Funnel funnel) {
+         writeLock.lock();
+         try {
+             final String funnelId = requireNonNull(funnel).getIdentifier();
+             final Funnel alreadyPresent = funnels.get(funnelId);
+             if (alreadyPresent != null) {
+                 throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier());
+             }
+ 
+             funnel.setProcessGroup(this);
+             funnels.put(funnel.getIdentifier(), funnel);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up a funnel registered directly in this group.
+      *
+      * @param id identifier of the funnel
+      * @return the matching funnel, or {@code null} if this group has none with that identifier
+      */
+     @Override
+     public Funnel getFunnel(final String id) {
+         readLock.lock();
+         try {
+             final Funnel match = funnels.get(id);
+             return match;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Stops a funnel, removes every connection attached to it and unregisters it from this group.
+      * The funnel and all of its connections are verified as deletable before anything is changed.
+      *
+      * @param funnel the funnel to remove; must not be {@code null}
+      * @throws IllegalStateException if the funnel is not registered in this group
+      */
+     @Override
+     public void removeFunnel(final Funnel funnel) {
+         writeLock.lock();
+         try {
+             final String funnelId = requireNonNull(funnel).getIdentifier();
+             final Funnel registered = funnels.get(funnelId);
+             if (registered == null) {
+                 throw new IllegalStateException(funnel + " is not a member of this ProcessGroup");
+             }
+ 
+             // check up front that the funnel and each attached connection may be deleted
+             funnel.verifyCanDelete();
+             for (final Connection attached : funnel.getConnections()) {
+                 attached.verifyCanDelete();
+             }
+ 
+             stopFunnel(funnel);
+ 
+             // iterate over a copy: removeConnection notifies the funnel, which would otherwise
+             // modify the collection being iterated
+             final Set<Connection> attachedSnapshot = new HashSet<>(funnel.getConnections());
+             for (final Connection attached : attachedSnapshot) {
+                 removeConnection(attached);
+             }
+ 
+             funnels.remove(funnel.getIdentifier());
+             LOG.info("{} removed from flow", funnel);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return a new set containing the funnels currently registered in this group;
+      *         changes to the returned set do not affect the group
+      */
+     @Override
+     public Set<Funnel> getFunnels() {
+         readLock.lock();
+         try {
+             final Set<Funnel> snapshot = new HashSet<>(funnels.values());
+             return snapshot;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void remove(final Snippet snippet) {
+         writeLock.lock();
+         try {
+             // ensure that all components are valid
+             verifyContents(snippet);
+ 
+             final Set<Connectable> connectables = getAllConnectables(snippet);
+             final Set<String> connectionIdsToRemove = new HashSet<>(replaceNullWithEmptySet(snippet.getConnections()));
+             // Remove all connections that are the output of any Connectable.
+             for (final Connectable connectable : connectables) {
+                 for (final Connection conn : connectable.getConnections()) {
+                     if (!connections.containsKey(conn.getIdentifier())) {
+                         throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections from the parent Process Group");
+                     }
+                     connectionIdsToRemove.add(conn.getIdentifier());
+                 }
+             }
+ 
+             // verify that all connections can be removed
+             for (final String id : connectionIdsToRemove) {
+                 connections.get(id).verifyCanDelete();
+             }
+ 
+             // verify that all processors are stopped and have no active threads
+             for (final String procId : snippet.getProcessors()) {
+                 final ProcessorNode procNode = getProcessor(procId);
+                 if (procNode.isRunning()) {
+                     throw new IllegalStateException(procNode + " cannot be removed because it is running");
+                 }
+                 final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
+                 if (activeThreadCount != 0) {
+                     throw new IllegalStateException(procNode + " cannot be removed because it still has " + activeThreadCount + " active threads");
+                 }
+             }
+ 
+             // verify that none of the connectables have incoming connections that are not in the Snippet.
+             final Set<String> connectionIds = snippet.getConnections();
+             for (final Connectable connectable : connectables) {
+                 for (final Connection conn : connectable.getIncomingConnections()) {
+                     if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
+                         throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections that are not selected to be deleted");
+                     }
+                 }
+             }
+ 
+             // verify that all of the ProcessGroups in the snippet are empty 
+             for (final String groupId : snippet.getProcessGroups()) {
+                 final ProcessGroup toRemove = getProcessGroup(groupId);
+                 if (!toRemove.isEmpty()) {
+                     throw new IllegalStateException("Process Group with name " + toRemove.getName() + " cannot be removed because it is not empty");
+                 }
+             }
+ 
+             for (final String id : connectionIdsToRemove) {
+                 removeConnection(connections.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) {
+                 removeInputPort(inputPorts.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) {
+                 removeOutputPort(outputPorts.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) {
+                 removeFunnel(funnels.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
+                 removeLabel(labels.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+                 removeProcessGroup(processGroups.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) {
+                 removeProcessor(processors.get(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+                 removeRemoteProcessGroup(remoteGroups.get(id));
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Substitutes an empty set for a null reference so that callers can
+      * iterate over the result without a separate null check.
+      *
+      * @param toReplace the set to inspect; may be null
+      * @return {@code toReplace} itself when it is non-null, otherwise a newly
+      * created, empty {@link HashSet}
+      */
+     private Set<String> replaceNullWithEmptySet(final Set<String> toReplace) {
+         if (toReplace != null) {
+             return toReplace;
+         }
+ 
+         return new HashSet<String>();
+     }
+ 
+     /**
+      * Moves every component referenced by the given snippet out of this
+      * ProcessGroup and into the given destination group.
+      *
+      * <p>
+      * The whole operation runs while holding this group's write lock. Before
+      * anything is moved, the snippet's IDs are checked against this group's
+      * contents and the snippet is required to be disconnected from the rest
+      * of the group. Connections are handed to the destination last, via
+      * {@code inheritConnection}.
+      * </p>
+      *
+      * @param snippet the snippet describing which components to move
+      * @param destination the group that receives the components
+      * @throws NullPointerException if the snippet is null
+      * @throws IllegalStateException if the snippet references an ID that is
+      * not part of this ProcessGroup, if the snippet is connected to a
+      * component outside of the snippet, or if this is the Root Group and the
+      * snippet contains Input Ports or Output Ports
+      */
+     @Override
+     public void move(final Snippet snippet, final ProcessGroup destination) {
+         writeLock.lock();
+         try {
+             verifyContents(snippet);
+ 
+             if (!isDisconnected(snippet)) {
+                 throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
+             }
+ 
+             // The ID sets of the snippet are treated as nullable everywhere else in this method,
+             // so the Root Group check has to tolerate null as well instead of failing with a
+             // NullPointerException.
+             final Set<String> inputPortIds = replaceNullWithEmptySet(snippet.getInputPorts());
+             final Set<String> outputPortIds = replaceNullWithEmptySet(snippet.getOutputPorts());
+             if (isRootGroup() && (!inputPortIds.isEmpty() || !outputPortIds.isEmpty())) {
+                 throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group");
+             }
+ 
+             for (final String id : inputPortIds) {
+                 destination.addInputPort(inputPorts.remove(id));
+             }
+             for (final String id : outputPortIds) {
+                 destination.addOutputPort(outputPorts.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) {
+                 destination.addFunnel(funnels.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
+                 destination.addLabel(labels.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+                 destination.addProcessGroup(processGroups.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) {
+                 destination.addProcessor(processors.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+                 destination.addRemoteProcessGroup(remoteGroups.remove(id));
+             }
+             for (final String id : replaceNullWithEmptySet(snippet.getConnections())) {
+                 destination.inheritConnection(connections.remove(id));
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Looks up the Input Ports, Output Ports, Funnels, and Processors whose
+      * IDs are listed in the given snippet and gathers them into one set. A
+      * null ID set in the snippet is treated as empty.
+      *
+      * @param snippet the snippet whose connectable components are wanted
+      * @return a new, mutable set holding the result of each lookup
+      */
+     private Set<Connectable> getAllConnectables(final Snippet snippet) {
+         final Set<Connectable> selected = new HashSet<>();
+ 
+         final Set<String> inputPortIds = replaceNullWithEmptySet(snippet.getInputPorts());
+         for (final String portId : inputPortIds) {
+             selected.add(getInputPort(portId));
+         }
+ 
+         final Set<String> outputPortIds = replaceNullWithEmptySet(snippet.getOutputPorts());
+         for (final String portId : outputPortIds) {
+             selected.add(getOutputPort(portId));
+         }
+ 
+         final Set<String> funnelIds = replaceNullWithEmptySet(snippet.getFunnels());
+         for (final String funnelId : funnelIds) {
+             selected.add(getFunnel(funnelId));
+         }
+ 
+         final Set<String> processorIds = replaceNullWithEmptySet(snippet.getProcessors());
+         for (final String processorId : processorIds) {
+             selected.add(getProcessor(processorId));
+         }
+ 
+         return selected;
+     }
+ 
+     /**
+      * Determines whether the given snippet is self-contained with respect to
+      * its connections.
+      *
+      * <p>
+      * The snippet is considered disconnected when both of the following hold:
+      * </p>
+      * <ul>
+      * <li>every incoming connection and every connection reported by
+      * {@code getConnections()} of each connectable in the snippet (including
+      * the ports of the snippet's Remote Process Groups) is itself listed in
+      * the snippet, and</li>
+      * <li>the source and destination of every connection listed in the
+      * snippet is one of those connectables or a connectable found, at any
+      * depth, within one of the snippet's Process Groups.</li>
+      * </ul>
+      *
+      * <p>
+      * Any ID set of the snippet that is null is treated as empty.
+      * </p>
+      *
+      * @param snippet the snippet to examine
+      * @return <code>true</code> if the snippet is disconnected,
+      * <code>false</code> otherwise
+      */
+     private boolean isDisconnected(final Snippet snippet) {
+         final Set<Connectable> connectables = getAllConnectables(snippet);
+ 
+         for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+             final RemoteProcessGroup remoteGroup = getRemoteProcessGroup(id);
+             connectables.addAll(remoteGroup.getInputPorts());
+             connectables.addAll(remoteGroup.getOutputPorts());
+         }
+ 
+         // Guard against a null ID set in the same way as is done for the Remote Process Groups above;
+         // otherwise the contains() calls and the loop over the connection IDs below would throw a
+         // NullPointerException.
+         final Set<String> connectionIds = replaceNullWithEmptySet(snippet.getConnections());
+         for (final Connectable connectable : connectables) {
+             for (final Connection conn : connectable.getIncomingConnections()) {
+                 if (!connectionIds.contains(conn.getIdentifier())) {
+                     return false;
+                 }
+             }
+ 
+             for (final Connection conn : connectable.getConnections()) {
+                 if (!connectionIds.contains(conn.getIdentifier())) {
+                     return false;
+                 }
+             }
+         }
+ 
+         // A connection in the snippet may also start or end inside one of the snippet's Process Groups
+         final Set<Connectable> recursiveConnectables = new HashSet<>(connectables);
+         for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+             final ProcessGroup childGroup = getProcessGroup(id);
+             recursiveConnectables.addAll(findAllConnectables(childGroup, true));
+         }
+ 
+         for (final String id : connectionIds) {
+             final Connection connection = getConnection(id);
+             if (!recursiveConnectables.contains(connection.getSource()) || !recursiveConnectables.contains(connection.getDestination())) {
+                 return false;
+             }
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * Gathers the Input Ports, Output Ports, Funnels, and Processors of the
+      * given group and, recursively, of all of its child Process Groups.
+      *
+      * @param group the group at which to start collecting
+      * @param includeRemotePorts if <code>true</code>, the input and output
+      * ports of each Remote Process Group encountered are collected as well
+      * @return a new set containing everything that was collected
+      */
+     private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) {
+         final Set<Connectable> found = new HashSet<>();
+ 
+         // components that live directly in this group
+         found.addAll(group.getInputPorts());
+         found.addAll(group.getOutputPorts());
+         found.addAll(group.getFunnels());
+         found.addAll(group.getProcessors());
+ 
+         if (includeRemotePorts) {
+             for (final RemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
+                 found.addAll(rpg.getInputPorts());
+                 found.addAll(rpg.getOutputPorts());
+             }
+         }
+ 
+         // descend into each child group and merge what it yields
+         for (final ProcessGroup child : group.getProcessGroups()) {
+             final Set<Connectable> nested = findAllConnectables(child, includeRemotePorts);
+             found.addAll(nested);
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * Verifies that all ID's defined within the given snippet reference
+      * components within this ProcessGroup. If this is not the case, throws
+      * {@link IllegalStateException}.
+      *
+      * <p>
+      * The component types are checked in a fixed order (Input Ports, Output
+      * Ports, Funnels, Labels, Process Groups, Processors, Remote Process
+      * Groups, Connections), so the exception reports the first invalid ID
+      * encountered in that order. An ID set that is null is not checked.
+      * </p>
+      *
+      * @param snippet the snippet whose component ID's are to be checked
+      * @throws NullPointerException if the argument is null
+      * @throws IllegalStateException if the snippet contains an ID that
+      * references a component that is not part of this ProcessGroup
+      */
+     private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
+         requireNonNull(snippet);
+ 
+         verifyAllKeysExist(snippet.getInputPorts(), inputPorts, "Input Port");
+         verifyAllKeysExist(snippet.getOutputPorts(), outputPorts, "Output Port");
+         verifyAllKeysExist(snippet.getFunnels(), funnels, "Funnel");
+         verifyAllKeysExist(snippet.getLabels(), labels, "Label");
+         verifyAllKeysExist(snippet.getProcessGroups(), processGroups, "Process Group");
+         verifyAllKeysExist(snippet.getProcessors(), processors, "Processor");
+         verifyAllKeysExist(snippet.getRemoteProcessGroups(), remoteGroups, "Remote Process Group");
+         verifyAllKeysExist(snippet.getConnections(), connections, "Connection");
+     }
+ 
+     /**
+      * <p>
+      * Checks each of the given ID's against the key set of the given Map and
+      * fails on the first ID that is not present as a key. The resulting
+      * {@link IllegalStateException} names the offending ID and the Component
+      * Type.
+      * </p>
+      *
+      * <p>
+      * A null set of ID's is accepted and results in no validation at all.
+      * </p>
+      *
+      * @param ids the ID's to look up; may be null
+      * @param map the Map whose keys are the valid ID's
+      * @param componentType human-readable name of the kind of component the
+      * Map holds; used only in the exception message
+      * @throws IllegalStateException if any of the ID's is not a key in the Map
+      */
+     private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) {
+         if (ids == null) {
+             return;
+         }
+ 
+         for (final String candidate : ids) {
+             if (map.containsKey(candidate)) {
+                 continue;
+             }
+ 
+             throw new IllegalStateException("ID " + candidate + " does not refer to a(n) " + componentType + " in this ProcessGroup");
+         }
+     }
+ 
+     /**
+      * Ensures that this ProcessGroup may be deleted, which requires it to be
+      * empty.
+      *
+      * @throws IllegalStateException if this ProcessGroup is not empty
+      */
+     @Override
+     public void verifyCanDelete() {
+         if (isEmpty()) {
+             return;
+         }
+ 
+         throw new IllegalStateException(this + " is not empty");
+     }
+ 
+     @Override
+     public void verifyCanStop() {
+         // No checks are performed here, so this implementation never throws.
+     }
+ 
+     /**
+      * Ensures that every stopped connectable in this ProcessGroup and its
+      * child groups (ports of Remote Process Groups excluded) can be started.
+      * Connectables in any other scheduled state are not examined. The check
+      * runs while holding this group's read lock.
+      *
+      * @throws IllegalStateException if a stopped connectable still has active
+      * threads
+      */
+     @Override
+     public void verifyCanStart() {
+         readLock.lock();
+         try {
+             for (final Connectable candidate : findAllConnectables(this, false)) {
+                 if (candidate.getScheduledState() != ScheduledState.STOPPED) {
+                     continue;
+                 }
+ 
+                 // marked as stopped but threads are still running: the component is winding down
+                 if (scheduler.getActiveThreadCount(candidate) > 0) {
+                     throw new IllegalStateException("Cannot start " + candidate + " because it is currently stopping");
+                 }
+ 
+                 candidate.verifyCanStart();
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Ensures that every component referenced by the given snippet can be
+      * deleted from this ProcessGroup. The check runs while holding this
+      * group's read lock.
+      *
+      * <p>
+      * The snippet must belong to this ProcessGroup, must reference only
+      * components that exist in this ProcessGroup, and must be disconnected
+      * from all components outside of the snippet. Each referenced component,
+      * other than Labels, is then asked to verify that it can be deleted.
+      * </p>
+      *
+      * @param snippet the snippet describing the components to delete
+      * @throws IllegalStateException if the snippet belongs to a different
+      * ProcessGroup, references a component that does not exist in this
+      * ProcessGroup, is connected to a component outside of the snippet, or
+      * contains a component that cannot be deleted
+      */
+     @Override
+     public void verifyCanDelete(final Snippet snippet) throws IllegalStateException {
+         readLock.lock();
+         try {
+             if (!id.equals(snippet.getParentGroupId())) {
+                 throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
+             }
+ 
+             // Confirm that every ID resolves before inspecting connectivity. isDisconnected dereferences
+             // the components it looks up, so an unknown ID would otherwise surface as a
+             // NullPointerException rather than as an IllegalStateException.
+             verifyContents(snippet);
+ 
+             if (!isDisconnected(snippet)) {
+                 throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be deleted.");
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getConnections())) {
+                 final Connection connection = getConnection(id);
+                 if (connection == null) {
+                     throw new IllegalStateException("Snippet references Connection with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+ 
+                 connection.verifyCanDelete();
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) {
+                 final Funnel funnel = getFunnel(id);
+                 if (funnel == null) {
+                     throw new IllegalStateException("Snippet references Funnel with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+ 
+                 funnel.verifyCanDelete(true);
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) {
+                 final Port port = getInputPort(id);
+                 if (port == null) {
+                     throw new IllegalStateException("Snippet references Input Port with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+ 
+                 port.verifyCanDelete(true);
+             }
+ 
+             // Labels only need to exist; no further verification is requested of them
+             for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
+                 final Label label = getLabel(id);
+                 if (label == null) {
+                     throw new IllegalStateException("Snippet references Label with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) {
+                 final Port port = getOutputPort(id);
+                 if (port == null) {
+                     throw new IllegalStateException("Snippet references Output Port with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+                 port.verifyCanDelete(true);
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+                 final ProcessGroup group = getProcessGroup(id);
+                 if (group == null) {
+                     throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+                 group.verifyCanDelete();
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) {
+                 final ProcessorNode processor = getProcessor(id);
+                 if (processor == null) {
+                     throw new IllegalStateException("Snippet references Processor with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+                 processor.verifyCanDelete(true);
+             }
+ 
+             for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+                 final RemoteProcessGroup group = getRemoteProcessGroup(id);
+                 if (group == null) {
+                     throw new IllegalStateException("Snippet references Remote Process Group with ID " + id + ", which does not exist in this ProcessGroup");
+                 }
+                 group.verifyCanDelete(true);
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Ensures that the components referenced by the given snippet can be
+      * moved from this ProcessGroup into the given ProcessGroup. The check
+      * runs while holding this group's read lock.
+      *
+      * @param snippet the snippet describing the components to move
+      * @param newProcessGroup the group that the components would be moved to
+      * @throws NullPointerException if the snippet is null
+      * @throws IllegalStateException if the snippet belongs to a different
+      * ProcessGroup, references a component that is not part of this
+      * ProcessGroup, is connected to a component outside of the snippet,
+      * contains ports while this is the Root Group, or contains a port whose
+      * name is already used by a port of the same kind in the destination
+      */
+     @Override
+     public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) throws IllegalStateException {
+         readLock.lock();
+         try {
+             if (!id.equals(snippet.getParentGroupId())) {
+                 throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
+             }
+ 
+             verifyContents(snippet);
+ 
+             if (!isDisconnected(snippet)) {
+                 throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
+             }
+ 
+             // verifyContents accepts null ID sets, so the port checks below must accept them as well
+             // instead of failing with a NullPointerException.
+             final Set<String> inputPortIds = replaceNullWithEmptySet(snippet.getInputPorts());
+             final Set<String> outputPortIds = replaceNullWithEmptySet(snippet.getOutputPorts());
+ 
+             if (isRootGroup() && (!inputPortIds.isEmpty() || !outputPortIds.isEmpty())) {
+                 throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group");
+             }
+ 
+             for (final String id : inputPortIds) {
+                 final Port port = getInputPort(id);
+                 final String portName = port.getName();
+ 
+                 if (newProcessGroup.getInputPortByName(portName) != null) {
+                     throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Input Port with the name " + portName);
+                 }
+             }
+ 
+             for (final String id : outputPortIds) {
+                 final Port port = getOutputPort(id);
+                 final String portName = port.getName();
+ 
+                 if (newProcessGroup.getOutputPortByName(portName) != null) {
+                     throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Output Port with the name " + portName);
+                 }
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 0000000,318901f..ac58504
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@@ -1,0 -1,113 +1,113 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processor;
+ 
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ 
+ /**
+  * A {@link SchedulingContext} backed by an existing {@link ProcessContext}.
+  * Every method other than {@link #leaseControllerService(String)} forwards
+  * directly to the wrapped ProcessContext.
+  */
+ public class StandardSchedulingContext implements SchedulingContext {
+ 
+     private final ProcessContext processContext;
+     private final ControllerServiceProvider serviceProvider;
+     private final ProcessorNode processorNode;
+ 
+     /**
+      * @param processContext the context to which property, encryption, and
+      * other lookups are forwarded
+      * @param serviceProvider used to look up Controller Service nodes by
+      * identifier when leasing
+      * @param processorNode the processor that is registered as a reference on
+      * each leased Controller Service
+      */
+     public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) {
+         this.processContext = processContext;
+         this.serviceProvider = serviceProvider;
+         this.processorNode = processorNode;
+     }
+ 
+     /**
+      * Looks up the Controller Service with the given identifier and, if it
+      * is enabled and valid, adds this context's processor node as a reference
+      * to it.
+      *
+      * @param identifier the identifier of the Controller Service to lease
+      * @throws IllegalArgumentException if no Controller Service exists with
+      * the given identifier
+      * @throws IllegalStateException if the Controller Service is disabled or
+      * is not valid
+      */
+     @Override
+     public void leaseControllerService(final String identifier) {
+         final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(identifier);
+         if (serviceNode == null) {
+             throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
+         }
+ 
+         if (serviceNode.isDisabled()) {
 -            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is currently disabled");
++            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
+         }
+ 
+         if (!serviceNode.isValid()) {
 -            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is not currently valid");
++            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
+         }
+ 
+         serviceNode.addReference(processorNode);
+     }
+ 
+     // The remaining methods forward to the wrapped ProcessContext without modification.
+ 
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return processContext.getProperty(descriptor);
+     }
+ 
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         return processContext.getProperty(propertyName);
+     }
+ 
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         return processContext.newPropertyValue(rawValue);
+     }
+ 
+     @Override
+     public void yield() {
+         processContext.yield();
+     }
+ 
+     @Override
+     public int getMaxConcurrentTasks() {
+         return processContext.getMaxConcurrentTasks();
+     }
+ 
+     @Override
+     public String getAnnotationData() {
+         return processContext.getAnnotationData();
+     }
+ 
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return processContext.getProperties();
+     }
+ 
+     @Override
+     public String encrypt(final String unencrypted) {
+         return processContext.encrypt(unencrypted);
+     }
+ 
+     @Override
+     public String decrypt(final String encrypted) {
+         return processContext.decrypt(encrypted);
+     }
+ 
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return processContext.getControllerServiceLookup();
+     }
+ 
+     @Override
+     public Set<Relationship> getAvailableRelationships() {
+         return processContext.getAvailableRelationships();
+     }
+ }