You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/07 23:53:31 UTC

[11/17] nifi git commit: NIFI-810: Created RequiresInput annotation and ensure that processors are invalid if connections do not agree

NIFI-810: Created RequiresInput annotation and ensure that processors are invalid if connections do not agree


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4afd8f88
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4afd8f88
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4afd8f88

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 4afd8f88f8a34cf87f2a06221667166a54c99a15
Parents: 31fba6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Sep 25 11:39:28 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 7 17:26:14 2015 -0400

----------------------------------------------------------------------
 .../annotation/behavior/InputRequirement.java   |   51 +
 .../src/main/asciidoc/developer-guide.adoc      |   11 +
 .../nifi/processors/avro/ConvertAvroToJSON.java |    3 +
 .../processors/avro/ExtractAvroMetadata.java    |   29 +-
 .../apache/nifi/processors/avro/SplitAvro.java  |   27 +-
 .../nifi/processors/aws/s3/FetchS3Object.java   |    3 +
 .../nifi/processors/aws/s3/PutS3Object.java     |    6 +-
 .../apache/nifi/processors/aws/sns/PutSNS.java  |    3 +
 .../nifi/processors/aws/sqs/DeleteSQS.java      |    3 +
 .../apache/nifi/processors/aws/sqs/GetSQS.java  |    5 +-
 .../apache/nifi/processors/aws/sqs/PutSQS.java  |    5 +-
 .../nifi/processors/flume/ExecuteFlumeSink.java |   14 +-
 .../processors/flume/ExecuteFlumeSource.java    |   14 +-
 .../apache/nifi/controller/ProcessorNode.java   |   89 +-
 .../nifi/controller/StandardProcessorNode.java  | 2440 +++++++++---------
 .../org/apache/nifi/processors/GeoEnrichIP.java |    3 +
 .../hadoop/CreateHadoopSequenceFile.java        |    4 +-
 .../nifi/processors/hadoop/FetchHDFS.java       |    3 +
 .../apache/nifi/processors/hadoop/GetHDFS.java  |    3 +
 .../apache/nifi/processors/hadoop/ListHDFS.java |    3 +
 .../apache/nifi/processors/hadoop/PutHDFS.java  |    3 +
 .../processors/hl7/ExtractHL7Attributes.java    |    3 +
 .../apache/nifi/processors/hl7/RouteHL7.java    |    3 +
 .../processors/image/ExtractImageMetadata.java  |   36 +-
 .../nifi/processors/image/ResizeImage.java      |   38 +-
 .../apache/nifi/processors/kafka/GetKafka.java  |   21 +-
 .../apache/nifi/processors/kafka/PutKafka.java  |   10 +-
 .../nifi/processors/kite/ConvertCSVToAvro.java  |   16 +-
 .../nifi/processors/kite/ConvertJSONToAvro.java |   14 +-
 .../processors/kite/StoreInKiteDataset.java     |    9 +-
 .../nifi/processors/yandex/YandexTranslate.java |    3 +
 .../nifi-pcap-processors/.gitignore             |    1 +
 .../nifi/processors/twitter/GetTwitter.java     |    5 +-
 .../apache/nifi/processors/solr/GetSolr.java    |   43 +-
 .../processors/solr/PutSolrContentStream.java   |   33 +-
 .../standard/Base64EncodeContent.java           |  171 +-
 .../processors/standard/CompressContent.java    |   15 +-
 .../nifi/processors/standard/ControlRate.java   |  683 ++---
 .../standard/ConvertCharacterSet.java           |    3 +
 .../processors/standard/ConvertJSONToSQL.java   |    3 +
 .../processors/standard/DetectDuplicate.java    |    3 +
 .../processors/standard/DistributeLoad.java     |    3 +
 .../processors/standard/DuplicateFlowFile.java  |    3 +
 .../nifi/processors/standard/EncodeContent.java |   15 +-
 .../processors/standard/EncryptContent.java     |    3 +
 .../processors/standard/EvaluateJsonPath.java   |   38 +-
 .../nifi/processors/standard/EvaluateXPath.java |   29 +-
 .../processors/standard/EvaluateXQuery.java     |   25 +-
 .../processors/standard/ExecuteProcess.java     |    3 +
 .../nifi/processors/standard/ExecuteSQL.java    |    3 +
 .../standard/ExecuteStreamCommand.java          |    7 +-
 .../nifi/processors/standard/ExtractText.java   |    3 +
 .../processors/standard/GenerateFlowFile.java   |   11 +-
 .../apache/nifi/processors/standard/GetFTP.java |   13 +-
 .../nifi/processors/standard/GetFile.java       |    7 +-
 .../nifi/processors/standard/GetHTTP.java       |    3 +
 .../nifi/processors/standard/GetJMSQueue.java   |    3 +
 .../nifi/processors/standard/GetJMSTopic.java   |    3 +
 .../nifi/processors/standard/GetSFTP.java       |    7 +-
 .../processors/standard/HandleHttpRequest.java  |    7 +-
 .../processors/standard/HandleHttpResponse.java |    5 +-
 .../nifi/processors/standard/HashAttribute.java |    5 +-
 .../nifi/processors/standard/HashContent.java   |    5 +-
 .../processors/standard/IdentifyMimeType.java   |    5 +-
 .../nifi/processors/standard/InvokeHTTP.java    |    3 +
 .../nifi/processors/standard/ListenHTTP.java    |   16 +-
 .../nifi/processors/standard/ListenUDP.java     |   18 +-
 .../nifi/processors/standard/LogAttribute.java  |   16 +-
 .../nifi/processors/standard/MergeContent.java  |   11 +-
 .../nifi/processors/standard/ModifyBytes.java   |   14 +-
 .../processors/standard/MonitorActivity.java    |   31 +-
 .../nifi/processors/standard/PostHTTP.java      |    3 +
 .../nifi/processors/standard/PutEmail.java      |    3 +
 .../apache/nifi/processors/standard/PutFTP.java |    3 +
 .../nifi/processors/standard/PutFile.java       |    3 +
 .../apache/nifi/processors/standard/PutJMS.java |    5 +-
 .../nifi/processors/standard/PutSFTP.java       |    3 +
 .../apache/nifi/processors/standard/PutSQL.java |    3 +
 .../nifi/processors/standard/ReplaceText.java   |   54 +-
 .../standard/ReplaceTextWithMapping.java        |   18 +-
 .../processors/standard/RouteOnAttribute.java   |    3 +
 .../processors/standard/RouteOnContent.java     |   19 +-
 .../nifi/processors/standard/ScanAttribute.java |   19 +-
 .../nifi/processors/standard/ScanContent.java   |    5 +-
 .../processors/standard/SegmentContent.java     |    7 +-
 .../nifi/processors/standard/SplitContent.java  |    7 +-
 .../nifi/processors/standard/SplitJson.java     |   32 +-
 .../nifi/processors/standard/SplitText.java     |   53 +-
 .../nifi/processors/standard/SplitXml.java      |   18 +-
 .../nifi/processors/standard/TransformXml.java  |    3 +
 .../nifi/processors/standard/UnpackContent.java |    9 +-
 .../nifi/processors/standard/ValidateXml.java   |   16 +-
 .../processors/attributes/UpdateAttribute.java  |    9 +-
 93 files changed, 2418 insertions(+), 2027 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
new file mode 100644
index 0000000..97e6b88
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
@@ -0,0 +1,51 @@
+package org.apache.nifi.annotation.behavior;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <p>
+ * Marker annotation that a Processor can use to indicate whether it accepts, requires, or forbids
+ * input from other Processors. This information is used by the framework in order to ensure that
+ * a Processor is marked as invalid if it is missing necessary input or has input that will be ignored.
+ * This information also is used by the NiFi UI in order to prevent users from making connections
+ * to Processors that don't make sense.
+ * </p>
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface InputRequirement {
+	Requirement value();
+	
+	public static enum Requirement {
+		/**
+		 * This value is used to indicate that the Processor requires input from other Processors
+		 * in order to run. As a result, the Processor will not be valid if it does not have any
+		 * incoming connections.
+		 */
+		INPUT_REQUIRED,
+		
+		/**
+		 * This value is used to indicate that the Processor will consume data from an incoming
+		 * connection but does not require an incoming connection in order to perform its task.
+		 * If the {@link InputRequirement} annotation is not present, this is the default value
+		 * that is used.
+		 */
+		INPUT_ALLOWED,
+		
+		/**
+		 * This value is used to indicate that the Processor is a "Source Processor" and does
+		 * not accept incoming connections. Because the Processor does not pull FlowFiles from
+		 * an incoming connection, it can be very confusing for users who create incoming connections
+		 * to the Processor. As a result, this value can be used in order to clarify that incoming
+		 * connections will not be used. This prevents the user from even creating such a connection.
+		 */
+		INPUT_FORBIDDEN;
+	}
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index f9950d5..28df5c2 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -1633,6 +1633,17 @@ will handle your Processor:
 		not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run
 		periodically to time out a network connection.
 
+    - `InputRequirement`: By default, all Processors will allow users to create incoming connections for the Processor, but
+        if the user does not create an incoming connection, the Processor is still valid and can be scheduled to run. For Processors
+        that are expected to be used as a "Source Processor," though, this can be confusing to the user, and the user may attempt to
+        send FlowFiles to that Processor, only for the FlowFiles to queue up without being processed. Conversely, if the Processor
+        expects incoming FlowFiles but does not have an input queue, the Processor will be scheduled to run but will perform no work,
+        as it will receive no FlowFile, and this leads to confusion as well. As a result, we can use the `@InputRequirement` annotation
+        and provide it a value of `INPUT_REQUIRED`, `INPUT_ALLOWED`, or `INPUT_FORBIDDEN`. This provides information to the framework
+        about when the Processor should be made invalid, or whether or not the user should even be able to draw a Connection to the
+        Processor. For instance, if a Processor is annotated with `InputRequirement(Requirement.INPUT_FORBIDDEN)`, then the user will
+        not even be able to create a Connection with that Processor as the destination.
+
 
 === Data Buffering
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8832a73..b214427 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -29,6 +29,8 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,6 +48,7 @@ import org.apache.nifi.processor.io.StreamCallback;
 @SideEffectFree
 @SupportsBatching
 @Tags({ "json", "avro", "binary" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
    + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded with the UTF-8 encoding. If an incoming FlowFile contains a stream of "

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
index 48aad7d..4cf5289 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
@@ -16,6 +16,19 @@
  */
 package org.apache.nifi.processors.avro;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaNormalization;
 import org.apache.avro.file.DataFileStream;
@@ -23,6 +36,8 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -41,22 +56,10 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
 @SideEffectFree
 @SupportsBatching
 @Tags({ "avro", "schema", "metadata" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Extracts metadata from the header of an Avro datafile.")
 @WritesAttributes({
         @WritesAttribute(attribute = "schema.type", description = "The type of the schema (i.e. record, enum, etc.)."),

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index 3b344b5..dbf5778 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -16,6 +16,18 @@
  */
 package org.apache.nifi.processors.avro;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileStream;
@@ -26,6 +38,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -45,21 +59,10 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.ObjectHolder;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 @SideEffectFree
 @SupportsBatching
 @Tags({ "avro", "split" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " +
         "the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
 public class SplitAvro extends AbstractProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 2406b67..131e671 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -45,6 +47,7 @@ import com.amazonaws.services.s3.model.S3Object;
 
 @SupportsBatching
 @SeeAlso({PutS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 24c82dd..7398c4e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -54,6 +56,7 @@ import com.amazonaws.services.s3.model.StorageClass;
 
 @SupportsBatching
 @SeeAlso({FetchS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
 @DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
@@ -101,7 +104,8 @@ public class PutS3Object extends AbstractS3Processor {
                 .build();
     }
 
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+    @Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 7d42703..e571ff4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -41,6 +43,7 @@ import com.amazonaws.services.sns.model.PublishRequest;
 
 @SupportsBatching
 @SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
 @CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
 public class PutSNS extends AbstractSNSProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 65e020d..f88aa71 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -37,6 +39,7 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
 
 @SupportsBatching
 @SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
 @CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
 public class DeleteSQS extends AbstractSQSProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 7c2dd2d..a140999 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -51,8 +53,9 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
 @SupportsBatching
+@SeeAlso({ PutSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
-@SeeAlso({PutSQS.class, DeleteSQS.class})
 @CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
 @WritesAttributes({
     @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index 3961f32..0af508e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -45,8 +47,9 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
 
 @SupportsBatching
+@SeeAlso({ GetSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
-@SeeAlso({GetSQS.class, DeleteSQS.class})
 @CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
 @DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
         description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
index 57e0278..f93b215 100644
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
@@ -16,20 +16,19 @@
  */
 package org.apache.nifi.processors.flume;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.conf.Configurables;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.ProcessContext;
@@ -40,12 +39,17 @@ import org.apache.nifi.processor.SchedulingContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * This processor runs a Flume sink
  */
+@TriggerSerially
 @Tags({"flume", "hadoop", "put", "sink"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
-@TriggerSerially
 public class ExecuteFlumeSink extends AbstractFlumeProcessor {
 
     public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
index 600f4b1..3aad6b7 100644
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
@@ -16,12 +16,10 @@
  */
 package org.apache.nifi.processors.flume;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.PollableSource;
@@ -29,12 +27,13 @@ import org.apache.flume.Source;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.EventDrivenSourceRunner;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.ProcessContext;
@@ -46,12 +45,17 @@ import org.apache.nifi.processor.SchedulingContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * This processor runs a Flume source
  */
+@TriggerSerially
 @Tags({"flume", "hadoop", "get", "source"})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
-@TriggerSerially
 public class ExecuteFlumeSource extends AbstractFlumeProcessor {
 
     public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index f2a83d0..2f72d0f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -30,70 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 
 public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
 
-    public ProcessorNode(final Processor processor, final String id,
-            final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
-        super(processor, id, validationContextFactory, serviceProvider);
-    }
+	public ProcessorNode(final Processor processor, final String id,
+		final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+		super(processor, id, validationContextFactory, serviceProvider);
+	}
 
-    public abstract boolean isIsolated();
+	public abstract boolean isIsolated();
 
-    public abstract boolean isTriggerWhenAnyDestinationAvailable();
+	public abstract boolean isTriggerWhenAnyDestinationAvailable();
 
-    @Override
-    public abstract boolean isSideEffectFree();
+	@Override
+	public abstract boolean isSideEffectFree();
 
-    public abstract boolean isTriggeredSerially();
+	public abstract boolean isTriggeredSerially();
 
-    public abstract boolean isEventDrivenSupported();
+	public abstract boolean isEventDrivenSupported();
 
-    public abstract boolean isHighThroughputSupported();
+	public abstract boolean isHighThroughputSupported();
 
-    @Override
-    public abstract boolean isValid();
+	public abstract Requirement getInputRequirement();
 
-    public abstract void setScheduledState(ScheduledState scheduledState);
+	@Override
+	public abstract boolean isValid();
 
-    public abstract void setBulletinLevel(LogLevel bulletinLevel);
+	public abstract void setScheduledState(ScheduledState scheduledState);
 
-    public abstract LogLevel getBulletinLevel();
+	public abstract void setBulletinLevel(LogLevel bulletinLevel);
 
-    public abstract Processor getProcessor();
+	public abstract LogLevel getBulletinLevel();
 
-    public abstract void yield(long period, TimeUnit timeUnit);
+	public abstract Processor getProcessor();
 
-    public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
+	public abstract void yield(long period, TimeUnit timeUnit);
 
-    public abstract Set<Relationship> getAutoTerminatedRelationships();
+	public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
 
-    public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+	public abstract Set<Relationship> getAutoTerminatedRelationships();
 
-    @Override
-    public abstract SchedulingStrategy getSchedulingStrategy();
+	public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
 
-    public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+	@Override
+	public abstract SchedulingStrategy getSchedulingStrategy();
 
-    public abstract long getRunDuration(TimeUnit timeUnit);
+	public abstract void setRunDuration(long duration, TimeUnit timeUnit);
 
-    public abstract Map<String, String> getStyle();
+	public abstract long getRunDuration(TimeUnit timeUnit);
 
-    public abstract void setStyle(Map<String, String> style);
+	public abstract Map<String, String> getStyle();
 
-    /**
-     * @return the number of threads (concurrent tasks) currently being used by
-     * this Processor
-     */
-    public abstract int getActiveThreadCount();
+	public abstract void setStyle(Map<String, String> style);
 
-    /**
-     * Verifies that this Processor can be started if the provided set of
-     * services are enabled. This is introduced because we need to verify that
-     * all components can be started before starting any of them. In order to do
-     * that, we need to know that this component can be started if the given
-     * services are enabled, as we will then enable the given services before
-     * starting this component.
-     *
-     * @param ignoredReferences to ignore
-     */
-    public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+	/**
+	 * @return the number of threads (concurrent tasks) currently being used by
+	 * this Processor
+	 */
+	public abstract int getActiveThreadCount();
+
+	/**
+	 * Verifies that this Processor can be started if the provided set of
+	 * services are enabled. This is introduced because we need to verify that
+	 * all components can be started before starting any of them. In order to do
+	 * that, we need to know that this component can be started if the given
+	 * services are enabled, as we will then enable the given services before
+	 * starting this component.
+	 *
+	 * @param ignoredReferences to ignore
+	 */
+	public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
 
 }