You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:30 UTC

[07/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
new file mode 100644
index 0000000..e434905
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+public interface RepositoryStatusReport {
+
+    void addReportEntry(FlowFileEvent entry);
+
+    Map<String, FlowFileEvent> getReportEntries();
+
+    FlowFileEvent getReportEntry(String componentId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
new file mode 100644
index 0000000..6f9c237
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.controller.Availability;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ControllerService;
+
+public interface ControllerServiceNode extends ConfiguredComponent {
+
+    ControllerService getControllerService();
+
+    Availability getAvailability();
+
+    void setAvailability(Availability availability);
+
+    boolean isDisabled();
+
+    void setDisabled(boolean disabled);
+
+    ControllerServiceReference getReferences();
+
+    void addReference(ConfiguredComponent referringComponent);
+
+    void removeReference(ConfiguredComponent referringComponent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
new file mode 100644
index 0000000..35a255d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerServiceLookup;
+
+/**
+ *
+ */
+public interface ControllerServiceProvider extends ControllerServiceLookup {
+
+    /**
+     * Gets the controller service for the specified identifier. Returns null if
+     * the identifier does not match a known service.
+     *
+     * @param type
+     * @param id
+     * @param properties
+     * @return
+     */
+    ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties);
+
+    /**
+     * Gets the controller service node for the specified identifier. Returns
+     * <code>null</code> if the identifier does not match a known service
+     *
+     * @param id
+     * @return
+     */
+    ControllerServiceNode getControllerServiceNode(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
new file mode 100644
index 0000000..5cb676f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import java.util.Set;
+
+import org.apache.nifi.controller.ConfiguredComponent;
+
+/**
+ * Provides a collection of components that are referencing a Controller Service
+ */
+public interface ControllerServiceReference {
+
+    /**
+     * Returns the component that is being referenced
+     *
+     * @return
+     */
+    ControllerServiceNode getReferencedComponent();
+
+    /**
+     * Returns a {@link Set} of all components that are referencing this
+     * Controller Service
+     *
+     * @return
+     */
+    Set<ConfiguredComponent> getReferencingComponents();
+
+    /**
+     * Returns a {@link Set} of all Processors and Reporting Tasks that are
+     * referencing the Controller Service and are running, in addition to all
+     *
+     * @return
+     */
+    Set<ConfiguredComponent> getRunningReferences();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
new file mode 100644
index 0000000..d1d5e5b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ *
+ */
+public final class BulletinFactory {
+
+    private static final AtomicLong currentId = new AtomicLong(0);
+
+    public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
+        return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+    }
+
+    public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+        final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
+        bulletin.setGroupId(groupId);
+        bulletin.setSourceId(sourceId);
+        bulletin.setSourceName(sourceName);
+        bulletin.setCategory(category);
+        bulletin.setLevel(severity);
+        bulletin.setMessage(message);
+        return bulletin;
+    }
+
+    public static Bulletin createBulletin(final String category, final String severity, final String message) {
+        final Bulletin bulletin = new SystemBulletin(currentId.getAndIncrement());
+        bulletin.setCategory(category);
+        bulletin.setLevel(severity);
+        bulletin.setMessage(message);
+        return bulletin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
new file mode 100644
index 0000000..9846cf2
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ *
+ */
+public interface BulletinProcessingStrategy {
+
+    void update(Bulletin bulletin);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
new file mode 100644
index 0000000..23c4cdb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ *
+ */
+public class ComponentBulletin extends Bulletin {
+
+    ComponentBulletin(final long id) {
+        super(id);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
new file mode 100644
index 0000000..f97dc46
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ *
+ */
+public class SystemBulletin extends Bulletin {
+
+    SystemBulletin(final long id) {
+        super(id);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
new file mode 100644
index 0000000..61be59c
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Processor;
+
+/**
+ * <p>
+ * ProcessGroup objects are containers for processing entities, such as
+ * {@link Processor}s, {@link Port}s, and other {@link ProcessGroup}s.
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE</p>
+ */
+public interface ProcessGroup {
+
+    /**
+     * @return a reference to this ProcessGroup's parent. This will be
+     * <tt>null</tt> if and only if this is the root group.
+     */
+    ProcessGroup getParent();
+
+    /**
+     * Updates the ProcessGroup to point to a new parent
+     *
+     * @param group
+     */
+    void setParent(ProcessGroup group);
+
+    /**
+     * @return the ID of the ProcessGroup
+     */
+    String getIdentifier();
+
+    /**
+     * @return the name of the ProcessGroup
+     */
+    String getName();
+
+    /**
+     * Updates the name of this ProcessGroup.
+     *
+     * @param name
+     */
+    void setName(String name);
+
+    /**
+     * Updates the position of where this ProcessGroup is located in the graph
+     */
+    void setPosition(Position position);
+
+    /**
+     * Returns the position of where this ProcessGroup is located in the graph
+     *
+     * @return
+     */
+    Position getPosition();
+
+    /**
+     * @return the user-set comments about this ProcessGroup, or
+     * <code>null</code> if no comments have been set
+     */
+    String getComments();
+
+    /**
+     * Updates the comments for this ProcessGroup
+     *
+     * @param comments
+     */
+    void setComments(String comments);
+
+    /**
+     * Returns the counts for this ProcessGroup
+     *
+     * @return
+     */
+    ProcessGroupCounts getCounts();
+
+    /**
+     * Starts all Processors, Local Ports, and Funnels that are directly within
+     * this group and any child ProcessGroups, except for those that are
+     * disabled.
+     */
+    void startProcessing();
+
+    /**
+     * Stops all Processors, Local Ports, and Funnels that are directly within
+     * this group and child ProcessGroups, except for those that are disabled.
+     */
+    void stopProcessing();
+
+    /**
+     * Starts the given Processor
+     *
+     * @param processor the processor to start
+     * @throws IllegalStateException if the processor is not valid, or is
+     * already running
+     */
+    void enableProcessor(ProcessorNode processor);
+
+    /**
+     * Starts the given Input Port
+     *
+     * @param port
+     */
+    void enableInputPort(Port port);
+
+    /**
+     * Starts the given Output Port
+     *
+     * @param port
+     */
+    void enableOutputPort(Port port);
+
+    /**
+     * Starts the given Funnel
+     *
+     * @param funnel
+     */
+    void enableFunnel(Funnel funnel);
+
+    /**
+     * Starts the given Processor
+     *
+     * @param processor the processor to start
+     * @throws IllegalStateException if the processor is not valid, or is
+     * already running
+     */
+    void startProcessor(ProcessorNode processor);
+
+    /**
+     * Starts the given Input Port
+     *
+     * @param port
+     */
+    void startInputPort(Port port);
+
+    /**
+     * Starts the given Output Port
+     *
+     * @param port
+     */
+    void startOutputPort(Port port);
+
+    /**
+     * Starts the given Funnel
+     *
+     * @param funnel
+     */
+    void startFunnel(Funnel funnel);
+
+    /**
+     * Stops the given Processor
+     *
+     * @param processor
+     */
+    void stopProcessor(ProcessorNode processor);
+
+    /**
+     * Stops the given Port
+     *
+     * @param processor
+     */
+    void stopInputPort(Port port);
+
+    /**
+     * Stops the given Port
+     *
+     * @param processor
+     */
+    void stopOutputPort(Port port);
+
+    /**
+     * Stops the given Funnel
+     *
+     * @param processor
+     */
+    void stopFunnel(Funnel funnel);
+
+    /**
+     * Starts the given Processor
+     *
+     * @param processor the processor to start
+     * @throws IllegalStateException if the processor is not valid, or is
+     * already running
+     */
+    void disableProcessor(ProcessorNode processor);
+
+    /**
+     * Starts the given Input Port
+     *
+     * @param port
+     */
+    void disableInputPort(Port port);
+
+    /**
+     * Starts the given Output Port
+     *
+     * @param port
+     */
+    void disableOutputPort(Port port);
+
+    /**
+     * Starts the given Funnel
+     *
+     * @param funnel
+     */
+    void disableFunnel(Funnel funnel);
+
+    /**
+     * Indicates that the Flow is being shutdown; allows cleanup of resources
+     * associated with processors, etc.
+     */
+    void shutdown();
+
+    /**
+     * Returns a boolean indicating whether or not this ProcessGroup is the root
+     * group
+     *
+     * @return
+     */
+    boolean isRootGroup();
+
+    /**
+     * Adds a {@link Port} to be used for transferring {@link FlowFile}s from
+     * external sources to {@link Processor}s and other {@link Port}s within
+     * this ProcessGroup.
+     *
+     * @param port
+     */
+    void addInputPort(Port port);
+
+    /**
+     * Removes a {@link Port} from this ProcessGroup's list of Input Ports.
+     *
+     * @param port the Port to remove
+     * @throws NullPointerException if <code>port</code> is null
+     * @throws IllegalStateException if port is not an Input Port for this
+     * ProcessGroup
+     */
+    void removeInputPort(Port port);
+
+    /**
+     * @return the {@link Set} of all {@link Port}s that are used by this
+     * ProcessGroup as Input Ports.
+     */
+    Set<Port> getInputPorts();
+
+    /**
+     * @param id the ID of the input port
+     * @return the input port with the given ID, or <code>null</code> if it does
+     * not exist.
+     */
+    Port getInputPort(String id);
+
+    /**
+     * Adds a {@link Port} to be used for transferring {@link FlowFile}s to
+     * external sources.
+     *
+     * @param port the Port to add
+     */
+    void addOutputPort(Port port);
+
+    /**
+     * Removes a {@link Port} from this ProcessGroup's list of Output Ports.
+     *
+     * @param port the Port to remove
+     * @throws NullPointerException if <code>port</code> is null
+     * @throws IllegalStateException if port is not an Input Port for this
+     * ProcessGroup
+     */
+    void removeOutputPort(Port port);
+
+    /**
+     * @param id the ID of the output port
+     * @return the output port with the given ID, or <code>null</code> if it
+     * does not exist.
+     */
+    Port getOutputPort(String id);
+
+    /**
+     * @return the {@link Set} of all {@link Port}s that are used by this
+     * ProcessGroup as Output Ports.
+     */
+    Set<Port> getOutputPorts();
+
+    /**
+     * Adds a reference to a ProgressGroup as a child of this.
+     *
+     * @return the newly created reference
+     */
+    void addProcessGroup(ProcessGroup group);
+
+    /**
+     * Returns the ProcessGroup whose parent is <code>this</code> and whose id
+     * is given
+     *
+     * @param id
+     * @return
+     */
+    ProcessGroup getProcessGroup(String id);
+
+    /**
+     * @return a {@link Set} of all Process Group References that are contained
+     * within this.
+     */
+    Set<ProcessGroup> getProcessGroups();
+
+    /**
+     * @param group the group to remove
+     * @throws NullPointerException if <code>group</code> is null
+     * @throws IllegalStateException if group is not member of this
+     * ProcessGroup, or the given ProcessGroup is not empty (i.e., it contains
+     * at least one Processor, ProcessGroup, Input Port, Output Port, or Label).
+     */
+    void removeProcessGroup(ProcessGroup group);
+
+    /**
+     * Adds the already constructed processor instance to this group
+     *
+     * @param processor the processor to add
+     */
+    void addProcessor(ProcessorNode processor);
+
+    /**
+     * Removes the given processor from this group, destroying the Processor.
+     * The Processor is removed from the ProcessorRegistry, and any method in
+     * the Processor that is annotated with the
+     * {@link nifi.processor.annotation.OnRemoved OnRemoved} annotation will be
+     * invoked. All outgoing connections will also be destroyed
+     *
+     * @param processor the Processor to remove
+     * @throws NullPointerException if <code>processor</code> is null
+     * @throws IllegalStateException if <code>processor</code> is not a member
+     * of this ProcessGroup, is currently running, or has any incoming
+     * connections.
+     */
+    void removeProcessor(ProcessorNode processor);
+
+    /**
+     * @return a {@link Collection} of all FlowFileProcessors that are contained
+     * within this.
+     */
+    Set<ProcessorNode> getProcessors();
+
+    /**
+     * Returns the FlowFileProcessor with the given ID.
+     *
+     * @param id the ID of the processor to retrieve
+     * @return the processor with the given ID
+     * @throws NullPointerException if <code>id</code> is null.
+     */
+    ProcessorNode getProcessor(String id);
+
+    /**
+     * Returns the <code>Connectable</code> with the given ID, or
+     * <code>null</code> if the <code>Connectable</code> is not a member of the
+     * group
+     *
+     * @param id the ID of the Connectable
+     * @return
+     */
+    Connectable getConnectable(String id);
+
+    /**
+     * Adds the given connection to this ProcessGroup. This method also notifies
+     * the Source and Destination of the Connection that the Connection has been
+     * established.
+     *
+     * @param connection
+     * @throws NullPointerException if the connection is null
+     * @throws IllegalStateException if the source or destination of the
+     * connection is not a member of this ProcessGroup or if a connection
+     * already exists in this ProcessGroup with the same ID
+     */
+    void addConnection(Connection connection);
+
+    /**
+     * Removes the connection from this ProcessGroup.
+     *
+     * @param connection
+     * @throws IllegalStateException if <code>connection</code> is not contained
+     * within this.
+     */
+    void removeConnection(Connection connection);
+
+    /**
+     * Inherits a Connection from another ProcessGroup; this does not perform
+     * any validation but simply notifies the ProcessGroup that it is now the
+     * owner of the given Connection. This is used in place of the
+     * {@link #addConnection(Connection)} method when moving Connections from
+     * one group to another because addConnection notifies both the Source and
+     * Destination of the Connection that the Connection has been established;
+     * this method does not notify either, as both the Source and Destination
+     * should already be aware of the Connection.
+     *
+     * @param connection
+     */
+    void inheritConnection(Connection connection);
+
+    /**
+     * @return the Connection with the given ID, or <code>null</code> if the
+     * connection does not exist.
+     */
+    Connection getConnection(String id);
+
+    /**
+     * Returns the {@link Set} of all {@link Connection}s contained within this.
+     *
+     * @return
+     */
+    Set<Connection> getConnections();
+
+    /**
+     * Returns a List of all Connections contains within this ProcessGroup and
+     * any child ProcessGroups.
+     *
+     * @return
+     */
+    List<Connection> findAllConnections();
+
+    /**
+     * Adds the given RemoteProcessGroup to this ProcessGroup
+     *
+     * @param remoteGroup
+     *
+     * @throws NullPointerException if the given argument is null
+     */
+    void addRemoteProcessGroup(RemoteProcessGroup remoteGroup);
+
+    /**
+     * Removes the given RemoteProcessGroup from this ProcessGroup
+     *
+     * @param remoteGroup
+     * @throws NullPointerException if the argument is null
+     * @throws IllegalStateException if the given argument does not belong to
+     * this ProcessGroup
+     */
+    void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup);
+
+    /**
+     * Returns the RemoteProcessGroup that is the child of this ProcessGroup and
+     * has the given ID. If no RemoteProcessGroup can be found with the given
+     * ID, returns <code>null</code>.
+     *
+     * @param id
+     * @return
+     */
+    RemoteProcessGroup getRemoteProcessGroup(String id);
+
+    /**
+     * Returns a set of all RemoteProcessGroups that belong to this
+     * ProcessGroup. If no RemoteProcessGroup's have been added to this
+     * ProcessGroup, will return an empty Set.
+     *
+     * @return
+     */
+    Set<RemoteProcessGroup> getRemoteProcessGroups();
+
+    /**
+     * Adds the given Label to this ProcessGroup
+     *
+     * @param label the label to add
+     * @return
+     *
+     * @throws NullPointerException if the argument is null
+     */
+    void addLabel(Label label);
+
+    /**
+     * Removes the given Label from this ProcessGroup
+     *
+     * @param label the label to remove
+     * @throws NullPointerException if the argument is null
+     * @throws IllegalStateException if the given argument does not belong to
+     * this ProcessGroup
+     */
+    void removeLabel(Label label);
+
+    /**
+     * Returns a set of all Labels that belong to this ProcessGroup. If no
+     * Labels belong to this ProcessGroup, returns an empty Set.
+     *
+     * @return
+     */
+    Set<Label> getLabels();
+
+    /**
+     * Returns the Label that belongs to this ProcessGroup and has the given id.
+     * If no Label can be found with this ID, returns <code>null</code>.
+     *
+     * @param id
+     * @return
+     */
+    Label getLabel(String id);
+
+    /**
+     * Returns the Process Group with the given ID, if it exists as a child of
+     * this ProcessGroup, or is this ProcessGroup. This performs a recursive
+     * search of all ProcessGroups and descendant ProcessGroups
+     *
+     * @param id
+     * @return
+     */
+    ProcessGroup findProcessGroup(String id);
+
+    /**
+     * Returns the RemoteProcessGroup with the given ID, if it exists as a child
+     * or descendant of this ProcessGroup. This performs a recursive search of
+     * all ProcessGroups and descendant ProcessGroups
+     *
+     * @param id
+     * @return
+     */
+    RemoteProcessGroup findRemoteProcessGroup(String id);
+
+    /**
+     * Returns a List of all Remote Process Groups that are children or
+     * descendants of this ProcessGroup. This performs a recursive search of all
+     * descendant ProcessGroups
+     *
+     * @return
+     */
+    List<RemoteProcessGroup> findAllRemoteProcessGroups();
+
+    /**
+     * Returns the Processor with the given ID, if it exists as a child or
+     * descendant of this ProcessGroup. This performs a recursive search of all
+     * descendant ProcessGroups
+     *
+     * @param id
+     * @return
+     */
+    ProcessorNode findProcessor(String id);
+
+    /**
+     * Returns a List of all Processors that are children or descendants of this
+     * ProcessGroup. This performs a recursive search of all descendant
+     * ProcessGroups
+     *
+     * @return
+     */
+    List<ProcessorNode> findAllProcessors();
+
+    /**
+     * Returns a List of all Labels that are children or descendants of this
+     * ProcessGroup. This performsn a recursive search of all descendant
+     * ProcessGroups
+     *
+     * @return
+     */
+    List<Label> findAllLabels();
+
+    /**
+     * Returns the input port with the given ID, if it exists; otherwise returns
+     * null. This performs a recursive search of all Input Ports and descendant
+     * ProcessGroups
+     *
+     * @param id
+     * @return
+     */
+    Port findInputPort(String id);
+
+    /**
+     * Returns the input port with the given name, if it exists; otherwise
+     * returns null. ProcessGroups
+     *
+     * @param name
+     * @return
+     */
+    Port getInputPortByName(String name);
+
+    /**
+     * Returns the output port with the given ID, if it exists; otherwise
+     * returns null. This performs a recursive search of all Output Ports and
+     * descendant ProcessGroups
+     *
+     * @param id
+     * @return
+     */
+    Port findOutputPort(String id);
+
+    /**
+     * Returns the output port with the given name, if it exists; otherwise
+     * returns null.
+     *
+     * @param name
+     * @return
+     */
+    Port getOutputPortByName(String name);
+
+    /**
+     * Adds the given funnel to this ProcessGroup
+     *
+     * @param funnel
+     */
+    void addFunnel(Funnel funnel);
+
+    /**
+     * Returns a Set of all Funnels that belong to this ProcessGroup
+     *
+     * @return
+     */
+    Set<Funnel> getFunnels();
+
+    /**
+     * Returns the funnel with the given identifier
+     *
+     * @param id
+     * @return
+     */
+    Funnel getFunnel(String id);
+
+    /**
+     * Removes the given funnel from this ProcessGroup
+     *
+     * @param funnel
+     *
+     * @throws IllegalStateException if the funnel is not a member of this
+     * ProcessGroup or has incoming or outgoing connections
+     */
+    void removeFunnel(Funnel funnel);
+
+    /**
+     * @return <code>true</code> if this ProcessGroup has no Processors, Labels,
+     * Connections, ProcessGroups, RemoteProcessGroupReferences, or Ports.
+     * Otherwise, returns <code>false</code>.
+     */
+    boolean isEmpty();
+
+    /**
+     * Removes all of the components whose ID's are specified within the given
+     * {@link Snippet} from this ProcessGroup.
+     *
+     * @param snippet
+     *
+     * @throws NullPointerException if argument is null
+     * @throws IllegalStateException if any ID in the snippet refers to a
+     * component that is not within this ProcessGroup
+     */
+    void remove(final Snippet snippet);
+
+    /**
+     * Returns the Connectable with the given ID, if it exists; otherwise
+     * returns null. This performs a recursive search of all ProcessGroups'
+     * input ports, output ports, funnels, processors, and remote process groups
+     *
+     * @param identifier
+     * @return
+     */
+    Connectable findConnectable(String identifier);
+
+    /**
+     * Moves all of the components whose ID's are specified within the given
+     * {@link Snippet} from this ProcessGroup into the given destination
+     * ProcessGroup
+     *
+     * @param snippet
+     * @param destination
+     *
+     * @throws NullPointerExcepiton if either argument is null
+     * @throws IllegalStateException if any ID in the snippet refers to a
+     * component that is not within this ProcessGroup
+     */
+    void move(final Snippet snippet, final ProcessGroup destination);
+
+    void verifyCanDelete();
+
+    void verifyCanStart();
+
+    void verifyCanStop();
+
+    /**
+     * Ensures that deleting the given snippet is a valid operation at this
+     * point in time, depending on the state of this ProcessGroup
+     *
+     * @param snippet
+     *
+     * @throws IllegalStateException if deleting the Snippet is not valid at
+     * this time
+     */
+    void verifyCanDelete(Snippet snippet);
+
+    /**
+     * Ensure that moving the given snippet to the given new group is a valid
+     * operation at this point in time, depending on the state of both
+     * ProcessGroups
+     *
+     * @param snippet
+     * @param newProcessGroup
+     *
+     * @throws IllegalStateException if the move is not valid at this time
+     */
+    void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
new file mode 100644
index 0000000..3eb594b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+public class ProcessGroupCounts {
+
+    private final int inputPortCount, outputPortCount, runningCount, stoppedCount, invalidCount, disabledCount, activeRemotePortCount, inactiveRemotePortCount;
+
+    public ProcessGroupCounts(final int inputPortCount, final int outputPortCount, final int runningCount,
+            final int stoppedCount, final int invalidCount, final int disabledCount, final int activeRemotePortCount, final int inactiveRemotePortCount) {
+        this.inputPortCount = inputPortCount;
+        this.outputPortCount = outputPortCount;
+        this.runningCount = runningCount;
+        this.stoppedCount = stoppedCount;
+        this.invalidCount = invalidCount;
+        this.disabledCount = disabledCount;
+        this.activeRemotePortCount = activeRemotePortCount;
+        this.inactiveRemotePortCount = inactiveRemotePortCount;
+    }
+
+    public int getInputPortCount() {
+        return inputPortCount;
+    }
+
+    public int getOutputPortCount() {
+        return outputPortCount;
+    }
+
+    public int getRunningCount() {
+        return runningCount;
+    }
+
+    public int getStoppedCount() {
+        return stoppedCount;
+    }
+
+    public int getInvalidCount() {
+        return invalidCount;
+    }
+
+    public int getDisabledCount() {
+        return disabledCount;
+    }
+
+    public int getActiveRemotePortCount() {
+        return activeRemotePortCount;
+    }
+
+    public int getInactiveRemotePortCount() {
+        return inactiveRemotePortCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
new file mode 100644
index 0000000..3acd1d3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.exception.CommunicationsException;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+public interface RemoteProcessGroup {
+
+    String getIdentifier();
+
+    URI getTargetUri();
+
+    ProcessGroup getProcessGroup();
+
+    void setProcessGroup(ProcessGroup group);
+
+    void setPosition(Position position);
+
+    Position getPosition();
+
+    String getComments();
+
+    void setComments(String comments);
+
+    /**
+     * Returns the name of this RemoteProcessGroup. The value returned will
+     * never be null. If unable to communicate with the remote instance, the URI
+     * of that instance may be returned instead
+     *
+     * @return
+     */
+    String getName();
+
+    void setName(String name);
+
+    void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+
+    void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+
+    Set<RemoteGroupPort> getInputPorts();
+
+    Set<RemoteGroupPort> getOutputPorts();
+
+    RemoteGroupPort getInputPort(String id);
+
+    RemoteGroupPort getOutputPort(String id);
+
+    ProcessGroupCounts getCounts();
+
+    void refreshFlowContents() throws CommunicationsException;
+
+    Date getLastRefreshTime();
+
+    void setYieldDuration(final String yieldDuration);
+
+    String getYieldDuration();
+
+    /**
+     * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
+     *
+     * @param timePeriod
+     * @throws IllegalArgumentException
+     */
+    void setCommunicationsTimeout(String timePeriod) throws IllegalArgumentException;
+
+    /**
+     * Returns the communications timeout in terms of the given TimeUnit
+     *
+     * @param timeUnit
+     * @return
+     */
+    int getCommunicationsTimeout(TimeUnit timeUnit);
+
+    /**
+     * Returns the user-configured String representation of the communications
+     * timeout
+     *
+     * @return
+     */
+    String getCommunicationsTimeout();
+
+    /**
+     * @return the port that the remote instance is listening on for
+     * site-to-site communication, or <code>null</code> if the remote instance
+     * is not configured to allow site-to-site communications.
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    Integer getListeningPort() throws IOException;
+
+    /**
+     * Indicates whether or not the RemoteProcessGroup is currently scheduled to
+     * transmit data
+     *
+     * @return
+     */
+    boolean isTransmitting();
+
+    /**
+     * Initiates communications between this instance and the remote instance.
+     */
+    void startTransmitting();
+
+    /**
+     * Immediately terminates communications between this instance and the
+     * remote instance.
+     */
+    void stopTransmitting();
+
+    /**
+     * Initiates communications between this instance and the remote instance
+     * only for the port specified.
+     *
+     * @param port
+     */
+    void startTransmitting(RemoteGroupPort port);
+
+    /**
+     * Immediately terminates communications between this instance and the
+     * remote instance only for the port specified.
+     *
+     * @param port
+     */
+    void stopTransmitting(RemoteGroupPort port);
+
+    /**
+     * Indicates whether or not communications with this RemoteProcessGroup will
+     * be secure (2-way authentication)
+     *
+     * @return
+     */
+    boolean isSecure() throws CommunicationsException;
+
+    /**
+     * Indicates whether or not communications with this RemoteProcessGroup will
+     * be secure (2-way authentication). Returns null if unknown.
+     *
+     * @return
+     */
+    Boolean getSecureFlag();
+
+    /**
+     * Returns true if the target system has site to site enabled. Returns false
+     * otherwise (they don't or they have not yet responded).
+     *
+     * @return
+     */
+    boolean isSiteToSiteEnabled();
+
+    /**
+     * Returns a String indicating why we are not authorized to communicate with
+     * the remote instance, or <code>null</code> if we are authorized
+     *
+     * @return
+     */
+    String getAuthorizationIssue();
+
+    /**
+     * Returns the {@link EventReporter} that can be used to report any notable
+     * events
+     *
+     * @return
+     */
+    EventReporter getEventReporter();
+
+    /**
+     * Initiates a task in the remote process group to re-initialize, as a
+     * result of clustering changes
+     *
+     * @param isClustered whether or not this instance is now clustered
+     */
+    void reinitialize(boolean isClustered);
+
+    /**
+     * Removes all non existent ports from this RemoteProcessGroup.
+     */
+    void removeAllNonExistentPorts();
+
+    /**
+     * Removes a port that no longer exists on the remote instance from this
+     * RemoteProcessGroup
+     *
+     * @param port
+     */
+    void removeNonExistentPort(final RemoteGroupPort port);
+
+    /**
+     *
+     * @return @throws IOException
+     */
+    CommunicationsSession establishSiteToSiteConnection() throws IOException;
+
+    /**
+     * Called whenever RemoteProcessGroup is removed from the flow, so that any
+     * resources can be cleaned up appropriately.
+     */
+    void onRemove();
+
+    void verifyCanDelete();
+
+    void verifyCanDelete(boolean ignoreConnections);
+
+    void verifyCanStartTransmitting();
+
+    void verifyCanStopTransmitting();
+
+    void verifyCanUpdate();
+
+    /**
+     * Returns a set of PeerStatus objects that describe the different peers
+     * that we can communicate with for this RemoteProcessGroup.
+     *
+     * If the destination is a cluster, this set will contain PeerStatuses for
+     * each of the nodes in the cluster.
+     *
+     * If the destination is a standalone instance, this set will contain just a
+     * PeerStatus for the destination.
+     *
+     * Once the PeerStatuses have been obtained, they may be cached by this
+     * RemoteProcessGroup for some amount of time.
+     *
+     * If unable to obtain the PeerStatuses or no peer status has yet been
+     * obtained, will return null.
+     *
+     * @return
+     */
+    Set<PeerStatus> getPeerStatuses();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
new file mode 100644
index 0000000..fb4f6e0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+public interface RemoteProcessGroupPortDescriptor {
+
+    /**
+     * The comments as configured in the target port.
+     *
+     * @return
+     */
+    String getComments();
+
+    /**
+     * The number tasks that may transmit flow files to the target port
+     * concurrently.
+     *
+     * @return
+     */
+    Integer getConcurrentlySchedulableTaskCount();
+
+    /**
+     * The id of the target port.
+     *
+     * @return
+     */
+    String getId();
+
+    /**
+     * The id of the remote process group that this port resides in.
+     *
+     * @return
+     */
+    String getGroupId();
+
+    /**
+     * The name of the target port.
+     *
+     * @return
+     */
+    String getName();
+
+    /**
+     * Whether or not this remote group port is configured for transmission.
+     *
+     * @return
+     */
+    Boolean isTransmitting();
+
+    /**
+     * Whether or not flow file are compressed when sent to this target port.
+     *
+     * @return
+     */
+    Boolean getUseCompression();
+
+    /**
+     * Whether ot not the target port exists.
+     *
+     * @return
+     */
+    Boolean getExists();
+
+    /**
+     * Whether or not the target port is running.
+     *
+     * @return
+     */
+    Boolean isTargetRunning();
+
+    /**
+     * Whether or not this port has either an incoming or outgoing connection.
+     *
+     * @return
+     */
+    Boolean isConnected();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
new file mode 100644
index 0000000..27cc6c5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Date;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+public class LogMessage {
+
+    private final String message;
+    private final LogLevel level;
+    private final Throwable throwable;
+    private final long time;
+
+    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+    public static final String TO_STRING_FORMAT = "%1$s %2$s - %3$s";
+
+    public LogMessage(final long millisSinceEpoch, final LogLevel level, final String message, final Throwable throwable) {
+        this.level = level;
+        this.throwable = throwable;
+        this.message = message;
+        this.time = millisSinceEpoch;
+    }
+
+    public long getMillisSinceEpoch() {
+        return time;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public LogLevel getLevel() {
+        return level;
+    }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
+    @Override
+    public String toString() {
+        final DateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
+        final String formattedTime = dateFormat.format(new Date(time));
+
+        String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, level.toString(), message);
+        if (throwable != null) {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw);
+            throwable.printStackTrace(pw);
+            formattedMsg += "\n" + sw.toString();
+        }
+
+        return formattedMsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
new file mode 100644
index 0000000..a75f8ea
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+public interface LogObserver {
+
+    void onLogMessage(LogMessage message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
new file mode 100644
index 0000000..4a017ce
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+public interface LogRepository {
+
+    void addLogMessage(LogLevel level, String message);
+
+    void addLogMessage(LogLevel level, String message, Throwable t);
+
+    void addLogMessage(LogLevel level, String messageFormat, Object[] params);
+
+    void addLogMessage(LogLevel level, String messageFormat, Object[] params, Throwable t);
+
+    /**
+     * Registers an observer so that it will be notified of all Log Messages
+     * whose levels are at least equal to the given level.
+     *
+     * @param observerIdentifier
+     * @param level
+     * @param observer
+     */
+    void addObserver(String observerIdentifier, LogLevel level, LogObserver observer);
+
+    /**
+     * Sets the observation level of the specified observer.
+     *
+     * @param observerIdentifier
+     * @param level
+     */
+    void setObservationLevel(String observerIdentifier, LogLevel level);
+
+    /**
+     * Gets the observation level for the specified observer.
+     *
+     * @param observerIdentifier
+     * @return
+     */
+    LogLevel getObservationLevel(String observerIdentifier);
+
+    /**
+     * Removes the given LogObserver from this Repository.
+     *
+     * @param observerIdentifier
+     * @return 
+     */
+    LogObserver removeObserver(String observerIdentifier);
+
+    /**
+     * Removes all LogObservers from this Repository
+     */
+    void removeAllObservers();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
new file mode 100644
index 0000000..76ca661
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unchecked")
+public class LogRepositoryFactory {
+
+    public static final String LOG_REPOSITORY_CLASS_NAME = "org.apache.nifi.logging.repository.StandardLogRepository";
+
+    private static final ConcurrentMap<String, LogRepository> repositoryMap = new ConcurrentHashMap<>();
+    private static final Class<LogRepository> logRepositoryClass;
+
+    static {
+        Class<LogRepository> clazz = null;
+        try {
+            clazz = (Class<LogRepository>) Class.forName(LOG_REPOSITORY_CLASS_NAME, true, LogRepositoryFactory.class.getClassLoader());
+        } catch (ClassNotFoundException e) {
+            LoggerFactory.getLogger(LogRepositoryFactory.class).error("Unable to find class {}; logging may not work properly", LOG_REPOSITORY_CLASS_NAME);
+        }
+        logRepositoryClass = clazz;
+    }
+
+    public static LogRepository getRepository(final String processorId) {
+        LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+        if (repository == null) {
+            try {
+                repository = logRepositoryClass.newInstance();
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+            if (oldRepository != null) {
+                repository = oldRepository;
+            }
+        }
+
+        return repository;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
new file mode 100644
index 0000000..b25c90b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.Closeable;
+
+/**
+ *
+ */
+public class NarCloseable implements Closeable {
+
+    public static NarCloseable withNarLoader() {
+        final ClassLoader current = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        return new NarCloseable(current);
+    }
+
+    private final ClassLoader toSet;
+
+    private NarCloseable(final ClassLoader toSet) {
+        this.toSet = toSet;
+    }
+
+    @Override
+    public void close() {
+        if (toSet != null) {
+            Thread.currentThread().setContextClassLoader(toSet);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
new file mode 100644
index 0000000..aa905a8
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.ReportingTask;
+
+/**
+ *
+ * @author none THREAD SAFE
+ */
+public class NarThreadContextClassLoader extends URLClassLoader {
+
+    static final ContextSecurityManager contextSecurityManager = new ContextSecurityManager();
+    private final ClassLoader forward = ClassLoader.getSystemClassLoader();
+    private static final List<Class<?>> narSpecificClasses = new ArrayList<>();
+
+    static {
+        narSpecificClasses.add(Processor.class);
+        narSpecificClasses.add(FlowFilePrioritizer.class);
+        narSpecificClasses.add(ReportingTask.class);
+        narSpecificClasses.add(Validator.class);
+        narSpecificClasses.add(InputStreamCallback.class);
+        narSpecificClasses.add(OutputStreamCallback.class);
+        narSpecificClasses.add(StreamCallback.class);
+        narSpecificClasses.add(ControllerService.class);
+        narSpecificClasses.add(AuthorityProvider.class);
+        narSpecificClasses.add(ProvenanceEventRepository.class);
+        narSpecificClasses.add(ComponentStatusRepository.class);
+        narSpecificClasses.add(FlowFileRepository.class);
+        narSpecificClasses.add(FlowFileSwapManager.class);
+        narSpecificClasses.add(ContentRepository.class);
+    }
+
+    private NarThreadContextClassLoader() {
+        super(new URL[0]);
+    }
+
+    @Override
+    public void clearAssertionStatus() {
+        lookupClassLoader().clearAssertionStatus();
+    }
+
+    @Override
+    public URL getResource(String name) {
+        return lookupClassLoader().getResource(name);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        return lookupClassLoader().getResourceAsStream(name);
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        return lookupClassLoader().getResources(name);
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        return lookupClassLoader().loadClass(name);
+    }
+
+    @Override
+    public void setClassAssertionStatus(String className, boolean enabled) {
+        lookupClassLoader().setClassAssertionStatus(className, enabled);
+    }
+
+    @Override
+    public void setDefaultAssertionStatus(boolean enabled) {
+        lookupClassLoader().setDefaultAssertionStatus(enabled);
+    }
+
+    @Override
+    public void setPackageAssertionStatus(String packageName, boolean enabled) {
+        lookupClassLoader().setPackageAssertionStatus(packageName, enabled);
+    }
+
+    private ClassLoader lookupClassLoader() {
+        final Class<?>[] classStack = contextSecurityManager.getExecutionStack();
+
+        for (Class<?> currentClass : classStack) {
+            final Class<?> narClass = findNarClass(currentClass);
+            if (narClass != null) {
+                final ClassLoader desiredClassLoader = narClass.getClassLoader();
+
+                // When new Threads are created, the new Thread inherits the ClassLoaderContext of
+                // the caller. However, the call stack of that new Thread may not trace back to any NiFi-specific
+                // code. Therefore, the NarThreadContextClassLoader will be unable to find the appropriate NAR
+                // ClassLoader. As a result, we want to set the ContextClassLoader to the NAR ClassLoader that
+                // contains the class or resource that we are looking for.
+                // This locks the current Thread into the appropriate NAR ClassLoader Context. The framework will change
+                // the ContextClassLoader back to the NarThreadContextClassLoader as appropriate via the
+                // {@link FlowEngine.beforeExecute(Thread, Runnable)} and 
+                // {@link FlowEngine.afterExecute(Thread, Runnable)} methods.
+                if (desiredClassLoader instanceof NarClassLoader) {
+                    Thread.currentThread().setContextClassLoader(desiredClassLoader);
+                }
+                return desiredClassLoader;
+            }
+        }
+        return forward;
+    }
+
+    private Class<?> findNarClass(final Class<?> cls) {
+        for (final Class<?> narClass : narSpecificClasses) {
+            if (narClass.isAssignableFrom(cls)) {
+                return cls;
+            } else if (cls.getEnclosingClass() != null) {
+                return findNarClass(cls.getEnclosingClass());
+            }
+        }
+
+        return null;
+    }
+
+    private static class SingletonHolder {
+
+        public static final NarThreadContextClassLoader instance = new NarThreadContextClassLoader();
+    }
+
+    public static NarThreadContextClassLoader getInstance() {
+        return SingletonHolder.instance;
+    }
+
+    static class ContextSecurityManager extends SecurityManager {
+
+        Class<?>[] getExecutionStack() {
+            return getClassContext();
+        }
+    }
+
+    public static <T> T createInstance(final String implementationClassName, final Class<T> typeDefinition) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        try {
+            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName);
+            final Class<?> rawClass;
+            if (detectedClassLoaderForType == null) {
+                // try to find from the current class loader
+                rawClass = Class.forName(implementationClassName);
+            } else {
+                // try to find from the registered classloader for that type
+                rawClass = Class.forName(implementationClassName, true, ExtensionManager.getClassLoader(implementationClassName));
+            }
+
+            Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+            final Class<?> desiredClass = rawClass.asSubclass(typeDefinition);
+            return typeDefinition.cast(desiredClass.newInstance());
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..2422fe1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+public class Peer {
+
+    private final CommunicationsSession commsSession;
+    private final String url;
+    private final String host;
+    private long penalizationExpiration = 0L;
+    private boolean closed = false;
+
+    public Peer(final CommunicationsSession commsSession, final String url) {
+        this.commsSession = commsSession;
+        this.url = url;
+
+        try {
+            this.host = new URI(url).getHost();
+        } catch (final Exception e) {
+            throw new IllegalArgumentException("Invalid URL: " + url);
+        }
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public CommunicationsSession getCommunicationsSession() {
+        return commsSession;
+    }
+
+    public void close() throws IOException {
+        this.closed = true;
+
+        // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        commsSession.close();
+    }
+
+    public void penalize(final long millis) {
+        penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+    }
+
+    public boolean isPenalized() {
+        return penalizationExpiration > System.currentTimeMillis();
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    @Override
+    public int hashCode() {
+        return 8320 + url.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Peer)) {
+            return false;
+        }
+
+        final Peer other = (Peer) obj;
+        return this.url.equals(other.url);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Peer[url=").append(url);
+        if (closed) {
+            sb.append(",CLOSED");
+        } else if (isPenalized()) {
+            sb.append(",PENALIZED");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..d1cb076
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public class PeerStatus {
+
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+    private final int numFlowFiles;
+
+    public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+        this.numFlowFiles = numFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    public int getFlowFileCount() {
+        return numFlowFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 9824372 + hostname.hashCode() + port;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof PeerStatus)) {
+            return false;
+        }
+
+        final PeerStatus other = (PeerStatus) obj;
+        return port == other.port && hostname.equals(other.hostname);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
new file mode 100644
index 0000000..8f2603a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public interface PortAuthorizationResult {
+
+    boolean isAuthorized();
+
+    String getExplanation();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
new file mode 100644
index 0000000..12a3d33
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ *
+ */
+public enum RemoteAuthorizationState {
+
+    UNKNOWN,
+    UNAUTHORIZED,
+    AUTHORIZED;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
new file mode 100644
index 0000000..d4ad374
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.groups.RemoteProcessGroup;
+
+public interface RemoteGroupPort extends Port {
+
+    RemoteProcessGroup getRemoteProcessGroup();
+
+    TransferDirection getTransferDirection();
+
+    boolean isUseCompression();
+
+    void setUseCompression(boolean useCompression);
+
+    boolean getTargetExists();
+
+    boolean isTargetRunning();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
new file mode 100644
index 0000000..4afdfb7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.protocol.ServerProtocol;
+
+public interface RootGroupPort extends Port {
+
+    boolean isTransmitting();
+
+    void setGroupAccessControl(Set<String> groups);
+
+    Set<String> getGroupAccessControl();
+
+    void setUserAccessControl(Set<String> users);
+
+    Set<String> getUserAccessControl();
+
+    /**
+     * Verifies that the specified user is authorized to interact with this port
+     * and returns a {@link PortAuthorizationResult} indicating why the user is
+     * unauthorized if this assumption fails
+     *
+     * @param dn
+     * @return
+     */
+    PortAuthorizationResult checkUserAuthorization(String dn);
+
+    /**
+     * Receives data from the given stream
+     *
+     * @param peer
+     * @param serverProtocol
+     * @param requestHeaders
+     *
+     * @return the number of FlowFiles received
+     * @throws org.apache.nifi.remote.exception.NotAuthorizedException
+     * @throws org.apache.nifi.remote.exception.BadRequestException
+     * @throws org.apache.nifi.remote.exception.RequestExpiredException
+     */
+    int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
+
+    /**
+     * Transfers data to the given stream
+     *
+     * @param peer
+     * @param requestHeaders
+     * @param serverProtocol
+     *
+     * @return the number of FlowFiles transferred
+     * @throws org.apache.nifi.remote.exception.NotAuthorizedException
+     * @throws org.apache.nifi.remote.exception.BadRequestException
+     * @throws org.apache.nifi.remote.exception.RequestExpiredException
+     */
+    int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
+
+}