You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:59 UTC
[35/48] incubator-nifi git commit: NIFI-6: Rebase from develop to
include renaming of directory structure
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 0000000,8cff5dd..856ccc1
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@@ -1,0 -1,2019 +1,2020 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.groups;
+
+ import static java.util.Objects.requireNonNull;
+
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.annotation.lifecycle.OnShutdown;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.ConnectableType;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.LocalPort;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.connectable.Position;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.Snippet;
+ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.logging.LogRepositoryFactory;
+ import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.processor.annotation.OnRemoved;
-import org.apache.nifi.processor.annotation.OnShutdown;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.builder.HashCodeBuilder;
+ import org.apache.commons.lang3.builder.ToStringBuilder;
+ import org.apache.commons.lang3.builder.ToStringStyle;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public final class StandardProcessGroup implements ProcessGroup {
+
+ // Identifier supplied to the constructor; never reassigned.
+ private final String id;
+ // Mutable attributes. The accessors below read and write these references without taking the group lock.
+ private final AtomicReference<ProcessGroup> parent;
+ private final AtomicReference<String> name;
+ private final AtomicReference<Position> position;
+ private final AtomicReference<String> comments;
+
+ // Collaborators supplied to the constructor.
+ private final ProcessScheduler scheduler;
+ private final ControllerServiceProvider controllerServiceProvider;
+
+ // Components held by this group, each keyed by the component's identifier.
+ // The methods below access these maps while holding readLock or writeLock.
+ private final Map<String, Port> inputPorts = new HashMap<>();
+ private final Map<String, Port> outputPorts = new HashMap<>();
+ private final Map<String, Connection> connections = new HashMap<>();
+ private final Map<String, ProcessGroup> processGroups = new HashMap<>();
+ private final Map<String, Label> labels = new HashMap<>();
+ private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
+ private final Map<String, ProcessorNode> processors = new HashMap<>();
+ private final Map<String, Funnel> funnels = new HashMap<>();
+ // Handed to the StandardProcessContext that is built when a processor is removed.
+ private final StringEncryptor encryptor;
+
+ // Read/write lock pair used by the methods below around access to the component maps.
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
+
+ /**
+  * Creates a Process Group that holds no components, has no parent and no name,
+  * has empty comments and is positioned at (0, 0).
+  *
+  * @param id the identifier of this group
+  * @param serviceProvider the controller service provider kept for later use by this group
+  * @param scheduler the scheduler used to start and stop this group's components
+  * @param nifiProps accepted for signature compatibility; not read by this constructor
+  * @param encryptor the encryptor kept for later use by this group
+  */
+ public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor) {
+     // identity and collaborators
+     this.id = id;
+     this.scheduler = scheduler;
+     this.controllerServiceProvider = serviceProvider;
+     this.encryptor = encryptor;
+
+     // mutable attributes and their initial values
+     this.parent = new AtomicReference<>();
+     this.name = new AtomicReference<>();
+     this.comments = new AtomicReference<>("");
+     this.position = new AtomicReference<>(new Position(0D, 0D));
+ }
+
+ /**
+  * @return the group currently set as this group's parent, or {@code null} if none has been set
+  */
+ @Override
+ public ProcessGroup getParent() {
+     final ProcessGroup currentParent = parent.get();
+     return currentParent;
+ }
+
+ /**
+  * Replaces the parent reference of this group.
+  *
+  * @param newParent the new parent; stored as given, without validation
+  */
+ @Override
+ public void setParent(final ProcessGroup newParent) {
+     this.parent.set(newParent);
+ }
+
+ /**
+  * @return the identifier this group was constructed with
+  */
+ @Override
+ public String getIdentifier() {
+     return this.id;
+ }
+
+ /**
+  * @return the current name, or {@code null} if {@link #setName(String)} has not been called yet
+  */
+ @Override
+ public String getName() {
+     final String currentName = name.get();
+     return currentName;
+ }
+
+ /**
+  * Renames this group.
+  *
+  * @param name the new name
+  * @throws IllegalArgumentException if {@code name} is blank according to {@code StringUtils.isBlank}
+  */
+ @Override
+ public void setName(final String name) {
+     final boolean blank = StringUtils.isBlank(name);
+     if (blank) {
+         throw new IllegalArgumentException("The name cannot be blank.");
+     }
+     this.name.set(name);
+ }
+
+ /**
+  * Replaces the stored position of this group.
+  *
+  * @param position the new position; stored as given, without validation
+  */
+ @Override
+ public void setPosition(final Position position) {
+     final AtomicReference<Position> holder = this.position;
+     holder.set(position);
+ }
+
+ /**
+  * @return the position most recently stored for this group
+  */
+ @Override
+ public Position getPosition() {
+     final Position currentPosition = position.get();
+     return currentPosition;
+ }
+
+ /**
+  * @return the comments most recently stored for this group (initially the empty string)
+  */
+ @Override
+ public String getComments() {
+     final String currentComments = comments.get();
+     return currentComments;
+ }
+
+ /**
+  * Replaces the stored comments of this group.
+  *
+  * @param comments the new comments; stored as given, without validation
+  */
+ @Override
+ public void setComments(final String comments) {
+     final AtomicReference<String> holder = this.comments;
+     holder.set(comments);
+ }
+
+ /**
+ * Builds a snapshot of component counts for this group while holding the read lock.
+ * <p>
+ * Processors, input ports and output ports of this group are each classified into exactly one of
+ * disabled / running / invalid / stopped, checked in that order. The running, stopped, invalid and
+ * disabled totals of every child group are then added in. Funnels are not counted, and the input/output
+ * port counts cover this group's own ports only.
+ *
+ * @return a new ProcessGroupCounts populated with the tallies described above
+ */
+ @Override
+ public ProcessGroupCounts getCounts() {
+ int inputPortCount = 0;
+ int outputPortCount = 0;
+
+ int running = 0;
+ int stopped = 0;
+ int invalid = 0;
+ int disabled = 0;
+ int activeRemotePorts = 0;
+ int inactiveRemotePorts = 0;
+
+ readLock.lock();
+ try {
+ // Classify each processor; the first matching state wins, so a disabled processor is never also counted as invalid.
+ for (final ProcessorNode procNode : processors.values()) {
+ if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
+ disabled++;
+ } else if (procNode.isRunning()) {
+ running++;
+ } else if (!procNode.isValid()) {
+ invalid++;
+ } else {
+ stopped++;
+ }
+ }
+
+ // Input ports contribute to both the port count and the state tallies.
+ inputPortCount = inputPorts.size();
+ for (final Port port : inputPorts.values()) {
+ if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
+ disabled++;
+ } else if (port.isRunning()) {
+ running++;
+ } else if (!port.isValid()) {
+ invalid++;
+ } else {
+ stopped++;
+ }
+ }
+
+ // Output ports are handled the same way as input ports.
+ outputPortCount = outputPorts.size();
+ for (final Port port : outputPorts.values()) {
+ if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
+ disabled++;
+ } else if (port.isRunning()) {
+ running++;
+ } else if (!port.isValid()) {
+ invalid++;
+ } else {
+ stopped++;
+ }
+ }
+
+ // Fold in the state tallies of each direct child; the child's port and remote-port counts are not added here.
+ for (final ProcessGroup childGroup : processGroups.values()) {
+ final ProcessGroupCounts childCounts = childGroup.getCounts();
+ running += childCounts.getRunningCount();
+ stopped += childCounts.getStoppedCount();
+ invalid += childCounts.getInvalidCount();
+ disabled += childCounts.getDisabledCount();
+ }
+
+ // NOTE(review): findAllRemoteProcessGroups() is defined outside this view. If it also returns remote groups
+ // of child groups, an authorization issue there is counted as invalid both here and in childCounts - confirm.
+ for (final RemoteProcessGroup remoteGroup : findAllRemoteProcessGroups()) {
+ // Count only input ports that have incoming connections
+ for (final Port port : remoteGroup.getInputPorts()) {
+ if (port.hasIncomingConnection()) {
+ if (port.isRunning()) {
+ activeRemotePorts++;
+ } else {
+ inactiveRemotePorts++;
+ }
+ }
+ }
+
+ // Count only output ports that have outgoing connections
+ for (final Port port : remoteGroup.getOutputPorts()) {
+ if (!port.getConnections().isEmpty()) {
+ if (port.isRunning()) {
+ activeRemotePorts++;
+ } else {
+ inactiveRemotePorts++;
+ }
+ }
+ }
+
+ // A remote group that reports an authorization issue is counted as one invalid component.
+ final String authIssue = remoteGroup.getAuthorizationIssue();
+ if (authIssue != null) {
+ invalid++;
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped,
+ invalid, disabled, activeRemotePorts, inactiveRemotePorts);
+ }
+
+ /**
+  * @return {@code true} if no parent is currently set on this group
+  */
+ @Override
+ public boolean isRootGroup() {
+     final ProcessGroup currentParent = parent.get();
+     return currentParent == null;
+ }
+
+ /**
+  * Starts every processor, input port, output port and funnel of this group that is not disabled,
+  * then does the same for each child group. Runs under the read lock.
+  * <p>
+  * A failure to start one processor is logged and does not stop the remaining work; failures
+  * while starting ports, funnels or child groups propagate to the caller.
+  */
+ @Override
+ public void startProcessing() {
+     readLock.lock();
+     try {
+         for (final ProcessorNode procNode : processors.values()) {
+             try {
+                 // skip processors that are already running or are disabled
+                 if (procNode.isRunning() || procNode.getScheduledState() == ScheduledState.DISABLED) {
+                     continue;
+                 }
+                 startProcessor(procNode);
+             } catch (final Throwable t) {
+                 LOG.error("Unable to start {} due to {}", new Object[]{procNode, t});
+             }
+         }
+
+         for (final Port in : getInputPorts()) {
+             if (inputPortIsDisabled(in.getScheduledState())) {
+                 continue;
+             }
+             startInputPort(in);
+         }
+
+         for (final Port out : getOutputPorts()) {
+             if (inputPortIsDisabled(out.getScheduledState())) {
+                 continue;
+             }
+             startOutputPort(out);
+         }
+
+         for (final Funnel fnl : getFunnels()) {
+             if (inputPortIsDisabled(fnl.getScheduledState())) {
+                 continue;
+             }
+             startFunnel(fnl);
+         }
+
+         // descend into the child groups
+         for (final ProcessGroup child : processGroups.values()) {
+             child.startProcessing();
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ // Tells whether the given scheduled state is DISABLED (identity comparison, as null-tolerant as ==).
+ private static boolean inputPortIsDisabled(final ScheduledState state) {
+     return state == ScheduledState.DISABLED;
+ }
+
+ /**
+  * Stops every running processor and every input/output port whose scheduled state is RUNNING,
+  * then does the same for each child group. Runs under the read lock. Funnels are not touched.
+  * <p>
+  * A failure to stop one processor is logged and does not stop the remaining work; failures
+  * while stopping ports or child groups propagate to the caller.
+  */
+ @Override
+ public void stopProcessing() {
+     readLock.lock();
+     try {
+         for (final ProcessorNode procNode : processors.values()) {
+             try {
+                 if (!procNode.isRunning()) {
+                     continue;
+                 }
+                 stopProcessor(procNode);
+             } catch (final Throwable t) {
+                 LOG.error("Unable to stop {} due to {}", new Object[]{procNode, t});
+             }
+         }
+
+         for (final Port in : getInputPorts()) {
+             final ScheduledState inState = in.getScheduledState();
+             if (inState == ScheduledState.RUNNING) {
+                 stopInputPort(in);
+             }
+         }
+
+         for (final Port out : getOutputPorts()) {
+             final ScheduledState outState = out.getScheduledState();
+             if (outState == ScheduledState.RUNNING) {
+                 stopOutputPort(out);
+             }
+         }
+
+         // descend into the child groups
+         for (final ProcessGroup child : processGroups.values()) {
+             child.stopProcessing();
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+ * Shuts down the given group and, recursively, all of its child groups: invokes the processors'
+ * shutdown-annotated methods, then calls shutdown() on each remote process group.
+ *
+ * @param procGroup the group to shut down
+ */
++ @SuppressWarnings("deprecation")
+ private void shutdown(final ProcessGroup procGroup) {
+ for (final ProcessorNode node : procGroup.getProcessors()) {
+ // the annotated methods are invoked inside the scope of NarCloseable.withNarLoader(), which is closed afterwards
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor());
+ // both the current OnShutdown annotation and the one from org.apache.nifi.processor.annotation are passed
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor());
+ }
+ }
+
+ for ( final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups() ) {
+ rpg.shutdown();
+ }
+
+ // Recursively shutdown child groups.
+ for (final ProcessGroup group : procGroup.getProcessGroups()) {
+ shutdown(group);
+ }
+ }
+
+ /**
+  * Shuts down this group and all of its descendants while holding the read lock.
+  */
+ @Override
+ public void shutdown() {
+     final Lock guard = readLock;
+     guard.lock();
+     try {
+         shutdown(this);
+     } finally {
+         guard.unlock();
+     }
+ }
+
+ /**
+  * Registers an Input Port with this group and makes this group the port's process group.
+  *
+  * @param port the port to add; must be a RootGroupPort when this is the root group and a LocalPort otherwise
+  * @throws IllegalArgumentException if the port's type does not suit this group
+  * @throws IllegalStateException if this group already has an Input Port with the same ID or the same name
+  */
+ @Override
+ public void addInputPort(final Port port) {
+     final boolean root = isRootGroup();
+     if (root && !(port instanceof RootGroupPort)) {
+         throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group");
+     }
+     if (!root && !(port instanceof LocalPort)) {
+         throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to a non-root group");
+     }
+
+     writeLock.lock();
+     try {
+         // port is known to be non-null here: a null port fails both type checks above and is dereferenced there
+         final boolean idTaken = inputPorts.containsKey(port.getIdentifier());
+         if (idTaken) {
+             throw new IllegalStateException("Input Port with ID " + port.getIdentifier() + " already exists");
+         }
+
+         final boolean nameTaken = getInputPortByName(port.getName()) != null;
+         if (nameTaken) {
+             throw new IllegalStateException("Input Port with name " + port.getName() + " already exists");
+         }
+
+         port.setProcessGroup(this);
+         inputPorts.put(port.getIdentifier(), port);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Removes an Input Port from this group, together with all of the port's connections.
+  * The port and every one of its connections are verified as deletable before anything is changed;
+  * a running port is stopped first.
+  *
+  * @param port the port to remove
+  * @throws IllegalStateException if the port is not an Input Port of this group
+  */
+ @Override
+ public void removeInputPort(final Port port) {
+     writeLock.lock();
+     try {
+         final String portId = requireNonNull(port).getIdentifier();
+         if (inputPorts.get(portId) == null) {
+             throw new IllegalStateException(port + " is not an Input Port of this Process Group");
+         }
+
+         // verification pass: nothing is modified until all checks have passed
+         port.verifyCanDelete();
+         for (final Connection attached : port.getConnections()) {
+             attached.verifyCanDelete();
+         }
+
+         if (port.isRunning()) {
+             stopInputPort(port);
+         }
+
+         // iterate over a snapshot, because removeConnection alters the port's connection collection
+         final Set<Connection> snapshot = new HashSet<>(port.getConnections());
+         for (final Connection attached : snapshot) {
+             removeConnection(attached);
+         }
+
+         if (inputPorts.remove(port.getIdentifier()) == null) {
+             throw new IllegalStateException(port + " is not an Input Port of this Process Group");
+         }
+
+         LOG.info("Input Port {} removed from flow", port);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up an Input Port of this group by identifier, under the read lock.
+  *
+  * @param id the port identifier; must not be {@code null}
+  * @return the matching port, or {@code null} if this group has none with that ID
+  * @throws NullPointerException if {@code id} is {@code null}
+  */
+ @Override
+ public Port getInputPort(final String id) {
+     readLock.lock();
+     try {
+         final String key = Objects.requireNonNull(id);
+         return inputPorts.get(key);
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the Input Ports of this group at the time of the call
+  */
+ @Override
+ public Set<Port> getInputPorts() {
+     readLock.lock();
+     try {
+         final Set<Port> snapshot = new HashSet<>(inputPorts.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Registers an Output Port with this group and makes this group the port's process group.
+  *
+  * @param port the port to add; must be a RootGroupPort when this is the root group and a LocalPort otherwise
+  * @throws IllegalArgumentException if the port's type does not suit this group
+  * @throws IllegalStateException if this group already has an Output Port with the same ID or the same name
+  */
+ @Override
+ public void addOutputPort(final Port port) {
+     final boolean root = isRootGroup();
+     if (root && !(port instanceof RootGroupPort)) {
+         throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to the Root Group");
+     }
+     if (!root && !(port instanceof LocalPort)) {
+         throw new IllegalArgumentException("Cannot add Output Port of type " + port.getClass().getName() + " to a non-root group");
+     }
+
+     writeLock.lock();
+     try {
+         // port is known to be non-null here: a null port fails both type checks above and is dereferenced there
+         final boolean idTaken = outputPorts.containsKey(port.getIdentifier());
+         if (idTaken) {
+             throw new IllegalStateException("Output Port with ID " + port.getIdentifier() + " already exists");
+         }
+
+         final boolean nameTaken = getOutputPortByName(port.getName()) != null;
+         if (nameTaken) {
+             throw new IllegalStateException("Output Port with Name " + port.getName() + " already exists");
+         }
+
+         port.setProcessGroup(this);
+         outputPorts.put(port.getIdentifier(), port);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Removes an Output Port from this group. The port must have no connections left.
+  * <p>
+  * All validation happens before any state is changed, so a rejected removal leaves the port
+  * registered and in its current scheduled state.
+  *
+  * @param port the port to remove
+  * @throws NullPointerException if {@code port} is {@code null}
+  * @throws IllegalStateException if the port is not an Output Port of this group, or if it still has connections
+  */
+ @Override
+ public void removeOutputPort(final Port port) {
+     writeLock.lock();
+     try {
+         final Port toRemove = outputPorts.get(requireNonNull(port).getIdentifier());
+         // an unknown port used to surface as a NullPointerException on the next line; report it the same
+         // way removeInputPort does instead
+         if (toRemove == null) {
+             throw new IllegalStateException(port + " is not an Output Port of this Process Group");
+         }
+         toRemove.verifyCanDelete();
+
+         // check for remaining connections before stopping the port, so that a refused removal has no side effects
+         if (!toRemove.getConnections().isEmpty()) {
+             throw new IllegalStateException(port + " cannot be removed until its connections are removed");
+         }
+
+         if (port.isRunning()) {
+             stopOutputPort(port);
+         }
+
+         final Port removed = outputPorts.remove(port.getIdentifier());
+         if (removed == null) {
+             throw new IllegalStateException(port + " is not an Output Port of this Process Group");
+         }
+
+         LOG.info("Output Port {} removed from flow", port);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up an Output Port of this group by identifier, under the read lock.
+  *
+  * @param id the port identifier; must not be {@code null}
+  * @return the matching port, or {@code null} if this group has none with that ID
+  * @throws NullPointerException if {@code id} is {@code null}
+  */
+ @Override
+ public Port getOutputPort(final String id) {
+     readLock.lock();
+     try {
+         final String key = Objects.requireNonNull(id);
+         return outputPorts.get(key);
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the Output Ports of this group at the time of the call
+  */
+ @Override
+ public Set<Port> getOutputPorts() {
+     readLock.lock();
+     try {
+         final Set<Port> snapshot = new HashSet<>(outputPorts.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Adds a child group to this group and makes this group the child's parent.
+  * An existing child registered under the same identifier is replaced.
+  *
+  * @param group the child group; its name must not be empty
+  * @throws IllegalArgumentException if the group's name is empty according to {@code StringUtils.isEmpty}
+  */
+ @Override
+ public void addProcessGroup(final ProcessGroup group) {
+     final String childName = group.getName();
+     if (StringUtils.isEmpty(childName)) {
+         throw new IllegalArgumentException("Process Group's name must be specified");
+     }
+
+     writeLock.lock();
+     try {
+         group.setParent(this);
+         final String childId = group.getIdentifier();
+         processGroups.put(childId, group);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up a direct child group by identifier, under the read lock.
+  *
+  * @param id the identifier of the child group
+  * @return the matching child group, or {@code null} if this group has none with that ID
+  */
+ @Override
+ public ProcessGroup getProcessGroup(final String id) {
+     readLock.lock();
+     try {
+         final ProcessGroup child = processGroups.get(id);
+         return child;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the direct child groups of this group at the time of the call
+  */
+ @Override
+ public Set<ProcessGroup> getProcessGroups() {
+     readLock.lock();
+     try {
+         final Set<ProcessGroup> snapshot = new HashSet<>(processGroups.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Removes an empty child group from this group.
+  *
+  * @param group the child group to remove
+  * @throws NullPointerException if {@code group} is {@code null}
+  * @throws IllegalStateException if the group is not empty or is not a child of this group
+  */
+ @Override
+ public void removeProcessGroup(final ProcessGroup group) {
+     // cheap rejection before the write lock is taken
+     final boolean empty = requireNonNull(group).isEmpty();
+     if (!empty) {
+         throw new IllegalStateException("Cannot remove " + group + " because it is not empty");
+     }
+
+     writeLock.lock();
+     try {
+         final ProcessGroup registered = processGroups.get(group.getIdentifier());
+         if (registered == null) {
+             throw new IllegalStateException(group + " is not a member of this Process Group");
+         }
+
+         // the registered instance is checked again now that the lock is held
+         verifyCanRemove(registered);
+         processGroups.remove(group.getIdentifier());
+
+         LOG.info("{} removed from flow", group);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Ensures that a child group may be removed.
+  *
+  * @param childGroup the group about to be removed
+  * @throws IllegalStateException if the group still contains components
+  */
+ private void verifyCanRemove(final ProcessGroup childGroup) {
+     if (childGroup.isEmpty()) {
+         return;
+     }
+     throw new IllegalStateException("Cannot remove ProcessGroup because it is not empty");
+ }
+
+ /**
+  * Registers a Remote Process Group with this group and makes this group its process group.
+  *
+  * @param remoteGroup the remote group to add
+  * @throws NullPointerException if {@code remoteGroup} is {@code null}
+  * @throws IllegalStateException if a remote group with the same ID is already registered
+  */
+ @Override
+ public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
+     writeLock.lock();
+     try {
+         final boolean alreadyKnown = remoteGroups.containsKey(requireNonNull(remoteGroup).getIdentifier());
+         if (alreadyKnown) {
+             throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier());
+         }
+
+         remoteGroup.setProcessGroup(this);
+         remoteGroups.put(remoteGroup.getIdentifier(), remoteGroup);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the Remote Process Groups of this group at the time of the call
+  */
+ @Override
+ public Set<RemoteProcessGroup> getRemoteProcessGroups() {
+     readLock.lock();
+     try {
+         final Set<RemoteProcessGroup> snapshot = new HashSet<>(remoteGroups.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Removes a Remote Process Group from this group, together with the connections of its output ports.
+  * The remote group and all of those connections are verified as deletable before anything is changed.
+  * A failure in the remote group's own clean-up is logged and does not prevent the removal.
+  *
+  * @param remoteProcessGroup the remote group to remove
+  * @throws NullPointerException if {@code remoteProcessGroup} is {@code null}
+  * @throws IllegalStateException if the remote group is not registered with this group
+  */
+ @Override
+ public void removeRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) {
+     final String remoteGroupId = requireNonNull(remoteProcessGroup).getIdentifier();
+
+     writeLock.lock();
+     try {
+         final RemoteProcessGroup registered = remoteGroups.get(remoteGroupId);
+         if (registered == null) {
+             throw new IllegalStateException(remoteProcessGroup + " is not a member of this Process Group");
+         }
+
+         // verification pass: nothing is modified until all checks have passed
+         registered.verifyCanDelete();
+         for (final RemoteGroupPort outPort : registered.getOutputPorts()) {
+             for (final Connection attached : outPort.getConnections()) {
+                 attached.verifyCanDelete();
+             }
+         }
+
+         // removal pass
+         for (final RemoteGroupPort outPort : registered.getOutputPorts()) {
+             // iterate over a snapshot, because removeConnection alters the port's connection collection
+             final Set<Connection> snapshot = new HashSet<>(outPort.getConnections());
+             for (final Connection attached : snapshot) {
+                 removeConnection(attached);
+             }
+         }
+
+         try {
+             registered.onRemove();
+         } catch (final Exception e) {
+             LOG.warn("Failed to clean up resources for {} due to {}", registered, e);
+         }
+
+         remoteGroups.remove(remoteGroupId);
+         LOG.info("{} removed from flow", remoteProcessGroup);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Registers a processor with this group and makes this group the processor's process group.
+  *
+  * @param processor the processor to add
+  * @throws NullPointerException if {@code processor} is {@code null}
+  * @throws IllegalStateException if a processor with the same ID is already registered
+  */
+ @Override
+ public void addProcessor(final ProcessorNode processor) {
+     writeLock.lock();
+     try {
+         final String processorId = requireNonNull(processor).getIdentifier();
+         if (processors.get(processorId) != null) {
+             throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
+         }
+
+         processor.setProcessGroup(this);
+         processors.put(processorId, processor);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+ * Removes a processor from this group, together with all of the processor's connections.
+ * The processor and its connections are verified as deletable first; then the processor's
+ * removal-annotated methods are invoked, the processor is unregistered, the observers of its log
+ * repository are removed and finally its connections are removed.
+ *
+ * @param processor the processor to remove
+ * @throws NullPointerException if processor is null
+ * @throws IllegalStateException if the processor is not registered with this group
+ * @throws ProcessorLifeCycleException if invoking the removal-annotated methods raises an exception
+ */
++ @SuppressWarnings("deprecation")
+ @Override
+ public void removeProcessor(final ProcessorNode processor) {
+ final String id = requireNonNull(processor).getIdentifier();
+ writeLock.lock();
+ try {
+ if (!processors.containsKey(id)) {
+ throw new IllegalStateException(processor + " is not a member of this Process Group");
+ }
+
+ // verification pass: nothing is modified until all checks have passed
+ processor.verifyCanDelete();
+ for (final Connection conn : processor.getConnections()) {
+ conn.verifyCanDelete();
+ }
+
+ // the annotated methods are invoked inside the scope of NarCloseable.withNarLoader(), which is closed afterwards
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
+ // both the current OnRemoved annotation and the one from org.apache.nifi.processor.annotation are passed
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
+ }
+
+ // NOTE(review): the processor is unregistered before its connections are removed; if removeConnection
+ // throws below, the connections stay registered while the processor is already gone - confirm this is acceptable.
+ processors.remove(id);
+ LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
+
+ // must copy to avoid a concurrent modification
+ final Set<Connection> copy = new HashSet<>(processor.getConnections());
+ for (final Connection conn : copy) {
+ removeConnection(conn);
+ }
+
+ LOG.info("{} removed from flow", processor);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+  * @return a new set holding the processors of this group at the time of the call
+  */
+ @Override
+ public Set<ProcessorNode> getProcessors() {
+     readLock.lock();
+     try {
+         final Set<ProcessorNode> snapshot = new LinkedHashSet<>(processors.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up a processor of this group by identifier, under the read lock.
+  *
+  * @param id the processor identifier; must not be {@code null}
+  * @return the matching processor, or {@code null} if this group has none with that ID
+  * @throws NullPointerException if {@code id} is {@code null}
+  */
+ @Override
+ public ProcessorNode getProcessor(final String id) {
+     readLock.lock();
+     try {
+         final String key = Objects.requireNonNull(id);
+         return processors.get(key);
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Tells whether the connectable is of type INPUT_PORT and is found by {@code findInputPort}.
+  *
+  * @param connectable the component to test
+  * @return {@code true} only if both conditions hold; the lookup is skipped when the type does not match
+  */
+ private boolean isInputPort(final Connectable connectable) {
+     final boolean typeMatches = connectable.getConnectableType() == ConnectableType.INPUT_PORT;
+     return typeMatches && findInputPort(connectable.getIdentifier()) != null;
+ }
+
+ /**
+  * Tells whether the connectable is of type OUTPUT_PORT and is found by {@code findOutputPort}.
+  *
+  * @param connectable the component to test
+  * @return {@code true} only if both conditions hold; the lookup is skipped when the type does not match
+  */
+ private boolean isOutputPort(final Connectable connectable) {
+     final boolean typeMatches = connectable.getConnectableType() == ConnectableType.OUTPUT_PORT;
+     return typeMatches && findOutputPort(connectable.getIdentifier()) != null;
+ }
+
+ /**
+  * Registers an existing connection with this group under the connection's identifier.
+  * Unlike {@link #addConnection(Connection)}, no validation is performed and neither the connection
+  * nor its source and destination are modified.
+  *
+  * @param connection the connection to register
+  */
+ @Override
+ public void inheritConnection(final Connection connection) {
+     writeLock.lock();
+     try {
+         final String connectionId = connection.getIdentifier();
+         connections.put(connectionId, connection);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+ * Adds a connection to this group after validating that its source and destination are placed
+ * in groups for which this group may hold the connection. On success this group becomes the
+ * connection's process group and the connection is registered with its source and destination.
+ *
+ * @param connection the connection to add
+ * @throws NullPointerException if connection is null
+ * @throws IllegalStateException if a connection with the same ID already exists, or if the source and
+ * destination placement is not allowed for this group
+ */
+ @Override
+ public void addConnection(final Connection connection) {
+ writeLock.lock();
+ try {
+ final String id = requireNonNull(connection).getIdentifier();
+ final Connection existingConnection = connections.get(id);
+ if (existingConnection != null) {
+ throw new IllegalStateException("Connection already exists with ID " + id);
+ }
+
+ final Connectable source = connection.getSource();
+ final Connectable destination = connection.getDestination();
+ final ProcessGroup sourceGroup = source.getProcessGroup();
+ final ProcessGroup destinationGroup = destination.getProcessGroup();
+
+ // validate that the connection is valid with respect to the source & destination groups
+ if (isInputPort(source)) { // if source is an input port, its destination must be in the same group unless it's an input port
+ if (isInputPort(destination)) { // if destination is input port, it must be in a child group.
+ if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because destination is an Input Port that does not belong to a child Process Group");
+ }
+ } else if (sourceGroup != this || destinationGroup != this) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because source and destination are not both in this Process Group");
+ }
+ } else if (isOutputPort(source)) { // if source is an output port, its group must be a child of this group, and its destination must be in this group (processor/output port) or a child group (input port)
+ if (!processGroups.containsKey(sourceGroup.getIdentifier())) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because source is an Output Port that does not belong to a child Process Group");
+ }
+
+ if (isInputPort(destination)) {
+ if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group");
+ }
+ } else {
+ if (destinationGroup != this) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group");
+ }
+ }
+ } else { // source is not a port
+ if (sourceGroup != this) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group");
+ }
+
+ if (isOutputPort(destination)) {
+ if (destinationGroup != this) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Output Port but does not belong to this Process Group");
+ }
+ } else if (isInputPort(destination)) {
+ if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
+ throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port but the Input Port does not belong to a child Process Group");
+ }
+ } else if (destinationGroup != this) {
+ throw new IllegalStateException("Cannot add Connection between " + source + " and " + destination + " because they are in different Process Groups and neither is an Input Port or Output Port");
+ }
+ }
+
+ // validation passed: wire the connection into this group and into both of its endpoints
+ connection.setProcessGroup(this);
+ source.addConnection(connection);
+ if (source != destination) { // don't call addConnection twice if it's a self-looping connection.
+ destination.addConnection(connection);
+ }
+ connections.put(connection.getIdentifier(), connection);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+  * Looks up a connectable component of this group by identifier, under the read lock.
+  * Processors are consulted first, then input ports, output ports and finally funnels.
+  *
+  * @param id the identifier to look up
+  * @return the first match in the order given above, or {@code null} if none of them has that ID
+  */
+ @Override
+ public Connectable getConnectable(final String id) {
+     readLock.lock();
+     try {
+         Connectable match = processors.get(id);
+         if (match == null) {
+             match = inputPorts.get(id);
+         }
+         if (match == null) {
+             match = outputPorts.get(id);
+         }
+         if (match == null) {
+             match = funnels.get(id);
+         }
+         return match;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Removes a connection from this group and detaches it from its source and destination.
+  *
+  * @param connectionToRemove the connection to remove
+  * @throws NullPointerException if {@code connectionToRemove} is {@code null}
+  * @throws IllegalStateException if the connection is not registered with this group
+  */
+ @Override
+ public void removeConnection(final Connection connectionToRemove) {
+     writeLock.lock();
+     try {
+         // the connection has to be registered with this group
+         final Connection registered = connections.get(requireNonNull(connectionToRemove).getIdentifier());
+         if (registered == null) {
+             throw new IllegalStateException(connectionToRemove + " is not a member of this Process Group");
+         }
+
+         connectionToRemove.verifyCanDelete();
+
+         final Connectable from = connectionToRemove.getSource();
+         final Connectable to = connectionToRemove.getDestination();
+
+         // detach from both endpoints; a self-loop is detached only once
+         from.removeConnection(registered);
+         final boolean selfLoop = from == to;
+         if (!selfLoop) {
+             to.removeConnection(registered);
+         }
+
+         // finally drop it from this group's registry
+         connections.remove(registered.getIdentifier());
+         LOG.info("{} removed from flow", registered);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the connections registered with this group at the time of the call
+  */
+ @Override
+ public Set<Connection> getConnections() {
+     readLock.lock();
+     try {
+         final Set<Connection> snapshot = new HashSet<>(connections.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up a connection of this group by identifier, under the read lock.
+  *
+  * @param id the connection identifier; must not be {@code null}
+  * @return the matching connection, or {@code null} if this group has none with that ID
+  * @throws NullPointerException if {@code id} is {@code null}
+  */
+ @Override
+ public Connection getConnection(final String id) {
+     readLock.lock();
+     try {
+         final String key = Objects.requireNonNull(id);
+         return connections.get(key);
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * @return the connections of this group followed by those of all of its descendant groups
+  */
+ @Override
+ public List<Connection> findAllConnections() {
+     final List<Connection> all = findAllConnections(this);
+     return all;
+ }
+
+ /**
+  * Collects the connections of a group and, recursively, of all of its child groups.
+  *
+  * @param group the group to start from
+  * @return a new list with the group's own connections first, followed by each child's results
+  */
+ private List<Connection> findAllConnections(final ProcessGroup group) {
+     final List<Connection> collected = new ArrayList<>(group.getConnections());
+     for (final ProcessGroup child : group.getProcessGroups()) {
+         final List<Connection> fromChild = findAllConnections(child);
+         collected.addAll(fromChild);
+     }
+     return collected;
+ }
+
+ /**
+  * Registers a label with this group and makes this group the label's process group.
+  *
+  * @param label the label to add
+  * @throws NullPointerException if {@code label} is {@code null}
+  * @throws IllegalStateException if a label with the same ID is already registered
+  */
+ @Override
+ public void addLabel(final Label label) {
+     writeLock.lock();
+     try {
+         if (labels.get(requireNonNull(label).getIdentifier()) != null) {
+             throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
+         }
+
+         label.setProcessGroup(this);
+         labels.put(label.getIdentifier(), label);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Removes a label from this group.
+  *
+  * @param label the label to remove
+  * @throws NullPointerException if {@code label} is {@code null}
+  * @throws IllegalStateException if no label with that ID is registered with this group
+  */
+ @Override
+ public void removeLabel(final Label label) {
+     writeLock.lock();
+     try {
+         final String labelId = requireNonNull(label).getIdentifier();
+         if (labels.remove(labelId) == null) {
+             throw new IllegalStateException(label + " is not a member of this Process Group.");
+         }
+
+         LOG.info("Label with ID {} removed from flow", label.getIdentifier());
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new set holding the labels of this group at the time of the call
+  */
+ @Override
+ public Set<Label> getLabels() {
+     readLock.lock();
+     try {
+         final Set<Label> snapshot = new HashSet<>(labels.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up a label of this group by identifier, under the read lock.
+  *
+  * @param id the label identifier
+  * @return the matching label, or {@code null} if this group has none with that ID
+  */
+ @Override
+ public Label getLabel(final String id) {
+     readLock.lock();
+     try {
+         final Label found = labels.get(id);
+         return found;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Tells whether this group holds no input ports, output ports, connections, child groups,
+  * labels, processors or remote process groups. Funnels are not considered.
+  *
+  * @return {@code true} if all of the collections listed above are empty
+  */
+ @Override
+ public boolean isEmpty() {
+     readLock.lock();
+     try {
+         final boolean noPorts = inputPorts.isEmpty() && outputPorts.isEmpty();
+         if (!noPorts || !connections.isEmpty()) {
+             return false;
+         }
+         if (!processGroups.isEmpty() || !labels.isEmpty()) {
+             return false;
+         }
+         return processors.isEmpty() && remoteGroups.isEmpty();
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Looks up a Remote Process Group of this group by identifier, under the read lock.
+  *
+  * @param id the remote group identifier; must not be {@code null}
+  * @return the matching remote group, or {@code null} if this group has none with that ID
+  * @throws NullPointerException if {@code id} is {@code null}
+  */
+ @Override
+ public RemoteProcessGroup getRemoteProcessGroup(final String id) {
+     readLock.lock();
+     try {
+         final String key = Objects.requireNonNull(id);
+         return remoteGroups.get(key);
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Asks the scheduler to start a processor of this group. Does nothing if the processor's
+  * scheduled state is already RUNNING.
+  *
+  * @param processor the processor to start
+  * @throws IllegalStateException if the processor does not belong to this group or is disabled
+  */
+ @Override
+ public void startProcessor(final ProcessorNode processor) {
+     readLock.lock();
+     try {
+         final boolean member = getProcessor(processor.getIdentifier()) != null;
+         if (!member) {
+             throw new IllegalStateException("Processor is not a member of this Process Group");
+         }
+
+         final ScheduledState current = processor.getScheduledState();
+         if (current == ScheduledState.DISABLED) {
+             throw new IllegalStateException("Processor is disabled");
+         }
+         if (current != ScheduledState.RUNNING) {
+             scheduler.startProcessor(processor);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Asks the scheduler to start an Input Port of this group. Does nothing if the port's
+  * scheduled state is already RUNNING.
+  *
+  * @param port the port to start
+  * @throws IllegalStateException if the port is not an Input Port of this group or is disabled
+  */
+ @Override
+ public void startInputPort(final Port port) {
+     readLock.lock();
+     try {
+         final boolean member = getInputPort(port.getIdentifier()) != null;
+         if (!member) {
+             throw new IllegalStateException(port + " is not a member of this Process Group");
+         }
+
+         final ScheduledState current = port.getScheduledState();
+         if (current == ScheduledState.DISABLED) {
+             throw new IllegalStateException("InputPort " + port + " is disabled");
+         }
+         if (current != ScheduledState.RUNNING) {
+             scheduler.startPort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Asks the scheduler to start an Output Port of this group. Does nothing if the port's
+  * scheduled state is already RUNNING.
+  *
+  * @param port the port to start
+  * @throws IllegalStateException if the port is not an Output Port of this group or is disabled
+  */
+ @Override
+ public void startOutputPort(final Port port) {
+     readLock.lock();
+     try {
+         final boolean member = getOutputPort(port.getIdentifier()) != null;
+         if (!member) {
+             throw new IllegalStateException("Port is not a member of this Process Group");
+         }
+
+         final ScheduledState current = port.getScheduledState();
+         if (current == ScheduledState.DISABLED) {
+             throw new IllegalStateException("OutputPort is disabled");
+         }
+         if (current != ScheduledState.RUNNING) {
+             scheduler.startPort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Asks the scheduler to start a funnel of this group. Does nothing if the funnel's
+  * scheduled state is already RUNNING.
+  *
+  * @param funnel the funnel to start
+  * @throws IllegalStateException if the funnel does not belong to this group or is disabled
+  */
+ @Override
+ public void startFunnel(final Funnel funnel) {
+     readLock.lock();
+     try {
+         final boolean member = getFunnel(funnel.getIdentifier()) != null;
+         if (!member) {
+             throw new IllegalStateException("Funnel is not a member of this Process Group");
+         }
+
+         final ScheduledState current = funnel.getScheduledState();
+         if (current == ScheduledState.DISABLED) {
+             throw new IllegalStateException("Funnel is disabled");
+         }
+         if (current != ScheduledState.RUNNING) {
+             scheduler.startFunnel(funnel);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Asks the scheduler to stop a processor of this group. Does nothing if the processor's
+  * scheduled state is already STOPPED.
+  *
+  * @param processor the processor to stop
+  * @throws IllegalStateException if the processor does not belong to this group or is disabled
+  */
+ @Override
+ public void stopProcessor(final ProcessorNode processor) {
+     readLock.lock();
+     try {
+         final boolean member = processors.containsKey(processor.getIdentifier());
+         if (!member) {
+             throw new IllegalStateException("No processor with ID " + processor.getIdentifier() + " belongs to this Process Group");
+         }
+
+         final ScheduledState current = processor.getScheduledState();
+         if (current == ScheduledState.DISABLED) {
+             throw new IllegalStateException("Processor is disabled");
+         }
+         if (current != ScheduledState.STOPPED) {
+             scheduler.stopProcessor(processor);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Stops the given Input Port through the scheduler while holding the read
+  * lock. A port that is already stopped is left untouched.
+  *
+  * @param port the Input Port to stop
+  * @throws IllegalStateException if no Input Port with the same ID is
+  * registered in this Process Group, or if the port is disabled
+  */
+ @Override
+ public void stopInputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!inputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No Input Port with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.DISABLED) {
+             throw new IllegalStateException("InputPort is disabled");
+         }
+
+         // nothing to do for a port that is already stopped
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.stopPort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Stops the given Output Port through the scheduler while holding the read
+  * lock. A port that is already stopped is left untouched.
+  *
+  * @param port the Output Port to stop
+  * @throws IllegalStateException if no Output Port with the same ID is
+  * registered in this Process Group, or if the port is disabled
+  */
+ @Override
+ public void stopOutputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!outputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No Output Port with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.DISABLED) {
+             throw new IllegalStateException("OutputPort is disabled");
+         }
+
+         // nothing to do for a port that is already stopped
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.stopPort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Stops the given Funnel through the scheduler while holding the read lock.
+  * A funnel that is already stopped is left untouched.
+  *
+  * @param funnel the Funnel to stop
+  * @throws IllegalStateException if no Funnel with the same ID is registered
+  * in this Process Group, or if the funnel is disabled
+  */
+ @Override
+ public void stopFunnel(final Funnel funnel) {
+     readLock.lock();
+     try {
+         final String funnelId = funnel.getIdentifier();
+         if (!funnels.containsKey(funnelId)) {
+             throw new IllegalStateException("No Funnel with ID " + funnelId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = funnel.getScheduledState();
+         if (currentState == ScheduledState.DISABLED) {
+             throw new IllegalStateException("Funnel is disabled");
+         }
+
+         // nothing to do for a funnel that is already stopped
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.stopFunnel(funnel);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Enables the given Funnel through the scheduler while holding the read
+  * lock. A funnel whose state is already STOPPED is left untouched.
+  *
+  * @param funnel the Funnel to enable
+  * @throws IllegalStateException if no Funnel with the same ID is registered
+  * in this Process Group, or if the funnel is currently running
+  */
+ @Override
+ public void enableFunnel(final Funnel funnel) {
+     readLock.lock();
+     try {
+         final String funnelId = funnel.getIdentifier();
+         if (!funnels.containsKey(funnelId)) {
+             throw new IllegalStateException("No Funnel with ID " + funnelId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = funnel.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("Funnel is currently running");
+         }
+
+         // a STOPPED funnel needs no further action
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.enableFunnel(funnel);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Enables the given Input Port through the scheduler while holding the read
+  * lock. A port whose state is already STOPPED is left untouched.
+  *
+  * @param port the Input Port to enable
+  * @throws IllegalStateException if no Input Port with the same ID is
+  * registered in this Process Group, or if the port is currently running
+  */
+ @Override
+ public void enableInputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!inputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No Input Port with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("InputPort is currently running");
+         }
+
+         // a STOPPED port needs no further action
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.enablePort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Enables the given Output Port through the scheduler while holding the
+  * read lock. A port whose state is already STOPPED is left untouched.
+  *
+  * @param port the Output Port to enable
+  * @throws IllegalStateException if no Output Port with the same ID is
+  * registered in this Process Group, or if the port is currently running
+  */
+ @Override
+ public void enableOutputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!outputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No Output Port with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("OutputPort is currently running");
+         }
+
+         // a STOPPED port needs no further action
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.enablePort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Enables the given processor through the scheduler while holding the read
+  * lock. A processor whose state is already STOPPED is left untouched.
+  *
+  * @param processor the processor to enable
+  * @throws IllegalStateException if no processor with the same ID is
+  * registered in this Process Group, or if the processor is currently running
+  */
+ @Override
+ public void enableProcessor(final ProcessorNode processor) {
+     readLock.lock();
+     try {
+         final String processorId = processor.getIdentifier();
+         if (!processors.containsKey(processorId)) {
+             throw new IllegalStateException("No Processor with ID " + processorId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = processor.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("Processor is currently running");
+         }
+
+         // a STOPPED processor needs no further action
+         if (currentState != ScheduledState.STOPPED) {
+             scheduler.enableProcessor(processor);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Disables the given Funnel through the scheduler while holding the read
+  * lock. A funnel that is already disabled is left untouched.
+  *
+  * @param funnel the Funnel to disable
+  * @throws IllegalStateException if no Funnel with the same ID is registered
+  * in this Process Group, or if the funnel is currently running
+  */
+ @Override
+ public void disableFunnel(final Funnel funnel) {
+     readLock.lock();
+     try {
+         final String funnelId = funnel.getIdentifier();
+         if (!funnels.containsKey(funnelId)) {
+             throw new IllegalStateException("No Funnel with ID " + funnelId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = funnel.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("Funnel is currently running");
+         }
+
+         // an already-disabled funnel needs no further action
+         if (currentState != ScheduledState.DISABLED) {
+             scheduler.disableFunnel(funnel);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Disables the given Input Port through the scheduler while holding the
+  * read lock. A port that is already disabled is left untouched.
+  *
+  * @param port the Input Port to disable
+  * @throws IllegalStateException if no Input Port with the same ID is
+  * registered in this Process Group, or if the port is currently running
+  */
+ @Override
+ public void disableInputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!inputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No InputPort with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("InputPort is currently running");
+         }
+
+         // an already-disabled port needs no further action
+         if (currentState != ScheduledState.DISABLED) {
+             scheduler.disablePort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Disables the given Output Port through the scheduler while holding the
+  * read lock. A port that is already disabled is left untouched.
+  *
+  * @param port the Output Port to disable
+  * @throws IllegalStateException if no Output Port with the same ID is
+  * registered in this Process Group, or if the port is currently running
+  */
+ @Override
+ public void disableOutputPort(final Port port) {
+     readLock.lock();
+     try {
+         final String portId = port.getIdentifier();
+         if (!outputPorts.containsKey(portId)) {
+             throw new IllegalStateException("No OutputPort with ID " + portId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("OutputPort is currently running");
+         }
+
+         // an already-disabled port needs no further action
+         if (currentState != ScheduledState.DISABLED) {
+             scheduler.disablePort(port);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Disables the given processor through the scheduler while holding the read
+  * lock. A processor that is already disabled is left untouched.
+  *
+  * @param processor the processor to disable
+  * @throws IllegalStateException if no processor with the same ID is
+  * registered in this Process Group, or if the processor is currently running
+  */
+ @Override
+ public void disableProcessor(final ProcessorNode processor) {
+     readLock.lock();
+     try {
+         final String processorId = processor.getIdentifier();
+         if (!processors.containsKey(processorId)) {
+             throw new IllegalStateException("No Processor with ID " + processorId + " belongs to this Process Group");
+         }
+
+         final ScheduledState currentState = processor.getScheduledState();
+         if (currentState == ScheduledState.RUNNING) {
+             throw new IllegalStateException("Processor is currently running");
+         }
+
+         // an already-disabled processor needs no further action
+         if (currentState != ScheduledState.DISABLED) {
+             scheduler.disableProcessor(processor);
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Two StandardProcessGroups are equal when their identifiers are equal.
+  *
+  * @param obj the object to compare against
+  * @return <code>true</code> if obj is a StandardProcessGroup with an equal
+  * identifier, <code>false</code> otherwise
+  */
+ @Override
+ public boolean equals(final Object obj) {
+     if (!(obj instanceof StandardProcessGroup)) {
+         return false;
+     }
+
+     final StandardProcessGroup that = (StandardProcessGroup) obj;
+     return getIdentifier().equals(that.getIdentifier());
+ }
+
+ /**
+  * @return a hash code built solely from this group's identifier
+  */
+ @Override
+ public int hashCode() {
+     final HashCodeBuilder hashBuilder = new HashCodeBuilder();
+     hashBuilder.append(getIdentifier());
+     return hashBuilder.toHashCode();
+ }
+
+ /**
+  * @return a short-prefix style description that contains this group's name
+  */
+ @Override
+ public String toString() {
+     final ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+     description.append("name", name);
+     return description.toString();
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * group with the given ID.
+  *
+  * @param id the ID to look for; must not be null
+  * @return the matching group, or <code>null</code> if there is none
+  * @throws NullPointerException if id is null
+  */
+ @Override
+ public ProcessGroup findProcessGroup(final String id) {
+     final String requiredId = requireNonNull(id);
+     return findProcessGroup(requiredId, this);
+ }
+
+ /**
+  * Depth-first search for the Process Group with the given ID, beginning at
+  * (and including) the given starting group.
+  *
+  * @param id the ID to look for
+  * @param start the group at which the search begins
+  * @return the matching group, or <code>null</code> if there is none
+  */
+ private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) {
+     final boolean startMatches = id.equals(start.getIdentifier());
+     if (startMatches) {
+         return start;
+     }
+
+     for (final ProcessGroup child : start.getProcessGroups()) {
+         final ProcessGroup found = findProcessGroup(id, child);
+         if (found == null) {
+             continue;
+         }
+         return found;
+     }
+
+     return null;
+ }
+
+ /**
+  * @return the Remote Process Groups of this group and, recursively, of all
+  * of its child groups
+  */
+ @Override
+ public List<RemoteProcessGroup> findAllRemoteProcessGroups() {
+     final List<RemoteProcessGroup> allRemoteGroups = findAllRemoteProcessGroups(this);
+     return allRemoteGroups;
+ }
+
+ /**
+  * Collects the Remote Process Groups of the given group and, recursively,
+  * of every group nested beneath it.
+  *
+  * @param start the group at which collection begins
+  * @return a new list holding all Remote Process Groups that were found
+  */
+ private List<RemoteProcessGroup> findAllRemoteProcessGroups(final ProcessGroup start) {
+     final List<RemoteProcessGroup> collected = new ArrayList<>(start.getRemoteProcessGroups());
+
+     for (final ProcessGroup child : start.getProcessGroups()) {
+         final List<RemoteProcessGroup> nested = findAllRemoteProcessGroups(child);
+         collected.addAll(nested);
+     }
+
+     return collected;
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * Remote Process Group with the given ID.
+  *
+  * @param id the ID to look for; must not be null
+  * @return the matching Remote Process Group, or <code>null</code> if none
+  * @throws NullPointerException if id is null
+  */
+ @Override
+ public RemoteProcessGroup findRemoteProcessGroup(final String id) {
+     final String requiredId = requireNonNull(id);
+     return findRemoteProcessGroup(requiredId, this);
+ }
+
+ private RemoteProcessGroup findRemoteProcessGroup(final String id, final ProcessGroup start) {
+ RemoteProcessGroup remoteGroup = start.getRemoteProcessGroup(id);
+ if (remoteGroup != null) {
+ return remoteGroup;
+ }
+
+ for (final ProcessGroup group : start.getProcessGroups()) {
+ remoteGroup = findRemoteProcessGroup(id, group);
+ if (remoteGroup != null) {
+ return remoteGroup;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * processor with the given ID.
+  *
+  * @param id the ID to look for
+  * @return the matching processor, or <code>null</code> if there is none
+  */
+ @Override
+ public ProcessorNode findProcessor(final String id) {
+     final ProcessorNode located = findProcessor(id, this);
+     return located;
+ }
+
+ private ProcessorNode findProcessor(final String id, final ProcessGroup start) {
+ ProcessorNode node = start.getProcessor(id);
+ if (node != null) {
+ return node;
+ }
+
+ for (final ProcessGroup group : start.getProcessGroups()) {
+ node = findProcessor(id, group);
+ if (node != null) {
+ return node;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * @return the processors of this group and, recursively, of all of its
+  * child groups
+  */
+ @Override
+ public List<ProcessorNode> findAllProcessors() {
+     final List<ProcessorNode> allProcessors = findAllProcessors(this);
+     return allProcessors;
+ }
+
+ /**
+  * Collects the processors of the given group and, recursively, of every
+  * group nested beneath it.
+  *
+  * @param start the group at which collection begins
+  * @return a new list holding all processors that were found
+  */
+ private List<ProcessorNode> findAllProcessors(final ProcessGroup start) {
+     final List<ProcessorNode> collected = new ArrayList<>(start.getProcessors());
+
+     for (final ProcessGroup child : start.getProcessGroups()) {
+         final List<ProcessorNode> nested = findAllProcessors(child);
+         collected.addAll(nested);
+     }
+
+     return collected;
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * Connectable with the given identifier.
+  *
+  * @param identifier the identifier to look for
+  * @return the matching Connectable, or <code>null</code> if there is none
+  */
+ public Connectable findConnectable(final String identifier) {
+     final Connectable located = findConnectable(identifier, this);
+     return located;
+ }
+
+ /**
+  * Looks for a Connectable with the given identifier inside the given group.
+  * The lookup order is: processors, input ports, output ports, funnels, the
+  * input and output ports of each Remote Process Group, and finally a
+  * recursive search of each child group. The first match wins.
+  *
+  * @param identifier the identifier to look for
+  * @param group the group in which to search
+  * @return the matching Connectable, or <code>null</code> if there is none
+  */
+ private static Connectable findConnectable(final String identifier, final ProcessGroup group) {
+     // Components owned directly by the group; each lookup is attempted only
+     // if the previous one found nothing.
+     Connectable local = group.getProcessor(identifier);
+     if (local == null) {
+         local = group.getInputPort(identifier);
+     }
+     if (local == null) {
+         local = group.getOutputPort(identifier);
+     }
+     if (local == null) {
+         local = group.getFunnel(identifier);
+     }
+     if (local != null) {
+         return local;
+     }
+
+     // Ports that belong to the group's Remote Process Groups.
+     for (final RemoteProcessGroup remote : group.getRemoteProcessGroups()) {
+         final RemoteGroupPort remoteInput = remote.getInputPort(identifier);
+         if (remoteInput != null) {
+             return remoteInput;
+         }
+
+         final RemoteGroupPort remoteOutput = remote.getOutputPort(identifier);
+         if (remoteOutput != null) {
+             return remoteOutput;
+         }
+     }
+
+     // Finally descend into the child groups.
+     for (final ProcessGroup child : group.getProcessGroups()) {
+         final Connectable nested = findConnectable(identifier, child);
+         if (nested == null) {
+             continue;
+         }
+         return nested;
+     }
+
+     return null;
+ }
+
+ /**
+  * @return the labels of this group and, recursively, of all of its child
+  * groups
+  */
+ @Override
+ public List<Label> findAllLabels() {
+     final List<Label> allLabels = findAllLabels(this);
+     return allLabels;
+ }
+
+ /**
+  * Collects the labels of the given group and, recursively, of every group
+  * nested beneath it.
+  *
+  * @param start the group at which collection begins
+  * @return a new list holding all labels that were found
+  */
+ private List<Label> findAllLabels(final ProcessGroup start) {
+     final List<Label> collected = new ArrayList<>(start.getLabels());
+
+     for (final ProcessGroup child : start.getProcessGroups()) {
+         final List<Label> nested = findAllLabels(child);
+         collected.addAll(nested);
+     }
+
+     return collected;
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * Input Port with the given ID.
+  *
+  * @param id the ID to look for
+  * @return the matching Input Port, or <code>null</code> if there is none
+  */
+ @Override
+ public Port findInputPort(final String id) {
+     final PortRetriever inputRetriever = new InputPortRetriever();
+     return findPort(id, this, inputRetriever);
+ }
+
+ /**
+  * Searches this Process Group and, recursively, its child groups for the
+  * Output Port with the given ID.
+  *
+  * @param id the ID to look for
+  * @return the matching Output Port, or <code>null</code> if there is none
+  */
+ @Override
+ public Port findOutputPort(final String id) {
+     final PortRetriever outputRetriever = new OutputPortRetriever();
+     return findPort(id, this, outputRetriever);
+ }
+
+ /**
+  * Looks among the Input Ports of this Process Group (child groups are not
+  * searched) for one with the given name.
+  *
+  * @param name the port name to look for
+  * @return the matching Input Port, or <code>null</code> if there is none
+  */
+ @Override
+ public Port getInputPortByName(final String name) {
+     final PortRetriever inputRetriever = new InputPortRetriever();
+     return getPortByName(name, this, inputRetriever);
+ }
+
+ /**
+  * Looks among the Output Ports of this Process Group (child groups are not
+  * searched) for one with the given name.
+  *
+  * @param name the port name to look for
+  * @return the matching Output Port, or <code>null</code> if there is none
+  */
+ @Override
+ public Port getOutputPortByName(final String name) {
+     final PortRetriever outputRetriever = new OutputPortRetriever();
+     return getPortByName(name, this, outputRetriever);
+ }
+
+ /**
+  * Abstracts over "input" versus "output" so that the port lookup helpers
+  * can be written once and used for both kinds of port.
+  */
+ private interface PortRetriever {
+
+     /**
+      * @param group the group whose ports are wanted
+      * @return the ports of the kind handled by this retriever
+      */
+     Set<Port> getPorts(ProcessGroup group);
+
+     /**
+      * @param group the group to query
+      * @param id the ID of the wanted port
+      * @return the port of the kind handled by this retriever that has the
+      * given ID, as reported by the group
+      */
+     Port getPort(ProcessGroup group, String id);
+ }
+
+ /**
+  * PortRetriever that delegates to a group's Input Port accessors.
+  */
+ private static class InputPortRetriever implements PortRetriever {
+
+     @Override
+     public Port getPort(final ProcessGroup group, final String id) {
+         final Port inputPort = group.getInputPort(id);
+         return inputPort;
+     }
+
+     @Override
+     public Set<Port> getPorts(final ProcessGroup group) {
+         final Set<Port> inputPorts = group.getInputPorts();
+         return inputPorts;
+     }
+ }
+
+ /**
+  * PortRetriever that delegates to a group's Output Port accessors.
+  */
+ private static class OutputPortRetriever implements PortRetriever {
+
+     @Override
+     public Port getPort(final ProcessGroup group, final String id) {
+         final Port outputPort = group.getOutputPort(id);
+         return outputPort;
+     }
+
+     @Override
+     public Set<Port> getPorts(final ProcessGroup group) {
+         final Set<Port> outputPorts = group.getOutputPorts();
+         return outputPorts;
+     }
+ }
+
+ private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) {
+ Port port = retriever.getPort(group, id);
+ if (port != null) {
+ return port;
+ }
+
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ port = findPort(id, childGroup, retriever);
+ if (port != null) {
+ return port;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * Scans the ports that the retriever reports for the given group and
+  * returns the first one whose name equals the given name.
+  *
+  * @param name the port name to look for
+  * @param group the group whose ports are scanned
+  * @param retriever selects which kind of port is scanned
+  * @return the matching port, or <code>null</code> if there is none
+  */
+ private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
+     Port match = null;
+
+     for (final Port candidate : retriever.getPorts(group)) {
+         if (candidate.getName().equals(name)) {
+             match = candidate;
+             break;
+         }
+     }
+
+     return match;
+ }
+
+ /**
+  * Registers the given Funnel with this Process Group under the write lock
+  * and sets this group as the funnel's Process Group.
+  *
+  * @param funnel the Funnel to add; must not be null
+  * @throws NullPointerException if funnel is null
+  * @throws IllegalStateException if a funnel with the same ID is already
+  * registered in this Process Group
+  */
+ @Override
+ public void addFunnel(final Funnel funnel) {
+     writeLock.lock();
+     try {
+         final String funnelId = requireNonNull(funnel).getIdentifier();
+         final boolean alreadyRegistered = funnels.get(funnelId) != null;
+         if (alreadyRegistered) {
+             throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier());
+         }
+
+         funnel.setProcessGroup(this);
+         funnels.put(funnel.getIdentifier(), funnel);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @param id the ID of the wanted Funnel
+  * @return the Funnel registered in this Process Group under the given ID,
+  * or <code>null</code> if there is none
+  */
+ @Override
+ public Funnel getFunnel(final String id) {
+     readLock.lock();
+     try {
+         final Funnel registered = funnels.get(id);
+         return registered;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Removes the given Funnel from this Process Group under the write lock.
+  * The funnel and each of its connections are first verified as deletable;
+  * the funnel is then stopped, its connections are removed, and finally the
+  * funnel itself is unregistered.
+  *
+  * @param funnel the Funnel to remove; must not be null
+  * @throws NullPointerException if funnel is null
+  * @throws IllegalStateException if the funnel is not registered in this
+  * Process Group
+  */
+ @Override
+ public void removeFunnel(final Funnel funnel) {
+     writeLock.lock();
+     try {
+         final String funnelId = requireNonNull(funnel).getIdentifier();
+         if (funnels.get(funnelId) == null) {
+             throw new IllegalStateException(funnel + " is not a member of this ProcessGroup");
+         }
+
+         // verify everything up front, before anything is modified
+         funnel.verifyCanDelete();
+         for (final Connection outgoing : funnel.getConnections()) {
+             outgoing.verifyCanDelete();
+         }
+
+         stopFunnel(funnel);
+
+         // iterate over a snapshot: removing a connection may modify the
+         // funnel's own connection collection
+         final Set<Connection> snapshot = new HashSet<>(funnel.getConnections());
+         for (final Connection outgoing : snapshot) {
+             removeConnection(outgoing);
+         }
+
+         funnels.remove(funnel.getIdentifier());
+         LOG.info("{} removed from flow", funnel);
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @return a new Set holding the Funnels currently registered in this
+  * Process Group
+  */
+ @Override
+ public Set<Funnel> getFunnels() {
+     readLock.lock();
+     try {
+         final Set<Funnel> snapshot = new HashSet<>(funnels.values());
+         return snapshot;
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * Removes from this Process Group every component referenced by the given
+  * snippet, along with every outgoing connection of the snippet's
+  * connectable components. All verification is done before the first
+  * component is removed. The whole operation runs under the write lock.
+  * <p>
+  * The ID sets for connections, processors and process groups are treated
+  * as empty when the snippet reports them as null, matching the handling of
+  * the other ID sets in this method.
+  * </p>
+  *
+  * @param snippet the snippet whose components are to be removed
+  * @throws NullPointerException if snippet is null
+  * @throws IllegalStateException if the snippet references a component that
+  * is not part of this Process Group, if a connection cannot be deleted, if
+  * a processor is running or still has active threads, if a connectable has
+  * an incoming connection that is not being removed, or if a referenced
+  * Process Group is not empty
+  */
+ @Override
+ public void remove(final Snippet snippet) {
+     writeLock.lock();
+     try {
+         // ensure that all components are valid
+         verifyContents(snippet);
+
+         final Set<Connectable> connectables = getAllConnectables(snippet);
+
+         // A null ID set means "none of this component type"; normalize once
+         // so the verification loops below cannot fail with a NullPointerException.
+         final Set<String> snippetConnectionIds = replaceNullWithEmptySet(snippet.getConnections());
+         final Set<String> snippetProcessorIds = replaceNullWithEmptySet(snippet.getProcessors());
+         final Set<String> snippetGroupIds = replaceNullWithEmptySet(snippet.getProcessGroups());
+
+         final Set<String> connectionIdsToRemove = new HashSet<>(snippetConnectionIds);
+         // Remove all connections that are the output of any Connectable.
+         for (final Connectable connectable : connectables) {
+             for (final Connection conn : connectable.getConnections()) {
+                 if (!connections.containsKey(conn.getIdentifier())) {
+                     throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections from the parent Process Group");
+                 }
+                 connectionIdsToRemove.add(conn.getIdentifier());
+             }
+         }
+
+         // verify that all connections can be removed
+         for (final String id : connectionIdsToRemove) {
+             connections.get(id).verifyCanDelete();
+         }
+
+         // verify that all processors are stopped and have no active threads
+         for (final String procId : snippetProcessorIds) {
+             final ProcessorNode procNode = getProcessor(procId);
+             if (procNode.isRunning()) {
+                 throw new IllegalStateException(procNode + " cannot be removed because it is running");
+             }
+             final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
+             if (activeThreadCount != 0) {
+                 throw new IllegalStateException(procNode + " cannot be removed because it still has " + activeThreadCount + " active threads");
+             }
+         }
+
+         // verify that none of the connectables have incoming connections that are not in the Snippet.
+         for (final Connectable connectable : connectables) {
+             for (final Connection conn : connectable.getIncomingConnections()) {
+                 if (!snippetConnectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
+                     throw new IllegalStateException(connectable + " cannot be removed because it has incoming connections that are not selected to be deleted");
+                 }
+             }
+         }
+
+         // verify that all of the ProcessGroups in the snippet are empty
+         for (final String groupId : snippetGroupIds) {
+             final ProcessGroup toRemove = getProcessGroup(groupId);
+             if (!toRemove.isEmpty()) {
+                 throw new IllegalStateException("Process Group with name " + toRemove.getName() + " cannot be removed because it is not empty");
+             }
+         }
+
+         // verification passed; connections go first, then the components
+         for (final String id : connectionIdsToRemove) {
+             removeConnection(connections.get(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getInputPorts())) {
+             removeInputPort(inputPorts.get(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getOutputPorts())) {
+             removeOutputPort(outputPorts.get(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) {
+             removeFunnel(funnels.get(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
+             removeLabel(labels.get(id));
+         }
+         for (final String id : snippetGroupIds) {
+             removeProcessGroup(processGroups.get(id));
+         }
+         for (final String id : snippetProcessorIds) {
+             removeProcessor(processors.get(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+             removeRemoteProcessGroup(remoteGroups.get(id));
+         }
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * @param toReplace a set of IDs, possibly null
+  * @return the given set if it is non-null, otherwise a new empty HashSet
+  */
+ private Set<String> replaceNullWithEmptySet(final Set<String> toReplace) {
+     if (toReplace != null) {
+         return toReplace;
+     }
+
+     return new HashSet<String>();
+ }
+
+ /**
+  * Moves every component referenced by the given snippet out of this
+  * Process Group and into the destination group. Components are
+  * unregistered from this group's maps and handed to the destination's
+  * add/inherit methods. The whole operation runs under the write lock.
+  * <p>
+  * Within this method, an ID set that the snippet reports as null is
+  * treated as empty; this now also applies to the Root Group port check,
+  * which previously dereferenced the port ID sets without that guard.
+  * </p>
+  *
+  * @param snippet the snippet whose components are to be moved
+  * @param destination the Process Group that receives the components
+  * @throws NullPointerException if snippet is null
+  * @throws IllegalStateException if the snippet references a component that
+  * is not part of this Process Group, if the snippet is not disconnected
+  * from components outside of it, or if this is the Root Group and the
+  * snippet contains ports
+  */
+ @Override
+ public void move(final Snippet snippet, final ProcessGroup destination) {
+     writeLock.lock();
+     try {
+         verifyContents(snippet);
+
+         if (!isDisconnected(snippet)) {
+             throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
+         }
+
+         final Set<String> inputPortIds = replaceNullWithEmptySet(snippet.getInputPorts());
+         final Set<String> outputPortIds = replaceNullWithEmptySet(snippet.getOutputPorts());
+         if (isRootGroup() && (!inputPortIds.isEmpty() || !outputPortIds.isEmpty())) {
+             throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group");
+         }
+
+         for (final String id : inputPortIds) {
+             destination.addInputPort(inputPorts.remove(id));
+         }
+         for (final String id : outputPortIds) {
+             destination.addOutputPort(outputPorts.remove(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getFunnels())) {
+             destination.addFunnel(funnels.remove(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getLabels())) {
+             destination.addLabel(labels.remove(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+             destination.addProcessGroup(processGroups.remove(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getProcessors())) {
+             destination.addProcessor(processors.remove(id));
+         }
+         for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+             destination.addRemoteProcessGroup(remoteGroups.remove(id));
+         }
+         // connections are handed over last, after their endpoints
+         for (final String id : replaceNullWithEmptySet(snippet.getConnections())) {
+             destination.inheritConnection(connections.remove(id));
+         }
+     } finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Resolves the Input Port, Output Port, Funnel and Processor IDs of the
+  * given snippet against this Process Group. A null ID set is treated as
+  * empty. The result of each lookup is added as-is, so an ID that does not
+  * resolve contributes a null element.
+  *
+  * @param snippet the snippet whose connectable components are wanted
+  * @return a new Set holding the resolved components
+  */
+ private Set<Connectable> getAllConnectables(final Snippet snippet) {
+     final Set<Connectable> resolved = new HashSet<>();
+
+     for (final String inputPortId : replaceNullWithEmptySet(snippet.getInputPorts())) {
+         final Connectable inputPort = getInputPort(inputPortId);
+         resolved.add(inputPort);
+     }
+
+     for (final String outputPortId : replaceNullWithEmptySet(snippet.getOutputPorts())) {
+         final Connectable outputPort = getOutputPort(outputPortId);
+         resolved.add(outputPort);
+     }
+
+     for (final String funnelId : replaceNullWithEmptySet(snippet.getFunnels())) {
+         final Connectable funnel = getFunnel(funnelId);
+         resolved.add(funnel);
+     }
+
+     for (final String processorId : replaceNullWithEmptySet(snippet.getProcessors())) {
+         final Connectable processor = getProcessor(processorId);
+         resolved.add(processor);
+     }
+
+     return resolved;
+ }
+
+ /**
+  * Determines whether the given snippet is self-contained, i.e. whether no
+  * connection crosses the snippet's boundary. Two conditions must hold:
+  * <ol>
+  * <li>every incoming and outgoing connection of the snippet's connectable
+  * components (including the ports of its Remote Process Groups) is itself
+  * listed in the snippet, and</li>
+  * <li>the source and destination of every connection listed in the snippet
+  * belong to the snippet, either directly or anywhere inside one of the
+  * snippet's Process Groups.</li>
+  * </ol>
+  * ID sets that the snippet reports as null are treated as empty; this now
+  * also covers the connection and Process Group ID sets, which were
+  * previously dereferenced without that guard.
+  *
+  * @param snippet the snippet to examine
+  * @return <code>true</code> if no connection crosses the snippet boundary
+  */
+ private boolean isDisconnected(final Snippet snippet) {
+     final Set<Connectable> connectables = getAllConnectables(snippet);
+
+     for (final String id : replaceNullWithEmptySet(snippet.getRemoteProcessGroups())) {
+         final RemoteProcessGroup remoteGroup = getRemoteProcessGroup(id);
+         connectables.addAll(remoteGroup.getInputPorts());
+         connectables.addAll(remoteGroup.getOutputPorts());
+     }
+
+     final Set<String> connectionIds = replaceNullWithEmptySet(snippet.getConnections());
+     for (final Connectable connectable : connectables) {
+         for (final Connection conn : connectable.getIncomingConnections()) {
+             if (!connectionIds.contains(conn.getIdentifier())) {
+                 return false;
+             }
+         }
+
+         for (final Connection conn : connectable.getConnections()) {
+             if (!connectionIds.contains(conn.getIdentifier())) {
+                 return false;
+             }
+         }
+     }
+
+     // endpoints nested inside the snippet's Process Groups also count as
+     // being part of the snippet
+     final Set<Connectable> recursiveConnectables = new HashSet<>(connectables);
+     for (final String id : replaceNullWithEmptySet(snippet.getProcessGroups())) {
+         final ProcessGroup childGroup = getProcessGroup(id);
+         recursiveConnectables.addAll(findAllConnectables(childGroup, true));
+     }
+
+     for (final String id : connectionIds) {
+         final Connection connection = getConnection(id);
+         if (!recursiveConnectables.contains(connection.getSource()) || !recursiveConnectables.contains(connection.getDestination())) {
+             return false;
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Collects the Input Ports, Output Ports, Funnels and Processors of the
+  * given group and, recursively, of every group nested beneath it.
+  *
+  * @param group the group at which collection begins
+  * @param includeRemotePorts whether the input and output ports of each
+  * Remote Process Group are collected as well
+  * @return a new Set holding all components that were found
+  */
+ private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) {
+     final Set<Connectable> collected = new HashSet<>();
+     collected.addAll(group.getInputPorts());
+     collected.addAll(group.getOutputPorts());
+     collected.addAll(group.getFunnels());
+     collected.addAll(group.getProcessors());
+
+     if (includeRemotePorts) {
+         for (final RemoteProcessGroup remote : group.getRemoteProcessGroups()) {
+             collected.addAll(remote.getInputPorts());
+             collected.addAll(remote.getOutputPorts());
+         }
+     }
+
+     for (final ProcessGroup child : group.getProcessGroups()) {
+         final Set<Connectable> nested = findAllConnectables(child, includeRemotePorts);
+         collected.addAll(nested);
+     }
+
+     return collected;
+ }
+
+ /**
+ * Verifies that all ID's defined within the given snippet reference
+ * components within this ProcessGroup. If this is not the case, throws
+ * {@link IllegalStateException}.
+ *
+ * @param snippet
+ * @throws NullPointerException if the argument is null
+ * @throws IllegalStateException if the snippet contains an ID that
+ * references a component that is not part of this ProcessGroup
+ */
+ private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
+ requireNonNull(snippet);
+
+ verifyAllKeysExist(snippet.getInputPorts(), inputPorts, "Input Port");
+ verifyAllKeysExist(snippet.getOutputPorts(), outputPorts, "Output Port");
+ verifyAllKeysExist(snippet.getFunnels(), funnels, "Funnel");
+ verifyAllKeysExist(snippet.getLabels(), labels, "Label");
+ verifyAllKeysExist(snippet.getProcessGroups(), processGroups, "Process Group");
+ verifyAllKeysExist(snippet.getProcessors(), processors, "Processor");
+ verifyAllKeysExist(snippet.getRemoteProcessGroups(), remoteGroups, "Remote Process Group");
+ verifyAllKeysExist(snippet.getConnections(), connections, "Connection");
+ }
+
+ /**
+ * <p>
+ * Verifies that all ID's specified by the given set exist as keys in the
+ * given Map. If any of the ID's does not exist as a key in the map, will
+ * throw {@link IllegalStateException} indicating the ID that is invalid and
+ * specifying the Component Type.
+ * </p>
+ *
+ * <p>
+ * If the ids given are null, will do no validation.
+ * </p>
+ *
+ * @param ids
+ * @param map
+ * @param componentType
+ */
+ private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) {
+ if (ids != null) {
+ for (final String id : ids) {
+ if (!map.containsKey(id)) {
+ throw new IllegalStateException("ID " + id + " does not refer to a(n) " + componentType + " in this ProcessGroup");
+ }
+ }
+ }
+ }
+
+ /**
+  * Verifies that this Process Group may be deleted, which is the case only
+  * when it is empty.
+  *
+  * @throws IllegalStateException if this Process Group is not empty
+  */
+ @Override
+ public void verifyCanDelete() {
+     if (isEmpty()) {
+         return;
+     }
+
+     throw new IllegalStateException(this + " is not empty");
+ }
+
+ /**
+ * Performs no checks: the method body is empty, so this call never throws.
+ */
+ @Override
+ public void verifyCanStop() {
+ }
+
+ /**
+  * Verifies, under the read lock, that every STOPPED connectable component
+  * of this group and of its descendant groups (ports of Remote Process
+  * Groups are not included) can be started. Components in any other state
+  * are skipped.
+  *
+  * @throws IllegalStateException if a stopped component still has active
+  * threads, or if the component's own verification fails
+  */
+ @Override
+ public void verifyCanStart() {
+     readLock.lock();
+     try {
+         for (final Connectable candidate : findAllConnectables(this, false)) {
+             if (candidate.getScheduledState() != ScheduledState.STOPPED) {
+                 continue;
+             }
+
+             // a stopped component with active threads has not finished stopping
+             final boolean stillStopping = scheduler.getActiveThreadCount(candidate) > 0;
+             if (stillStopping) {
+                 throw new IllegalStateException("Cannot start " + candidate + " because it is currently stopping");
+             }
+
+             candidate.verifyCanStart();
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+
+ @Override
+ public void verifyCanDelete(final Snippet snippet) throws IllegalStateException {
+ readLock.lock();
+ try {
+ if (!id.equals(snippet.getParentGroupId())) {
+ throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
+ }
+
+ if (!isDisconnected(snippet)) {
+ throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
+ }
+
+ for (final String id : snippet.getConnections()) {
+ final Connection connection = getConnection(id);
+ if (connection == null) {
+ throw new IllegalStateException("Snippet references Connection with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+
+ connection.verifyCanDelete();
+ }
+
+ for (final String id : snippet.getFunnels()) {
+ final Funnel funnel = getFunnel(id);
+ if (funnel == null) {
+ throw new IllegalStateException("Snippet references Funnel with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+
+ funnel.verifyCanDelete(true);
+ }
+
+ for (final String id : snippet.getInputPorts()) {
+ final Port port = getInputPort(id);
+ if (port == null) {
+ throw new IllegalStateException("Snippet references Input Port with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+
+ port.verifyCanDelete(true);
+ }
+
+ for (final String id : snippet.getLabels()) {
+ final Label label = getLabel(id);
+ if (label == null) {
+ throw new IllegalStateException("Snippet references Label with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+ }
+
+ for (final String id : snippet.getOutputPorts()) {
+ final Port port = getOutputPort(id);
+ if (port == null) {
+ throw new IllegalStateException("Snippet references Output Port with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+ port.verifyCanDelete(true);
+ }
+
+ for (final String id : snippet.getProcessGroups()) {
+ final ProcessGroup group = getProcessGroup(id);
+ if (group == null) {
+ throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+ group.verifyCanDelete();
+ }
+
+ for (final String id : snippet.getProcessors()) {
+ final ProcessorNode processor = getProcessor(id);
+ if (processor == null) {
+ throw new IllegalStateException("Snippet references Processor with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+ processor.verifyCanDelete(true);
+ }
+
+ for (final String id : snippet.getRemoteProcessGroups()) {
+ final RemoteProcessGroup group = getRemoteProcessGroup(id);
+ if (group == null) {
+ throw new IllegalStateException("Snippet references Remote Process Group with ID " + id + ", which does not exist in this ProcessGroup");
+ }
+ group.verifyCanDelete(true);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+  * Verifies, under the read lock, that the given snippet may be moved from
+  * this Process Group into the given destination group.
+  * <p>
+  * Within this method, a port ID set that the snippet reports as null is
+  * treated as empty; previously the Root Group check and both port-name
+  * loops dereferenced those sets directly.
+  * </p>
+  *
+  * @param snippet the snippet to verify
+  * @param newProcessGroup the Process Group the snippet would be moved into
+  * @throws NullPointerException if snippet is null
+  * @throws IllegalStateException if the snippet belongs to a different
+  * Process Group, references a component that is not part of this Process
+  * Group, is connected to a component outside of the snippet, contains ports
+  * while this is the Root Group, or contains a port whose name is already
+  * used by a port of the same kind in the destination group
+  */
+ @Override
+ public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) throws IllegalStateException {
+     readLock.lock();
+     try {
+         if (!id.equals(snippet.getParentGroupId())) {
+             throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
+         }
+
+         verifyContents(snippet);
+
+         if (!isDisconnected(snippet)) {
+             throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
+         }
+
+         final Set<String> inputPortIds = replaceNullWithEmptySet(snippet.getInputPorts());
+         final Set<String> outputPortIds = replaceNullWithEmptySet(snippet.getOutputPorts());
+         if (isRootGroup() && (!inputPortIds.isEmpty() || !outputPortIds.isEmpty())) {
+             throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group");
+         }
+
+         // port names must remain unique within the destination group
+         for (final String id : inputPortIds) {
+             final Port port = getInputPort(id);
+             final String portName = port.getName();
+
+             if (newProcessGroup.getInputPortByName(portName) != null) {
+                 throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Input Port with the name " + portName);
+             }
+         }
+
+         for (final String id : outputPortIds) {
+             final Port port = getOutputPort(id);
+             final String portName = port.getName();
+
+             if (newProcessGroup.getOutputPortByName(portName) != null) {
+                 throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Output Port with the name " + portName);
+             }
+         }
+     } finally {
+         readLock.unlock();
+     }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 0000000,318901f..ac58504
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@@ -1,0 -1,113 +1,113 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processor;
+
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+
+ public class StandardSchedulingContext implements SchedulingContext { // SchedulingContext that wraps a ProcessContext and adds Controller Service leasing on behalf of one ProcessorNode
+
+ private final ProcessContext processContext; // receives every delegated call made by the methods below
+ private final ControllerServiceProvider serviceProvider; // used to look up Controller Service nodes by identifier
+ private final ProcessorNode processorNode; // passed to addReference(...) on each leased service
+
+ public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) { // stores the three collaborators as given; no null checks are performed here
+ this.processContext = processContext;
+ this.serviceProvider = serviceProvider;
+ this.processorNode = processorNode;
+ }
+
+ @Override
+ public void leaseControllerService(final String identifier) { // looks up the service node for this identifier, rejects it if missing, disabled or invalid, then adds processorNode as a reference on it
+ final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(identifier);
+ if (serviceNode == null) { // no node for this identifier -> IllegalArgumentException
+ throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
+ }
+
+ if (serviceNode.isDisabled()) { // disabled service -> IllegalStateException
- throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is currently disabled");
++ throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
+ }
+
+ if (!serviceNode.isValid()) { // service failing its validity check -> IllegalStateException
- throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is not currently valid");
++ throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
+ }
+
+ serviceNode.addReference(processorNode); // reached only when all three checks above passed
+ }
+
+ @Override
+ public PropertyValue getProperty(final PropertyDescriptor descriptor) { // delegates to processContext.getProperty(descriptor)
+ return processContext.getProperty(descriptor);
+ }
+
+ @Override
+ public PropertyValue getProperty(final String propertyName) { // delegates to processContext.getProperty(propertyName)
+ return processContext.getProperty(propertyName);
+ }
+
+ @Override
+ public PropertyValue newPropertyValue(final String rawValue) { // delegates to processContext.newPropertyValue(rawValue)
+ return processContext.newPropertyValue(rawValue);
+ }
+
+ @Override
+ public void yield() { // delegates to processContext.yield()
+ processContext.yield();
+ }
+
+ @Override
+ public int getMaxConcurrentTasks() { // delegates to processContext.getMaxConcurrentTasks()
+ return processContext.getMaxConcurrentTasks();
+ }
+
+ @Override
+ public String getAnnotationData() { // delegates to processContext.getAnnotationData()
+ return processContext.getAnnotationData();
+ }
+
+ @Override
+ public Map<PropertyDescriptor, String> getProperties() { // returns whatever map processContext.getProperties() returns; no copy is made here
+ return processContext.getProperties();
+ }
+
+ @Override
+ public String encrypt(final String unencrypted) { // delegates to processContext.encrypt(unencrypted)
+ return processContext.encrypt(unencrypted);
+ }
+
+ @Override
+ public String decrypt(final String encrypted) { // delegates to processContext.decrypt(encrypted)
+ return processContext.decrypt(encrypted);
+ }
+
+ @Override
+ public ControllerServiceLookup getControllerServiceLookup() { // delegates to processContext; the serviceProvider field is not consulted here
+ return processContext.getControllerServiceLookup();
+ }
+
+ @Override
+ public Set<Relationship> getAvailableRelationships() { // delegates to processContext.getAvailableRelationships()
+ return processContext.getAvailableRelationships();
+ }
+ }