You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:30 UTC
[07/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
new file mode 100644
index 0000000..e434905
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+/**
+ * A report made up of {@link FlowFileEvent} entries, each of which can be
+ * looked up by a String identifier.
+ */
+public interface RepositoryStatusReport {
+
+ /**
+ * Adds the given entry to this report.
+ *
+ * @param entry the event to add
+ */
+ void addReportEntry(FlowFileEvent entry);
+
+ /**
+ * @return the entries of this report as a Map with String keys.
+ * NOTE(review): the keys are presumably the same component identifiers
+ * accepted by {@link #getReportEntry(String)} -- confirm against the
+ * implementation
+ */
+ Map<String, FlowFileEvent> getReportEntries();
+
+ /**
+ * @param componentId the identifier of the component whose entry is wanted
+ * @return the entry for the given component identifier. NOTE(review): what
+ * is returned when no entry exists for the identifier is not specified
+ * here -- confirm against the implementation
+ */
+ FlowFileEvent getReportEntry(String componentId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
new file mode 100644
index 0000000..6f9c237
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.controller.Availability;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * A {@link ConfiguredComponent} that exposes a {@link ControllerService}
+ * together with its {@link Availability}, its disabled flag, and the
+ * components that reference it.
+ */
+public interface ControllerServiceNode extends ConfiguredComponent {
+
+ /**
+ * @return the Controller Service that this node represents
+ */
+ ControllerService getControllerService();
+
+ /**
+ * @return the availability currently set on this node
+ */
+ Availability getAvailability();
+
+ /**
+ * Updates the availability of this node.
+ *
+ * @param availability the new availability
+ */
+ void setAvailability(Availability availability);
+
+ /**
+ * @return <code>true</code> if this node is disabled, <code>false</code>
+ * otherwise
+ */
+ boolean isDisabled();
+
+ /**
+ * Updates the disabled flag of this node.
+ *
+ * @param disabled <code>true</code> to mark this node as disabled,
+ * <code>false</code> to clear the flag
+ */
+ void setDisabled(boolean disabled);
+
+ /**
+ * @return the {@link ControllerServiceReference} describing the components
+ * that reference this Controller Service
+ */
+ ControllerServiceReference getReferences();
+
+ /**
+ * Records that the given component references this Controller Service.
+ *
+ * @param referringComponent the component that references this service
+ */
+ void addReference(ConfiguredComponent referringComponent);
+
+ /**
+ * Records that the given component no longer references this Controller
+ * Service. NOTE(review): behavior when the component was never added is not
+ * specified here -- confirm against the implementation.
+ *
+ * @param referringComponent the component that no longer references this
+ * service
+ */
+ void removeReference(ConfiguredComponent referringComponent);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
new file mode 100644
index 0000000..35a255d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerServiceLookup;
+
+/**
+ * A {@link ControllerServiceLookup} that can additionally create controller
+ * services and expose them as {@link ControllerServiceNode}s.
+ */
+public interface ControllerServiceProvider extends ControllerServiceLookup {
+
+ /**
+ * Creates a controller service of the given type, with the given
+ * identifier and properties, and returns the node that represents it.
+ * NOTE(review): behavior for an unknown type or an identifier that is
+ * already in use is not specified here -- confirm against the
+ * implementation.
+ *
+ * @param type the type of controller service to create; presumably a class
+ * name -- confirm
+ * @param id the identifier to give the new controller service
+ * @param properties the property names and values to configure the new
+ * controller service with
+ * @return the node for the newly created controller service
+ */
+ ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties);
+
+ /**
+ * Gets the controller service node for the specified identifier. Returns
+ * <code>null</code> if the identifier does not match a known service
+ *
+ * @param id the identifier of the controller service
+ * @return the node with the given identifier, or <code>null</code> if the
+ * identifier does not match a known service
+ */
+ ControllerServiceNode getControllerServiceNode(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
new file mode 100644
index 0000000..5cb676f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import java.util.Set;
+
+import org.apache.nifi.controller.ConfiguredComponent;
+
+/**
+ * Provides a collection of components that are referencing a Controller Service
+ */
+public interface ControllerServiceReference {
+
+ /**
+ * Returns the component that is being referenced
+ *
+ * @return the Controller Service node that the referencing components
+ * point to
+ */
+ ControllerServiceNode getReferencedComponent();
+
+ /**
+ * Returns a {@link Set} of all components that are referencing this
+ * Controller Service
+ *
+ * @return the components that reference the Controller Service
+ */
+ Set<ConfiguredComponent> getReferencingComponents();
+
+ /**
+ * Returns a {@link Set} of all Processors and Reporting Tasks that are
+ * referencing the Controller Service and are running. NOTE(review):
+ * whether other kinds of referencing components are also included is not
+ * stated here -- confirm against the implementation.
+ *
+ * @return the referencing components that are currently running
+ */
+ Set<ConfiguredComponent> getRunningReferences();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
new file mode 100644
index 0000000..d1d5e5b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * Static factory for {@link Bulletin} instances. Every bulletin created here
+ * takes its id from a single counter shared by all of the factory methods;
+ * the counter starts at 0 and is incremented atomically.
+ */
+public final class BulletinFactory {
+
+ private static final AtomicLong currentId = new AtomicLong(0);
+
+ /**
+ * Not instantiable: this class only holds static factory methods.
+ */
+ private BulletinFactory() {
+ }
+
+ /**
+ * Creates a bulletin whose group id, source id and source name are taken
+ * from the given Connectable and its Process Group.
+ *
+ * @param connectable the component that the bulletin is about
+ * @param category the category to set on the bulletin
+ * @param severity the value to set as the bulletin's level
+ * @param message the message to set on the bulletin
+ * @return the newly created bulletin
+ * @throws NullPointerException if <code>connectable</code> is null or its
+ * Process Group is null
+ */
+ public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
+ return createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+ }
+
+ /**
+ * Creates a {@link ComponentBulletin} with the next available id and
+ * populates it with the given values.
+ *
+ * @param groupId the group id to set on the bulletin
+ * @param sourceId the source id to set on the bulletin
+ * @param sourceName the source name to set on the bulletin
+ * @param category the category to set on the bulletin
+ * @param severity the value to set as the bulletin's level
+ * @param message the message to set on the bulletin
+ * @return the newly created bulletin
+ */
+ public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+ final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
+ bulletin.setGroupId(groupId);
+ bulletin.setSourceId(sourceId);
+ bulletin.setSourceName(sourceName);
+ bulletin.setCategory(category);
+ bulletin.setLevel(severity);
+ bulletin.setMessage(message);
+ return bulletin;
+ }
+
+ /**
+ * Creates a {@link SystemBulletin} with the next available id. No group
+ * or source information is set on it by this method.
+ *
+ * @param category the category to set on the bulletin
+ * @param severity the value to set as the bulletin's level
+ * @param message the message to set on the bulletin
+ * @return the newly created bulletin
+ */
+ public static Bulletin createBulletin(final String category, final String severity, final String message) {
+ final Bulletin bulletin = new SystemBulletin(currentId.getAndIncrement());
+ bulletin.setCategory(category);
+ bulletin.setLevel(severity);
+ bulletin.setMessage(message);
+ return bulletin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
new file mode 100644
index 0000000..9846cf2
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * Strategy that is handed {@link Bulletin}s one at a time through
+ * {@link #update(Bulletin)}. What is done with each bulletin is left to the
+ * implementation.
+ */
+public interface BulletinProcessingStrategy {
+
+ /**
+ * Passes the given bulletin to this strategy.
+ *
+ * @param bulletin the bulletin to process
+ */
+ void update(Bulletin bulletin);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
new file mode 100644
index 0000000..23c4cdb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * {@link Bulletin} subtype that adds no state or behavior of its own; it
+ * exists as a distinct type. Within this package, BulletinFactory creates it
+ * for bulletins that carry group and source information.
+ */
+public class ComponentBulletin extends Bulletin {
+
+ /**
+ * Package-private, so instances can only be created from within this
+ * package.
+ *
+ * @param bulletinId the id passed through to {@link Bulletin}
+ */
+ ComponentBulletin(final long bulletinId) {
+ super(bulletinId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
new file mode 100644
index 0000000..f97dc46
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * {@link Bulletin} subtype that adds no state or behavior of its own; it
+ * exists as a distinct type. Within this package, BulletinFactory creates it
+ * for bulletins that carry no group or source information.
+ */
+public class SystemBulletin extends Bulletin {
+
+ /**
+ * Package-private, so instances can only be created from within this
+ * package.
+ *
+ * @param bulletinId the id passed through to {@link Bulletin}
+ */
+ SystemBulletin(final long bulletinId) {
+ super(bulletinId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
new file mode 100644
index 0000000..61be59c
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Processor;
+
+/**
+ * <p>
+ * ProcessGroup objects are containers for processing entities, such as
+ * {@link Processor}s, {@link Port}s, and other {@link ProcessGroup}s.
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE</p>
+ */
+public interface ProcessGroup {
+
+ /**
+ * @return a reference to this ProcessGroup's parent. This will be
+ * <tt>null</tt> if and only if this is the root group.
+ */
+ ProcessGroup getParent();
+
+ /**
+ * Updates the ProcessGroup to point to a new parent
+ *
+ * @param group
+ */
+ void setParent(ProcessGroup group);
+
+ /**
+ * @return the ID of the ProcessGroup
+ */
+ String getIdentifier();
+
+ /**
+ * @return the name of the ProcessGroup
+ */
+ String getName();
+
+ /**
+ * Updates the name of this ProcessGroup.
+ *
+ * @param name
+ */
+ void setName(String name);
+
+ /**
+ * Updates the position of where this ProcessGroup is located in the graph
+ */
+ void setPosition(Position position);
+
+ /**
+ * Returns the position of where this ProcessGroup is located in the graph
+ *
+ * @return
+ */
+ Position getPosition();
+
+ /**
+ * @return the user-set comments about this ProcessGroup, or
+ * <code>null</code> if no comments have been set
+ */
+ String getComments();
+
+ /**
+ * Updates the comments for this ProcessGroup
+ *
+ * @param comments
+ */
+ void setComments(String comments);
+
+ /**
+ * Returns the counts for this ProcessGroup
+ *
+ * @return
+ */
+ ProcessGroupCounts getCounts();
+
+ /**
+ * Starts all Processors, Local Ports, and Funnels that are directly within
+ * this group and any child ProcessGroups, except for those that are
+ * disabled.
+ */
+ void startProcessing();
+
+ /**
+ * Stops all Processors, Local Ports, and Funnels that are directly within
+ * this group and child ProcessGroups, except for those that are disabled.
+ */
+ void stopProcessing();
+
+ /**
+ * Enables the given Processor
+ *
+ * @param processor the processor to enable
+ * @throws IllegalStateException if the processor is not valid, or is
+ * already running
+ */
+ void enableProcessor(ProcessorNode processor);
+
+ /**
+ * Enables the given Input Port
+ *
+ * @param port
+ */
+ void enableInputPort(Port port);
+
+ /**
+ * Enables the given Output Port
+ *
+ * @param port
+ */
+ void enableOutputPort(Port port);
+
+ /**
+ * Enables the given Funnel
+ *
+ * @param funnel
+ */
+ void enableFunnel(Funnel funnel);
+
+ /**
+ * Starts the given Processor
+ *
+ * @param processor the processor to start
+ * @throws IllegalStateException if the processor is not valid, or is
+ * already running
+ */
+ void startProcessor(ProcessorNode processor);
+
+ /**
+ * Starts the given Input Port
+ *
+ * @param port
+ */
+ void startInputPort(Port port);
+
+ /**
+ * Starts the given Output Port
+ *
+ * @param port
+ */
+ void startOutputPort(Port port);
+
+ /**
+ * Starts the given Funnel
+ *
+ * @param funnel
+ */
+ void startFunnel(Funnel funnel);
+
+ /**
+ * Stops the given Processor
+ *
+ * @param processor
+ */
+ void stopProcessor(ProcessorNode processor);
+
+ /**
+ * Stops the given Input Port
+ *
+ * @param port
+ */
+ void stopInputPort(Port port);
+
+ /**
+ * Stops the given Output Port
+ *
+ * @param port
+ */
+ void stopOutputPort(Port port);
+
+ /**
+ * Stops the given Funnel
+ *
+ * @param funnel
+ */
+ void stopFunnel(Funnel funnel);
+
+ /**
+ * Disables the given Processor
+ *
+ * @param processor the processor to disable
+ * @throws IllegalStateException if the processor is not valid, or is
+ * already running
+ */
+ void disableProcessor(ProcessorNode processor);
+
+ /**
+ * Disables the given Input Port
+ *
+ * @param port
+ */
+ void disableInputPort(Port port);
+
+ /**
+ * Disables the given Output Port
+ *
+ * @param port
+ */
+ void disableOutputPort(Port port);
+
+ /**
+ * Disables the given Funnel
+ *
+ * @param funnel
+ */
+ void disableFunnel(Funnel funnel);
+
+ /**
+ * Indicates that the Flow is being shutdown; allows cleanup of resources
+ * associated with processors, etc.
+ */
+ void shutdown();
+
+ /**
+ * Returns a boolean indicating whether or not this ProcessGroup is the root
+ * group
+ *
+ * @return
+ */
+ boolean isRootGroup();
+
+ /**
+ * Adds a {@link Port} to be used for transferring {@link FlowFile}s from
+ * external sources to {@link Processor}s and other {@link Port}s within
+ * this ProcessGroup.
+ *
+ * @param port
+ */
+ void addInputPort(Port port);
+
+ /**
+ * Removes a {@link Port} from this ProcessGroup's list of Input Ports.
+ *
+ * @param port the Port to remove
+ * @throws NullPointerException if <code>port</code> is null
+ * @throws IllegalStateException if port is not an Input Port for this
+ * ProcessGroup
+ */
+ void removeInputPort(Port port);
+
+ /**
+ * @return the {@link Set} of all {@link Port}s that are used by this
+ * ProcessGroup as Input Ports.
+ */
+ Set<Port> getInputPorts();
+
+ /**
+ * @param id the ID of the input port
+ * @return the input port with the given ID, or <code>null</code> if it does
+ * not exist.
+ */
+ Port getInputPort(String id);
+
+ /**
+ * Adds a {@link Port} to be used for transferring {@link FlowFile}s to
+ * external sources.
+ *
+ * @param port the Port to add
+ */
+ void addOutputPort(Port port);
+
+ /**
+ * Removes a {@link Port} from this ProcessGroup's list of Output Ports.
+ *
+ * @param port the Port to remove
+ * @throws NullPointerException if <code>port</code> is null
+ * @throws IllegalStateException if port is not an Output Port for this
+ * ProcessGroup
+ */
+ void removeOutputPort(Port port);
+
+ /**
+ * @param id the ID of the output port
+ * @return the output port with the given ID, or <code>null</code> if it
+ * does not exist.
+ */
+ Port getOutputPort(String id);
+
+ /**
+ * @return the {@link Set} of all {@link Port}s that are used by this
+ * ProcessGroup as Output Ports.
+ */
+ Set<Port> getOutputPorts();
+
+ /**
+ * Adds a reference to a ProcessGroup as a child of this.
+ *
+ * @param group the ProcessGroup to add as a child
+ */
+ void addProcessGroup(ProcessGroup group);
+
+ /**
+ * Returns the ProcessGroup whose parent is <code>this</code> and whose id
+ * is given
+ *
+ * @param id
+ * @return
+ */
+ ProcessGroup getProcessGroup(String id);
+
+ /**
+ * @return a {@link Set} of all Process Group References that are contained
+ * within this.
+ */
+ Set<ProcessGroup> getProcessGroups();
+
+ /**
+ * @param group the group to remove
+ * @throws NullPointerException if <code>group</code> is null
+ * @throws IllegalStateException if group is not member of this
+ * ProcessGroup, or the given ProcessGroup is not empty (i.e., it contains
+ * at least one Processor, ProcessGroup, Input Port, Output Port, or Label).
+ */
+ void removeProcessGroup(ProcessGroup group);
+
+ /**
+ * Adds the already constructed processor instance to this group
+ *
+ * @param processor the processor to add
+ */
+ void addProcessor(ProcessorNode processor);
+
+ /**
+ * Removes the given processor from this group, destroying the Processor.
+ * The Processor is removed from the ProcessorRegistry, and any method in
+ * the Processor that is annotated with the
+ * {@link org.apache.nifi.processor.annotation.OnRemoved OnRemoved} annotation will be
+ * invoked. All outgoing connections will also be destroyed
+ *
+ * @param processor the Processor to remove
+ * @throws NullPointerException if <code>processor</code> is null
+ * @throws IllegalStateException if <code>processor</code> is not a member
+ * of this ProcessGroup, is currently running, or has any incoming
+ * connections.
+ */
+ void removeProcessor(ProcessorNode processor);
+
+ /**
+ * @return a {@link Set} of all FlowFileProcessors that are contained
+ * within this.
+ */
+ Set<ProcessorNode> getProcessors();
+
+ /**
+ * Returns the FlowFileProcessor with the given ID.
+ *
+ * @param id the ID of the processor to retrieve
+ * @return the processor with the given ID
+ * @throws NullPointerException if <code>id</code> is null.
+ */
+ ProcessorNode getProcessor(String id);
+
+ /**
+ * Returns the <code>Connectable</code> with the given ID, or
+ * <code>null</code> if the <code>Connectable</code> is not a member of the
+ * group
+ *
+ * @param id the ID of the Connectable
+ * @return
+ */
+ Connectable getConnectable(String id);
+
+ /**
+ * Adds the given connection to this ProcessGroup. This method also notifies
+ * the Source and Destination of the Connection that the Connection has been
+ * established.
+ *
+ * @param connection
+ * @throws NullPointerException if the connection is null
+ * @throws IllegalStateException if the source or destination of the
+ * connection is not a member of this ProcessGroup or if a connection
+ * already exists in this ProcessGroup with the same ID
+ */
+ void addConnection(Connection connection);
+
+ /**
+ * Removes the connection from this ProcessGroup.
+ *
+ * @param connection
+ * @throws IllegalStateException if <code>connection</code> is not contained
+ * within this.
+ */
+ void removeConnection(Connection connection);
+
+ /**
+ * Inherits a Connection from another ProcessGroup; this does not perform
+ * any validation but simply notifies the ProcessGroup that it is now the
+ * owner of the given Connection. This is used in place of the
+ * {@link #addConnection(Connection)} method when moving Connections from
+ * one group to another because addConnection notifies both the Source and
+ * Destination of the Connection that the Connection has been established;
+ * this method does not notify either, as both the Source and Destination
+ * should already be aware of the Connection.
+ *
+ * @param connection
+ */
+ void inheritConnection(Connection connection);
+
+ /**
+ * @return the Connection with the given ID, or <code>null</code> if the
+ * connection does not exist.
+ */
+ Connection getConnection(String id);
+
+ /**
+ * Returns the {@link Set} of all {@link Connection}s contained within this.
+ *
+ * @return
+ */
+ Set<Connection> getConnections();
+
+ /**
+ * Returns a List of all Connections contained within this ProcessGroup and
+ * any child ProcessGroups.
+ *
+ * @return
+ */
+ List<Connection> findAllConnections();
+
+ /**
+ * Adds the given RemoteProcessGroup to this ProcessGroup
+ *
+ * @param remoteGroup
+ *
+ * @throws NullPointerException if the given argument is null
+ */
+ void addRemoteProcessGroup(RemoteProcessGroup remoteGroup);
+
+ /**
+ * Removes the given RemoteProcessGroup from this ProcessGroup
+ *
+ * @param remoteGroup
+ * @throws NullPointerException if the argument is null
+ * @throws IllegalStateException if the given argument does not belong to
+ * this ProcessGroup
+ */
+ void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup);
+
+ /**
+ * Returns the RemoteProcessGroup that is the child of this ProcessGroup and
+ * has the given ID. If no RemoteProcessGroup can be found with the given
+ * ID, returns <code>null</code>.
+ *
+ * @param id
+ * @return
+ */
+ RemoteProcessGroup getRemoteProcessGroup(String id);
+
+ /**
+ * Returns a set of all RemoteProcessGroups that belong to this
+ * ProcessGroup. If no RemoteProcessGroup's have been added to this
+ * ProcessGroup, will return an empty Set.
+ *
+ * @return
+ */
+ Set<RemoteProcessGroup> getRemoteProcessGroups();
+
+ /**
+ * Adds the given Label to this ProcessGroup
+ *
+ * @param label the label to add
+ * @return
+ *
+ * @throws NullPointerException if the argument is null
+ */
+ void addLabel(Label label);
+
+ /**
+ * Removes the given Label from this ProcessGroup
+ *
+ * @param label the label to remove
+ * @throws NullPointerException if the argument is null
+ * @throws IllegalStateException if the given argument does not belong to
+ * this ProcessGroup
+ */
+ void removeLabel(Label label);
+
+ /**
+ * Returns a set of all Labels that belong to this ProcessGroup. If no
+ * Labels belong to this ProcessGroup, returns an empty Set.
+ *
+ * @return
+ */
+ Set<Label> getLabels();
+
+ /**
+ * Returns the Label that belongs to this ProcessGroup and has the given id.
+ * If no Label can be found with this ID, returns <code>null</code>.
+ *
+ * @param id
+ * @return
+ */
+ Label getLabel(String id);
+
+ /**
+ * Returns the Process Group with the given ID, if it exists as a child of
+ * this ProcessGroup, or is this ProcessGroup. This performs a recursive
+ * search of all ProcessGroups and descendant ProcessGroups
+ *
+ * @param id
+ * @return
+ */
+ ProcessGroup findProcessGroup(String id);
+
+ /**
+ * Returns the RemoteProcessGroup with the given ID, if it exists as a child
+ * or descendant of this ProcessGroup. This performs a recursive search of
+ * all ProcessGroups and descendant ProcessGroups
+ *
+ * @param id
+ * @return
+ */
+ RemoteProcessGroup findRemoteProcessGroup(String id);
+
+ /**
+ * Returns a List of all Remote Process Groups that are children or
+ * descendants of this ProcessGroup. This performs a recursive search of all
+ * descendant ProcessGroups
+ *
+ * @return
+ */
+ List<RemoteProcessGroup> findAllRemoteProcessGroups();
+
+ /**
+ * Returns the Processor with the given ID, if it exists as a child or
+ * descendant of this ProcessGroup. This performs a recursive search of all
+ * descendant ProcessGroups
+ *
+ * @param id the identifier of the Processor to find
+ * @return the Processor with the given ID, if one is found
+ */
+ ProcessorNode findProcessor(String id);
+
+ /**
+ * Returns a List of all Processors that are children or descendants of this
+ * ProcessGroup. This performs a recursive search of all descendant
+ * ProcessGroups
+ *
+ * @return all Processors that are children or descendants of this ProcessGroup
+ */
+ List<ProcessorNode> findAllProcessors();
+
+ /**
+ * Returns a List of all Labels that are children or descendants of this
+ * ProcessGroup. This performs a recursive search of all descendant
+ * ProcessGroups
+ *
+ * @return all Labels that are children or descendants of this ProcessGroup
+ */
+ List<Label> findAllLabels();
+
+ /**
+ * Returns the input port with the given ID, if it exists; otherwise returns
+ * null. This performs a recursive search of all Input Ports and descendant
+ * ProcessGroups
+ *
+ * @param id the identifier of the input port to find
+ * @return the input port with the given ID, or null if it does not exist
+ */
+ Port findInputPort(String id);
+
+ /**
+ * Returns the input port with the given name, if it exists; otherwise
+ * returns null.
+ *
+ * @param name the name of the input port to look up
+ * @return the input port with the given name, or null if it does not exist
+ */
+ Port getInputPortByName(String name);
+
+ /**
+ * Returns the output port with the given ID, if it exists; otherwise
+ * returns null. This performs a recursive search of all Output Ports and
+ * descendant ProcessGroups
+ *
+ * @param id the identifier of the output port to find
+ * @return the output port with the given ID, or null if it does not exist
+ */
+ Port findOutputPort(String id);
+
+ /**
+ * Returns the output port with the given name, if it exists; otherwise
+ * returns null.
+ *
+ * @param name the name of the output port to look up
+ * @return the output port with the given name, or null if it does not exist
+ */
+ Port getOutputPortByName(String name);
+
+ /**
+ * Adds the given funnel to this ProcessGroup
+ *
+ * @param funnel the funnel to add
+ */
+ void addFunnel(Funnel funnel);
+
+ /**
+ * Returns a Set of all Funnels that belong to this ProcessGroup
+ *
+ * @return all Funnels that belong to this ProcessGroup
+ */
+ Set<Funnel> getFunnels();
+
+ /**
+ * Returns the funnel with the given identifier
+ *
+ * @param id the identifier of the funnel to look up
+ * @return the funnel with the given identifier
+ */
+ Funnel getFunnel(String id);
+
+ /**
+ * Removes the given funnel from this ProcessGroup
+ *
+ * @param funnel the funnel to remove
+ *
+ * @throws IllegalStateException if the funnel is not a member of this
+ * ProcessGroup or has incoming or outgoing connections
+ */
+ void removeFunnel(Funnel funnel);
+
+ /**
+ * @return <code>true</code> if this ProcessGroup has no Processors, Labels,
+ * Connections, ProcessGroups, RemoteProcessGroupReferences, or Ports.
+ * Otherwise, returns <code>false</code>.
+ */
+ boolean isEmpty();
+
+ /**
+ * Removes all of the components whose IDs are specified within the given
+ * {@link Snippet} from this ProcessGroup.
+ *
+ * @param snippet the Snippet that specifies the IDs of the components to remove
+ *
+ * @throws NullPointerException if argument is null
+ * @throws IllegalStateException if any ID in the snippet refers to a
+ * component that is not within this ProcessGroup
+ */
+ void remove(final Snippet snippet);
+
+ /**
+ * Returns the Connectable with the given ID, if it exists; otherwise
+ * returns null. This performs a recursive search of all ProcessGroups'
+ * input ports, output ports, funnels, processors, and remote process groups
+ *
+ * @param identifier the ID of the Connectable to find
+ * @return the Connectable with the given ID, or null if it does not exist
+ */
+ Connectable findConnectable(String identifier);
+
+ /**
+ * Moves all of the components whose IDs are specified within the given
+ * {@link Snippet} from this ProcessGroup into the given destination
+ * ProcessGroup
+ *
+ * @param snippet the Snippet that specifies the IDs of the components to move
+ * @param destination the ProcessGroup into which the components are moved
+ *
+ * @throws NullPointerException if either argument is null
+ * @throws IllegalStateException if any ID in the snippet refers to a
+ * component that is not within this ProcessGroup
+ */
+ void move(final Snippet snippet, final ProcessGroup destination);
+
+ void verifyCanDelete();
+
+ void verifyCanStart();
+
+ void verifyCanStop();
+
+ /**
+ * Ensures that deleting the given snippet is a valid operation at this
+ * point in time, depending on the state of this ProcessGroup
+ *
+ * @param snippet the snippet whose deletion is to be verified
+ *
+ * @throws IllegalStateException if deleting the Snippet is not valid at
+ * this time
+ */
+ void verifyCanDelete(Snippet snippet);
+
+ /**
+ * Ensures that moving the given snippet to the given new group is a valid
+ * operation at this point in time, depending on the state of both
+ * ProcessGroups
+ *
+ * @param snippet the snippet that would be moved
+ * @param newProcessGroup the group to which the snippet would be moved
+ *
+ * @throws IllegalStateException if the move is not valid at this time
+ */
+ void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
new file mode 100644
index 0000000..3eb594b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+/**
+ * Value holder for eight integer counters that are supplied once, at
+ * construction time, and can only be read back afterwards. The class stores
+ * and returns the values unchanged; it performs no validation or arithmetic.
+ */
+public class ProcessGroupCounts {
+
+    private final int inputPortCount;
+    private final int outputPortCount;
+    private final int runningCount;
+    private final int stoppedCount;
+    private final int invalidCount;
+    private final int disabledCount;
+    private final int activeRemotePortCount;
+    private final int inactiveRemotePortCount;
+
+    /**
+     * Creates a holder for the given counters. Each argument is stored as-is
+     * and is returned by the getter of the same name.
+     */
+    public ProcessGroupCounts(
+            final int inputPortCount,
+            final int outputPortCount,
+            final int runningCount,
+            final int stoppedCount,
+            final int invalidCount,
+            final int disabledCount,
+            final int activeRemotePortCount,
+            final int inactiveRemotePortCount) {
+
+        // Port-related counters
+        this.inputPortCount = inputPortCount;
+        this.outputPortCount = outputPortCount;
+        this.activeRemotePortCount = activeRemotePortCount;
+        this.inactiveRemotePortCount = inactiveRemotePortCount;
+
+        // State-related counters
+        this.runningCount = runningCount;
+        this.stoppedCount = stoppedCount;
+        this.invalidCount = invalidCount;
+        this.disabledCount = disabledCount;
+    }
+
+    /** @return the input port count given to the constructor */
+    public int getInputPortCount() {
+        return this.inputPortCount;
+    }
+
+    /** @return the output port count given to the constructor */
+    public int getOutputPortCount() {
+        return this.outputPortCount;
+    }
+
+    /** @return the running count given to the constructor */
+    public int getRunningCount() {
+        return this.runningCount;
+    }
+
+    /** @return the stopped count given to the constructor */
+    public int getStoppedCount() {
+        return this.stoppedCount;
+    }
+
+    /** @return the invalid count given to the constructor */
+    public int getInvalidCount() {
+        return this.invalidCount;
+    }
+
+    /** @return the disabled count given to the constructor */
+    public int getDisabledCount() {
+        return this.disabledCount;
+    }
+
+    /** @return the active remote port count given to the constructor */
+    public int getActiveRemotePortCount() {
+        return this.activeRemotePortCount;
+    }
+
+    /** @return the inactive remote port count given to the constructor */
+    public int getInactiveRemotePortCount() {
+        return this.inactiveRemotePortCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
new file mode 100644
index 0000000..3acd1d3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.exception.CommunicationsException;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+public interface RemoteProcessGroup {
+
+ String getIdentifier();
+
+ URI getTargetUri();
+
+ ProcessGroup getProcessGroup();
+
+ void setProcessGroup(ProcessGroup group);
+
+ void setPosition(Position position);
+
+ Position getPosition();
+
+ String getComments();
+
+ void setComments(String comments);
+
+ /**
+ * Returns the name of this RemoteProcessGroup. The value returned will
+ * never be null. If unable to communicate with the remote instance, the URI
+ * of that instance may be returned instead
+ *
+ * @return the name of this RemoteProcessGroup; never null
+ */
+ String getName();
+
+ void setName(String name);
+
+ void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+
+ void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
+
+ Set<RemoteGroupPort> getInputPorts();
+
+ Set<RemoteGroupPort> getOutputPorts();
+
+ RemoteGroupPort getInputPort(String id);
+
+ RemoteGroupPort getOutputPort(String id);
+
+ ProcessGroupCounts getCounts();
+
+ void refreshFlowContents() throws CommunicationsException;
+
+ Date getLastRefreshTime();
+
+ void setYieldDuration(final String yieldDuration);
+
+ String getYieldDuration();
+
+ /**
+ * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
+ *
+ * @param timePeriod the timeout, in the TimePeriod format
+ * @throws IllegalArgumentException NOTE(review): presumably if the value is not a valid TimePeriod - confirm
+ */
+ void setCommunicationsTimeout(String timePeriod) throws IllegalArgumentException;
+
+ /**
+ * Returns the communications timeout in terms of the given TimeUnit
+ *
+ * @param timeUnit the TimeUnit in which the timeout is to be expressed
+ * @return the communications timeout in terms of the given TimeUnit
+ */
+ int getCommunicationsTimeout(TimeUnit timeUnit);
+
+ /**
+ * Returns the user-configured String representation of the communications
+ * timeout
+ *
+ * @return the user-configured String representation of the communications timeout
+ */
+ String getCommunicationsTimeout();
+
+ /**
+ * @return the port that the remote instance is listening on for
+ * site-to-site communication, or <code>null</code> if the remote instance
+ * is not configured to allow site-to-site communications.
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ Integer getListeningPort() throws IOException;
+
+ /**
+ * Indicates whether or not the RemoteProcessGroup is currently scheduled to
+ * transmit data
+ *
+ * @return whether or not this RemoteProcessGroup is currently scheduled to transmit data
+ */
+ boolean isTransmitting();
+
+ /**
+ * Initiates communications between this instance and the remote instance.
+ */
+ void startTransmitting();
+
+ /**
+ * Immediately terminates communications between this instance and the
+ * remote instance.
+ */
+ void stopTransmitting();
+
+ /**
+ * Initiates communications between this instance and the remote instance
+ * only for the port specified.
+ *
+ * @param port the port for which communications are to be initiated
+ */
+ void startTransmitting(RemoteGroupPort port);
+
+ /**
+ * Immediately terminates communications between this instance and the
+ * remote instance only for the port specified.
+ *
+ * @param port the port for which communications are to be terminated
+ */
+ void stopTransmitting(RemoteGroupPort port);
+
+ /**
+ * Indicates whether or not communications with this RemoteProcessGroup will
+ * be secure (2-way authentication)
+ *
+ * @return whether or not communications with this RemoteProcessGroup will be secure
+ * @throws CommunicationsException NOTE(review): triggering condition is not documented here - confirm with implementations
+ */
+ boolean isSecure() throws CommunicationsException;
+
+ /**
+ * Indicates whether or not communications with this RemoteProcessGroup will
+ * be secure (2-way authentication). Returns null if unknown.
+ *
+ * @return whether or not communications will be secure, or null if unknown
+ */
+ Boolean getSecureFlag();
+
+ /**
+ * Returns true if the target system has site to site enabled. Returns false
+ * otherwise (they don't or they have not yet responded).
+ *
+ * @return true if the target system has site to site enabled; false otherwise
+ */
+ boolean isSiteToSiteEnabled();
+
+ /**
+ * Returns a String indicating why we are not authorized to communicate with
+ * the remote instance, or <code>null</code> if we are authorized
+ *
+ * @return the reason we are not authorized, or <code>null</code> if we are authorized
+ */
+ String getAuthorizationIssue();
+
+ /**
+ * Returns the {@link EventReporter} that can be used to report any notable
+ * events
+ *
+ * @return the EventReporter that can be used to report any notable events
+ */
+ EventReporter getEventReporter();
+
+ /**
+ * Initiates a task in the remote process group to re-initialize, as a
+ * result of clustering changes
+ *
+ * @param isClustered whether or not this instance is now clustered
+ */
+ void reinitialize(boolean isClustered);
+
+ /**
+ * Removes all non existent ports from this RemoteProcessGroup.
+ */
+ void removeAllNonExistentPorts();
+
+ /**
+ * Removes a port that no longer exists on the remote instance from this
+ * RemoteProcessGroup
+ *
+ * @param port the port to remove
+ */
+ void removeNonExistentPort(final RemoteGroupPort port);
+
+ /**
+ * @return the CommunicationsSession for the site-to-site connection
+ * @throws IOException NOTE(review): presumably if the connection cannot be established - confirm
+ */
+ CommunicationsSession establishSiteToSiteConnection() throws IOException;
+
+ /**
+ * Called whenever RemoteProcessGroup is removed from the flow, so that any
+ * resources can be cleaned up appropriately.
+ */
+ void onRemove();
+
+ void verifyCanDelete();
+
+ void verifyCanDelete(boolean ignoreConnections);
+
+ void verifyCanStartTransmitting();
+
+ void verifyCanStopTransmitting();
+
+ void verifyCanUpdate();
+
+ /**
+ * Returns a set of PeerStatus objects that describe the different peers
+ * that we can communicate with for this RemoteProcessGroup.
+ *
+ * If the destination is a cluster, this set will contain PeerStatuses for
+ * each of the nodes in the cluster.
+ *
+ * If the destination is a standalone instance, this set will contain just a
+ * PeerStatus for the destination.
+ *
+ * Once the PeerStatuses have been obtained, they may be cached by this
+ * RemoteProcessGroup for some amount of time.
+ *
+ * If unable to obtain the PeerStatuses or no peer status has yet been
+ * obtained, will return null.
+ *
+ * @return the PeerStatuses for this RemoteProcessGroup, or null if none could be or has yet been obtained
+ */
+ Set<PeerStatus> getPeerStatuses();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
new file mode 100644
index 0000000..fb4f6e0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+public interface RemoteProcessGroupPortDescriptor {
+
+ /**
+ * The comments as configured in the target port.
+ *
+ * @return the comments as configured in the target port
+ */
+ String getComments();
+
+ /**
+ * The number of tasks that may transmit flow files to the target port
+ * concurrently.
+ *
+ * @return the number of tasks that may transmit to the target port concurrently
+ */
+ Integer getConcurrentlySchedulableTaskCount();
+
+ /**
+ * The id of the target port.
+ *
+ * @return the id of the target port
+ */
+ String getId();
+
+ /**
+ * The id of the remote process group that this port resides in.
+ *
+ * @return the id of the remote process group that this port resides in
+ */
+ String getGroupId();
+
+ /**
+ * The name of the target port.
+ *
+ * @return the name of the target port
+ */
+ String getName();
+
+ /**
+ * Whether or not this remote group port is configured for transmission.
+ *
+ * @return whether or not this remote group port is configured for transmission
+ */
+ Boolean isTransmitting();
+
+ /**
+ * Whether or not flow files are compressed when sent to this target port.
+ *
+ * @return whether or not flow files are compressed when sent to this target port
+ */
+ Boolean getUseCompression();
+
+ /**
+ * Whether or not the target port exists.
+ *
+ * @return whether or not the target port exists
+ */
+ Boolean getExists();
+
+ /**
+ * Whether or not the target port is running.
+ *
+ * @return whether or not the target port is running
+ */
+ Boolean isTargetRunning();
+
+ /**
+ * Whether or not this port has either an incoming or outgoing connection.
+ *
+ * @return whether or not this port has either an incoming or outgoing connection
+ */
+ Boolean isConnected();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
new file mode 100644
index 0000000..27cc6c5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Date;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+/**
+ * A single log event: a timestamp in milliseconds since the epoch, a level, a
+ * message and an optional Throwable. All fields are final and are set by the
+ * constructor without validation, so any of the object arguments may be null.
+ */
+public class LogMessage {
+
+    private final String message;
+    private final LogLevel level;
+    private final Throwable throwable;
+    private final long time;
+
+    /** Pattern used by {@link #toString()} to render the timestamp. */
+    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+    /** Layout used by {@link #toString()}: formatted time, level, then message. */
+    public static final String TO_STRING_FORMAT = "%1$s %2$s - %3$s";
+
+    /**
+     * @param millisSinceEpoch the time of the event, in milliseconds since the epoch
+     * @param level the level of the event
+     * @param message the text of the event
+     * @param throwable the Throwable associated with the event; may be null
+     */
+    public LogMessage(final long millisSinceEpoch, final LogLevel level, final String message, final Throwable throwable) {
+        this.level = level;
+        this.throwable = throwable;
+        this.message = message;
+        this.time = millisSinceEpoch;
+    }
+
+    /** @return the timestamp given to the constructor */
+    public long getMillisSinceEpoch() {
+        return time;
+    }
+
+    /** @return the message given to the constructor */
+    public String getMessage() {
+        return message;
+    }
+
+    /** @return the level given to the constructor */
+    public LogLevel getLevel() {
+        return level;
+    }
+
+    /** @return the Throwable given to the constructor; may be null */
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
+    /**
+     * Renders this message as "time level - message" (see
+     * {@link #TO_STRING_FORMAT}). If a Throwable is present, a newline and its
+     * stack trace are appended.
+     *
+     * @return the formatted message; never throws for a null level or message
+     */
+    @Override
+    public String toString() {
+        // A new formatter is created on every call, so no formatter state is shared
+        final DateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT);
+        final String formattedTime = dateFormat.format(new Date(time));
+
+        // The level is passed directly instead of level.toString(): %s renders a
+        // null argument as "null", whereas calling toString() on a null level
+        // made this method throw a NullPointerException.
+        String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, level, message);
+        if (throwable != null) {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw);
+            throwable.printStackTrace(pw);
+            formattedMsg += "\n" + sw.toString();
+        }
+
+        return formattedMsg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
new file mode 100644
index 0000000..a75f8ea
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+/**
+ * Observer that can be registered with a {@link LogRepository} in order to be
+ * notified of log messages.
+ */
+public interface LogObserver {
+
+ void onLogMessage(LogMessage message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
new file mode 100644
index 0000000..4a017ce
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+public interface LogRepository {
+
+ void addLogMessage(LogLevel level, String message);
+
+ void addLogMessage(LogLevel level, String message, Throwable t);
+
+ void addLogMessage(LogLevel level, String messageFormat, Object[] params);
+
+ void addLogMessage(LogLevel level, String messageFormat, Object[] params, Throwable t);
+
+ /**
+ * Registers an observer so that it will be notified of all Log Messages
+ * whose levels are at least equal to the given level.
+ *
+ * @param observerIdentifier the identifier under which the observer is registered
+ * @param level the minimum level of the Log Messages the observer is notified of
+ * @param observer the observer to register
+ */
+ void addObserver(String observerIdentifier, LogLevel level, LogObserver observer);
+
+ /**
+ * Sets the observation level of the specified observer.
+ *
+ * @param observerIdentifier the identifier of the observer
+ * @param level the new observation level
+ */
+ void setObservationLevel(String observerIdentifier, LogLevel level);
+
+ /**
+ * Gets the observation level for the specified observer.
+ *
+ * @param observerIdentifier the identifier of the observer
+ * @return the observation level for the specified observer
+ */
+ LogLevel getObservationLevel(String observerIdentifier);
+
+ /**
+ * Removes the LogObserver with the given identifier from this Repository.
+ *
+ * @param observerIdentifier the identifier of the observer to remove
+ * @return NOTE(review): presumably the LogObserver that was removed - confirm with implementations
+ */
+ LogObserver removeObserver(String observerIdentifier);
+
+ /**
+ * Removes all LogObservers from this Repository
+ */
+ void removeAllObservers();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
new file mode 100644
index 0000000..76ca661
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static factory that hands out one {@link LogRepository} per identifier.
+ * Repositories are instantiated reflectively from
+ * {@link #LOG_REPOSITORY_CLASS_NAME} on first request and cached, so later
+ * requests for the same identifier return the same instance.
+ */
+public class LogRepositoryFactory {
+
+    public static final String LOG_REPOSITORY_CLASS_NAME = "org.apache.nifi.logging.repository.StandardLogRepository";
+
+    private static final ConcurrentMap<String, LogRepository> repositoryMap = new ConcurrentHashMap<>();
+    // Remains null when the implementation class could not be found at class-load time
+    private static final Class<LogRepository> logRepositoryClass;
+
+    static {
+        Class<LogRepository> clazz = null;
+        try {
+            // The cast is unchecked because Class.forName returns Class<?>; the
+            // suppression is limited to this one statement.
+            @SuppressWarnings("unchecked")
+            final Class<LogRepository> loaded = (Class<LogRepository>) Class.forName(LOG_REPOSITORY_CLASS_NAME, true, LogRepositoryFactory.class.getClassLoader());
+            clazz = loaded;
+        } catch (final ClassNotFoundException e) {
+            // The exception is passed as the trailing argument so that it is logged too
+            LoggerFactory.getLogger(LogRepositoryFactory.class).error("Unable to find class {}; logging may not work properly", LOG_REPOSITORY_CLASS_NAME, e);
+        }
+        logRepositoryClass = clazz;
+    }
+
+    /**
+     * Returns the LogRepository for the given identifier, creating and caching
+     * it if none exists yet.
+     *
+     * @param processorId the identifier whose repository is requested
+     * @return the repository associated with the identifier; never null
+     *
+     * @throws NullPointerException if processorId is null
+     * @throws IllegalStateException if the repository implementation class
+     * could not be loaded
+     * @throws RuntimeException if the repository cannot be instantiated
+     */
+    public static LogRepository getRepository(final String processorId) {
+        LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+        if (repository == null) {
+            if (logRepositoryClass == null) {
+                // Previously this surfaced as a RuntimeException wrapping a bare NullPointerException
+                throw new IllegalStateException("Log repository class " + LOG_REPOSITORY_CLASS_NAME + " could not be loaded");
+            }
+
+            try {
+                repository = logRepositoryClass.newInstance();
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            // Another thread may have registered a repository in the meantime; if so, use that one
+            final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+            if (oldRepository != null) {
+                repository = oldRepository;
+            }
+        }
+
+        return repository;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
new file mode 100644
index 0000000..b25c90b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.Closeable;
+
+/**
+ * Handle that restores a thread's previous context ClassLoader when closed.
+ * {@link #withNarLoader()} installs
+ * <code>NarThreadContextClassLoader.getInstance()</code> as the context
+ * ClassLoader of the calling thread and returns a handle remembering the
+ * loader that was in place before. Because this class implements Closeable,
+ * the handle can be used in a try-with-resources statement.
+ */
+public class NarCloseable implements Closeable {
+
+    /**
+     * Replaces the calling thread's context ClassLoader with the
+     * NarThreadContextClassLoader instance.
+     *
+     * @return a handle whose {@link #close()} puts the previous context
+     * ClassLoader back
+     */
+    public static NarCloseable withNarLoader() {
+        final ClassLoader current = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        return new NarCloseable(current);
+    }
+
+    // Context ClassLoader that was in place before withNarLoader(); may be null
+    private final ClassLoader toSet;
+
+    private NarCloseable(final ClassLoader toSet) {
+        this.toSet = toSet;
+    }
+
+    /**
+     * Sets the context ClassLoader of the thread that calls this method back
+     * to the loader captured by {@link #withNarLoader()}.
+     */
+    @Override
+    public void close() {
+        // Restore unconditionally: null is a legal context ClassLoader, and
+        // skipping the restore in that case left the NAR loader installed on
+        // the thread after close().
+        Thread.currentThread().setContextClassLoader(toSet);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
new file mode 100644
index 0000000..aa905a8
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.ReportingTask;
+
+/**
+ * A {@link ClassLoader} meant to be installed as a Thread's context ClassLoader. It is created
+ * with an empty URL array and every overridden operation is delegated to a ClassLoader that is
+ * selected at call time: the current execution stack is walked and the ClassLoader of the first
+ * class that is (or is nested inside) an implementation of one of the NAR-specific extension
+ * types registered below is used. When no such class is on the stack, the system ClassLoader is
+ * used instead.
+ * <p>
+ * The class is a singleton obtained through {@link #getInstance()}. It keeps no state that is
+ * modified after class initialization. Note, however, that a lookup may replace the calling
+ * Thread's context ClassLoader (see the comment in {@code lookupClassLoader()}).
+ */
+public class NarThreadContextClassLoader extends URLClassLoader {
+
+    /** Used only to obtain the classes on the current execution stack. */
+    static final ContextSecurityManager contextSecurityManager = new ContextSecurityManager();
+
+    /** Fallback used when no NAR-specific class is found on the execution stack. */
+    private final ClassLoader forward = ClassLoader.getSystemClassLoader();
+
+    /** Extension point types; a class assignable to any of these counts as a "NAR class". */
+    private static final List<Class<?>> narSpecificClasses = new ArrayList<>();
+
+    static {
+        narSpecificClasses.add(Processor.class);
+        narSpecificClasses.add(FlowFilePrioritizer.class);
+        narSpecificClasses.add(ReportingTask.class);
+        narSpecificClasses.add(Validator.class);
+        narSpecificClasses.add(InputStreamCallback.class);
+        narSpecificClasses.add(OutputStreamCallback.class);
+        narSpecificClasses.add(StreamCallback.class);
+        narSpecificClasses.add(ControllerService.class);
+        narSpecificClasses.add(AuthorityProvider.class);
+        narSpecificClasses.add(ProvenanceEventRepository.class);
+        narSpecificClasses.add(ComponentStatusRepository.class);
+        narSpecificClasses.add(FlowFileRepository.class);
+        narSpecificClasses.add(FlowFileSwapManager.class);
+        narSpecificClasses.add(ContentRepository.class);
+    }
+
+    private NarThreadContextClassLoader() {
+        // No URLs: this loader never loads anything itself, it only delegates.
+        super(new URL[0]);
+    }
+
+    @Override
+    public void clearAssertionStatus() {
+        lookupClassLoader().clearAssertionStatus();
+    }
+
+    @Override
+    public URL getResource(String name) {
+        return lookupClassLoader().getResource(name);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        return lookupClassLoader().getResourceAsStream(name);
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        return lookupClassLoader().getResources(name);
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        return lookupClassLoader().loadClass(name);
+    }
+
+    @Override
+    public void setClassAssertionStatus(String className, boolean enabled) {
+        lookupClassLoader().setClassAssertionStatus(className, enabled);
+    }
+
+    @Override
+    public void setDefaultAssertionStatus(boolean enabled) {
+        lookupClassLoader().setDefaultAssertionStatus(enabled);
+    }
+
+    @Override
+    public void setPackageAssertionStatus(String packageName, boolean enabled) {
+        lookupClassLoader().setPackageAssertionStatus(packageName, enabled);
+    }
+
+    /**
+     * Chooses the ClassLoader that the current call should be delegated to.
+     *
+     * @return the ClassLoader of the first NAR class found on the execution stack, or the system
+     * ClassLoader if there is none
+     */
+    private ClassLoader lookupClassLoader() {
+        final Class<?>[] classStack = contextSecurityManager.getExecutionStack();
+
+        for (final Class<?> currentClass : classStack) {
+            final Class<?> narClass = findNarClass(currentClass);
+            if (narClass != null) {
+                final ClassLoader desiredClassLoader = narClass.getClassLoader();
+
+                // When new Threads are created, the new Thread inherits the ClassLoaderContext of
+                // the caller. However, the call stack of that new Thread may not trace back to any
+                // NiFi-specific code. Therefore, the NarThreadContextClassLoader will be unable to
+                // find the appropriate NAR ClassLoader. As a result, we want to set the
+                // ContextClassLoader to the NAR ClassLoader that contains the class or resource
+                // that we are looking for.
+                // This locks the current Thread into the appropriate NAR ClassLoader Context. The
+                // framework will change the ContextClassLoader back to the
+                // NarThreadContextClassLoader as appropriate via the
+                // FlowEngine.beforeExecute(Thread, Runnable) and
+                // FlowEngine.afterExecute(Thread, Runnable) methods.
+                if (desiredClassLoader instanceof NarClassLoader) {
+                    Thread.currentThread().setContextClassLoader(desiredClassLoader);
+                }
+                return desiredClassLoader;
+            }
+        }
+        return forward;
+    }
+
+    /**
+     * Finds the NAR class that the given class represents: the class itself if it is assignable
+     * to any of the registered NAR-specific types, otherwise the nearest enclosing class that is.
+     * <p>
+     * Every registered type is tested before moving outwards to the enclosing class, so a nested
+     * class that implements, for example, {@code Validator} is recognized even when none of its
+     * enclosing classes is a NAR class.
+     *
+     * @param cls the class taken from the execution stack
+     * @return the matching class, or {@code null} if neither the class nor any enclosing class is
+     * assignable to a registered type
+     */
+    private Class<?> findNarClass(final Class<?> cls) {
+        for (Class<?> candidate = cls; candidate != null; candidate = candidate.getEnclosingClass()) {
+            for (final Class<?> narClass : narSpecificClasses) {
+                if (narClass.isAssignableFrom(candidate)) {
+                    return candidate;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /** Holder class: the singleton is created when this class is first initialized. */
+    private static class SingletonHolder {
+
+        public static final NarThreadContextClassLoader instance = new NarThreadContextClassLoader();
+    }
+
+    /**
+     * @return the single instance of this ClassLoader
+     */
+    public static NarThreadContextClassLoader getInstance() {
+        return SingletonHolder.instance;
+    }
+
+    /**
+     * Exposes the protected {@link SecurityManager#getClassContext()} so that the classes on the
+     * current execution stack can be inspected.
+     */
+    static class ContextSecurityManager extends SecurityManager {
+
+        Class<?>[] getExecutionStack() {
+            return getClassContext();
+        }
+    }
+
+    /**
+     * Loads the named class and creates an instance of it through {@link Class#newInstance()}.
+     * While the class is being located the context ClassLoader is this singleton; while the
+     * instance is being constructed the context ClassLoader is the one that the
+     * {@code ExtensionManager} returned for the class name. The caller's original context
+     * ClassLoader is always restored before this method returns.
+     *
+     * @param <T> the type the created instance is returned as
+     * @param implementationClassName fully qualified name of the class to instantiate
+     * @param typeDefinition the type that the implementation class must be a subclass of
+     * @return a new instance of the implementation class
+     * @throws InstantiationException if the class cannot be instantiated
+     * @throws IllegalAccessException if the class or its no-argument constructor is not accessible
+     * @throws ClassNotFoundException if the class cannot be located
+     * @throws ClassCastException if the class is not a subclass of {@code typeDefinition}
+     */
+    public static <T> T createInstance(final String implementationClassName, final Class<T> typeDefinition) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        try {
+            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName);
+            final Class<?> rawClass;
+            if (detectedClassLoaderForType == null) {
+                // try to find from the current class loader
+                rawClass = Class.forName(implementationClassName);
+            } else {
+                // try to find from the registered classloader for that type
+                rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType);
+            }
+
+            // NOTE(review): when no ClassLoader is registered for the type this sets a null
+            // context ClassLoader for the duration of the constructor call -- confirm intended.
+            Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+            final Class<?> desiredClass = rawClass.asSubclass(typeDefinition);
+            return typeDefinition.cast(desiredClass.newInstance());
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..2422fe1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+/**
+ * A remote peer, identified by its URL, together with the {@link CommunicationsSession} used to
+ * talk to it. A Peer can be penalized for a period of time and can be closed; two Peers are
+ * equal when their URLs are equal.
+ * <p>
+ * The penalization and closed flags are plain fields without synchronization.
+ */
+public class Peer {
+
+    private final CommunicationsSession commsSession;
+    private final String url;
+    private final String host;
+    private long penalizationExpiration = 0L;
+    private boolean closed = false;
+
+    /**
+     * @param commsSession the session used to communicate with the peer
+     * @param url the peer's URL; must be parseable as a {@link URI}
+     * @throws IllegalArgumentException if {@code url} is {@code null} or cannot be parsed; the
+     * underlying parsing failure is attached as the cause
+     */
+    public Peer(final CommunicationsSession commsSession, final String url) {
+        this.commsSession = commsSession;
+        this.url = url;
+
+        try {
+            this.host = new URI(url).getHost();
+        } catch (final Exception e) {
+            // Keep the cause so the reason the URL was rejected is not lost.
+            throw new IllegalArgumentException("Invalid URL: " + url, e);
+        }
+    }
+
+    /**
+     * @return the URL this Peer was created with
+     */
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * @return the session this Peer was created with
+     */
+    public CommunicationsSession getCommunicationsSession() {
+        return commsSession;
+    }
+
+    /**
+     * Marks this Peer as closed and closes its communications session. The Peer is marked closed
+     * even if closing the session fails.
+     *
+     * @throws IOException if closing the communications session fails
+     */
+    public void close() throws IOException {
+        this.closed = true;
+
+        // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        commsSession.close();
+    }
+
+    /**
+     * Penalizes this Peer until at least {@code millis} milliseconds from now. An existing
+     * penalization that expires later is left in place.
+     *
+     * @param millis how long, in milliseconds from the current time, the Peer should be penalized
+     */
+    public void penalize(final long millis) {
+        penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+    }
+
+    /**
+     * @return {@code true} if the penalization expiration lies in the future
+     */
+    public boolean isPenalized() {
+        return penalizationExpiration > System.currentTimeMillis();
+    }
+
+    /**
+     * @return {@code true} once {@link #close()} has been called
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the host component of the URL as reported by {@link URI#getHost()}, which is
+     * {@code null} when the URL has no host component
+     */
+    public String getHost() {
+        return host;
+    }
+
+    @Override
+    public int hashCode() {
+        return 8320 + url.hashCode();
+    }
+
+    /**
+     * Two Peers are equal when their URLs are equal; the session and state are not compared.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Peer)) {
+            return false;
+        }
+
+        final Peer other = (Peer) obj;
+        return this.url.equals(other.url);
+    }
+
+    /**
+     * @return {@code Peer[url=...]}, with {@code ,CLOSED} appended for a closed Peer or
+     * {@code ,PENALIZED} for an open Peer that is currently penalized
+     */
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Peer[url=").append(url);
+        if (closed) {
+            sb.append(",CLOSED");
+        } else if (isPenalized()) {
+            sb.append(",PENALIZED");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..d1cb076
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Immutable description of a peer: its hostname, port, whether it is flagged as secure, and a
+ * FlowFile count. Equality and hash code are based on hostname and port only.
+ */
+public class PeerStatus {
+
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+    private final int numFlowFiles;
+
+    /**
+     * @param hostname the peer's hostname
+     * @param port the peer's port
+     * @param secure the value later reported by {@link #isSecure()}
+     * @param numFlowFiles the value later reported by {@link #getFlowFileCount()}
+     */
+    public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+        this.numFlowFiles = numFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    public int getFlowFileCount() {
+        return numFlowFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+    }
+
+    /**
+     * Hash of hostname and port. A {@code null} hostname contributes 0 instead of causing a
+     * {@link NullPointerException}; for non-null hostnames the value is unchanged.
+     */
+    @Override
+    public int hashCode() {
+        final int hostnameHash = (hostname == null) ? 0 : hostname.hashCode();
+        return 9824372 + hostnameHash + port;
+    }
+
+    /**
+     * Two instances are equal when both port and hostname are equal; the secure flag and the
+     * FlowFile count are not compared. {@code null} hostnames are equal only to each other.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof PeerStatus)) {
+            return false;
+        }
+
+        final PeerStatus other = (PeerStatus) obj;
+        if (port != other.port) {
+            return false;
+        }
+        return (hostname == null) ? other.hostname == null : hostname.equals(other.hostname);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
new file mode 100644
index 0000000..8f2603a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Outcome of a port authorization check. {@link RootGroupPort#checkUserAuthorization(String)}
+ * returns an instance of this type.
+ */
+public interface PortAuthorizationResult {
+
+ /**
+ * @return whether the check found the user to be authorized
+ */
+ boolean isAuthorized();
+
+ /**
+ * @return a textual explanation of the result. NOTE(review): the Javadoc of
+ * {@link RootGroupPort#checkUserAuthorization(String)} describes the result as indicating why
+ * the user is unauthorized; what is returned for an authorized user is not visible here --
+ * confirm against the implementations.
+ */
+ String getExplanation();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
new file mode 100644
index 0000000..12a3d33
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * The three authorization states: not (yet) known, unauthorized, and authorized.
+ * NOTE(review): the name suggests this tracks the authorization state with respect to a remote
+ * instance -- confirm against the code that uses it.
+ */
+public enum RemoteAuthorizationState {
+
+ UNKNOWN,
+ UNAUTHORIZED,
+ AUTHORIZED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
new file mode 100644
index 0000000..d4ad374
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.groups.RemoteProcessGroup;
+
+/**
+ * A {@link Port} that is associated with a {@link RemoteProcessGroup} and has a
+ * {@link TransferDirection}.
+ */
+public interface RemoteGroupPort extends Port {
+
+ /**
+ * @return the RemoteProcessGroup associated with this port
+ */
+ RemoteProcessGroup getRemoteProcessGroup();
+
+ /**
+ * @return the transfer direction of this port
+ */
+ TransferDirection getTransferDirection();
+
+ /**
+ * @return the current value of the use-compression setting
+ */
+ boolean isUseCompression();
+
+ /**
+ * @param useCompression the new value of the use-compression setting
+ */
+ void setUseCompression(boolean useCompression);
+
+ /**
+ * @return whether the target exists. NOTE(review): presumably "target" is the corresponding
+ * port on the remote instance -- confirm against the implementation.
+ */
+ boolean getTargetExists();
+
+ /**
+ * @return whether the target is running. NOTE(review): presumably the same target as in
+ * {@link #getTargetExists()} -- confirm against the implementation.
+ */
+ boolean isTargetRunning();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
new file mode 100644
index 0000000..4afdfb7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.protocol.ServerProtocol;
+
+/**
+ * A {@link Port} that carries user and group access control sets and that can receive FlowFiles
+ * from, and transfer FlowFiles to, a {@link Peer} using a {@link ServerProtocol}.
+ */
+public interface RootGroupPort extends Port {
+
+ /**
+ * @return whether this port is transmitting. NOTE(review): the exact condition is defined by
+ * the implementation -- confirm there.
+ */
+ boolean isTransmitting();
+
+ /**
+ * @param groups the new group access control set
+ */
+ void setGroupAccessControl(Set<String> groups);
+
+ /**
+ * @return the group access control set
+ */
+ Set<String> getGroupAccessControl();
+
+ /**
+ * @param users the new user access control set
+ */
+ void setUserAccessControl(Set<String> users);
+
+ /**
+ * @return the user access control set
+ */
+ Set<String> getUserAccessControl();
+
+ /**
+ * Verifies that the specified user is authorized to interact with this port
+ * and returns a {@link PortAuthorizationResult} indicating why the user is
+ * unauthorized if this assumption fails
+ *
+ * @param dn identifies the user to check; presumably the user's distinguished name (DN) --
+ * TODO confirm
+ * @return the result of the authorization check
+ */
+ PortAuthorizationResult checkUserAuthorization(String dn);
+
+ /**
+ * Receives data from the given peer
+ *
+ * @param peer the peer to receive from
+ * @param serverProtocol the protocol used for the exchange
+ * @param requestHeaders headers of the request; their keys and meaning are defined by the
+ * caller and not visible here
+ *
+ * @return the number of FlowFiles received
+ * @throws org.apache.nifi.remote.exception.NotAuthorizedException
+ * @throws org.apache.nifi.remote.exception.BadRequestException
+ * @throws org.apache.nifi.remote.exception.RequestExpiredException
+ */
+ int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
+
+ /**
+ * Transfers data to the given peer
+ *
+ * @param peer the peer to transfer to
+ * @param requestHeaders headers of the request; their keys and meaning are defined by the
+ * caller and not visible here
+ * @param serverProtocol the protocol used for the exchange
+ *
+ * @return the number of FlowFiles transferred
+ * @throws org.apache.nifi.remote.exception.NotAuthorizedException
+ * @throws org.apache.nifi.remote.exception.BadRequestException
+ * @throws org.apache.nifi.remote.exception.RequestExpiredException
+ */
+ int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
+
+}