You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/07 23:53:29 UTC

[09/17] nifi git commit: NIFI-810: Created RequiresInput annotation and ensure that processors are invalid if connections do not agree

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index cbcc54d..385ac73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -23,7 +23,8 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -57,6 +58,7 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
  *
  */
 @SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "sequence file", "create", "sequencefile"})
 @CapabilityDescription("Creates Hadoop Sequence Files from incoming flow files")
 @SeeAlso(PutHDFS.class)

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 4a52fb7..aa03e73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -44,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
 @CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
         + "The file in HDFS is left intact without any changes being made to it.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index de776d4..4c9deea 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 @TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
 @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 151cbf2..563bda8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -66,6 +68,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 @TriggerSerially
 @TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
 @CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
         + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 901159b..bedf1b9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -59,6 +61,7 @@ import org.apache.nifi.util.StopWatch;
 /**
  * This processor copies FlowFiles to HDFS.
  */
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
 @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
index 574fb2d..3a6ac79 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,6 +59,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
 
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"})
 @CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. "
         + "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be "

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
index 53e7e69..26e8bb6 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -29,6 +29,8 @@ import java.util.Set;
 import org.apache.nifi.annotation.behavior.DynamicProperties;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -63,6 +65,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
 @EventDriven
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"HL7", "healthcare", "route", "Health Level 7"})
 @DynamicProperties({
     @DynamicProperty(name = "Name of a Relationship", value = "An HL7 Query Language query",

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
index 7fe6195..b44eccd 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
@@ -16,11 +16,18 @@
  */
 package org.apache.nifi.processors.image;
 
-import com.drew.imaging.ImageMetadataReader;
-import com.drew.imaging.ImageProcessingException;
-import com.drew.metadata.Directory;
-import com.drew.metadata.Metadata;
-import com.drew.metadata.Tag;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -30,25 +37,22 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.ObjectHolder;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.HashMap;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.Tag;
 
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Exif", "Exchangeable", "image", "file", "format", "JPG", "GIF", "PNG", "BMP", "metadata","IPTC", "XMP"})
 @CapabilityDescription("Extract the image metadata from flowfiles containing images. This processor relies on this "
         + "metadata extractor library https://github.com/drewnoakes/metadata-extractor. It extracts a long list of "

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
index c085b5f..176561f 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
@@ -17,7 +17,27 @@
 
 package org.apache.nifi.processors.image;
 
+import java.awt.Graphics2D;
+import java.awt.Image;
+import java.awt.image.BufferedImage;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReader;
+import javax.imageio.stream.ImageInputStream;
+
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,25 +53,9 @@ import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-import javax.imageio.ImageIO;
-import javax.imageio.ImageReader;
-import javax.imageio.stream.ImageInputStream;
-import java.awt.Image;
-import java.awt.Graphics2D;
-import java.awt.image.BufferedImage;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
 @EventDriven
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "resize", "image", "jpg", "jpeg", "png", "bmp", "wbmp", "gif" })
 @CapabilityDescription("Resizes an image to user-specified dimensions. This Processor uses the image codecs registered with the "
     + "environment that NiFi is running in. By default, this includes JPEG, PNG, BMP, WBMP, and GIF images.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 26590df..e10977b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -32,18 +32,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -58,7 +53,15 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Fetches messages from Apache Kafka")
 @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index d83c7bf..cff285c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -58,9 +56,13 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 import org.apache.nifi.util.LongHolder;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 import scala.actors.threadpool.Arrays;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
 public class PutKafka extends AbstractProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6c20a8f..6f126aa 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -18,18 +18,20 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -53,11 +55,13 @@ import org.kitesdk.data.spi.DefaultConfiguration;
 import org.kitesdk.data.spi.filesystem.CSVFileReader;
 import org.kitesdk.data.spi.filesystem.CSVProperties;
 
-import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 @Tags({"kite", "csv", "avro"})
-@CapabilityDescription(
-        "Converts CSV files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
 public class ConvertCSVToAvro extends AbstractKiteProcessor {
 
     private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index ec1503c..af120bf 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -18,18 +18,18 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -47,9 +47,13 @@ import org.kitesdk.data.SchemaNotFoundException;
 import org.kitesdk.data.spi.DefaultConfiguration;
 import org.kitesdk.data.spi.filesystem.JSONFileReader;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 @Tags({"kite", "json", "avro"})
-@CapabilityDescription(
-        "Converts JSON files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
 public class ConvertJSONToAvro extends AbstractKiteProcessor {
 
     private static final Relationship SUCCESS = new Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 7a30db1..1986f0b 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -18,16 +18,17 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -46,6 +47,10 @@ import org.kitesdk.data.ValidationException;
 import org.kitesdk.data.View;
 import org.kitesdk.data.spi.SchemaValidationUtil;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
 public class StoreInKiteDataset extends AbstractKiteProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
index 8398152..5f58781 100644
--- a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
+++ b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
@@ -33,6 +33,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -70,6 +72,7 @@ import com.sun.jersey.api.json.JSONConfiguration;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"yandex", "translate", "translation", "language"})
 @CapabilityDescription("Translates content and attributes from one language to another")
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index a78b112..e41b583 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -32,6 +32,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,8 +59,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Client;
 import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.Location.Coordinate ;
 import com.twitter.hbc.core.endpoint.Location ;
+import com.twitter.hbc.core.endpoint.Location.Coordinate ;
 import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
@@ -69,6 +71,7 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"twitter", "tweets", "social media", "status", "json"})
 @CapabilityDescription("Pulls status changes from Twitter's streaming API")
 @WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index ff264a1..a85aa0f 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -18,7 +18,29 @@
  */
 package org.apache.nifi.processors.solr;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -41,27 +63,8 @@ import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 @Tags({"Apache", "Solr", "Get", "Pull"})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
 public class GetSolr extends SolrProcessor {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 560ad34..df034c9 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -18,7 +18,24 @@
  */
 package org.apache.nifi.processors.solr;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -40,22 +57,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.MultiMapSolrParams;
 import org.apache.solr.common.util.ContentStreamBase;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
 @Tags({"Apache", "Solr", "Put", "Send"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
 @DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
         description="These parameters will be passed to Solr on the request")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 9887e38..816b407 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.binary.Base64InputStream;
 import org.apache.commons.codec.binary.Base64OutputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -51,101 +53,102 @@ import org.apache.nifi.util.StopWatch;
 @SupportsBatching
 @Tags({"encode", "base64"})
 @CapabilityDescription("Encodes or decodes content to and from base64")
+@InputRequirement(Requirement.INPUT_REQUIRED)
 public class Base64EncodeContent extends AbstractProcessor {
 
-    public static final String ENCODE_MODE = "Encode";
-    public static final String DECODE_MODE = "Decode";
+	public static final String ENCODE_MODE = "Encode";
+	public static final String DECODE_MODE = "Decode";
 
-    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
-            .name("Mode")
-            .description("Specifies whether the content should be encoded or decoded")
-            .required(true)
-            .allowableValues(ENCODE_MODE, DECODE_MODE)
-            .defaultValue(ENCODE_MODE)
-            .build();
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
-            .build();
+	public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+		.name("Mode")
+		.description("Specifies whether the content should be encoded or decoded")
+		.required(true)
+		.allowableValues(ENCODE_MODE, DECODE_MODE)
+		.defaultValue(ENCODE_MODE)
+		.build();
+	public static final Relationship REL_SUCCESS = new Relationship.Builder()
+		.name("success")
+		.description("Any FlowFile that is successfully encoded or decoded will be routed to success")
+		.build();
+	public static final Relationship REL_FAILURE = new Relationship.Builder()
+		.name("failure")
+		.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
+		.build();
 
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
+	private List<PropertyDescriptor> properties;
+	private Set<Relationship> relationships;
 
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(MODE);
-        this.properties = Collections.unmodifiableList(properties);
+	@Override
+	protected void init(final ProcessorInitializationContext context) {
+		final List<PropertyDescriptor> properties = new ArrayList<>();
+		properties.add(MODE);
+		this.properties = Collections.unmodifiableList(properties);
 
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
+		final Set<Relationship> relationships = new HashSet<>();
+		relationships.add(REL_SUCCESS);
+		relationships.add(REL_FAILURE);
+		this.relationships = Collections.unmodifiableSet(relationships);
+	}
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
+	@Override
+	public Set<Relationship> getRelationships() {
+		return relationships;
+	}
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
+	@Override
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		return properties;
+	}
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) {
+		FlowFile flowFile = session.get();
+		if (flowFile == null) {
+			return;
+		}
 
-        final ProcessorLog logger = getLogger();
+		final ProcessorLog logger = getLogger();
 
-        boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
-        try {
-            final StopWatch stopWatch = new StopWatch(true);
-            if (encode) {
-                flowFile = session.write(flowFile, new StreamCallback() {
-                    @Override
-                    public void process(InputStream in, OutputStream out) throws IOException {
-                        try (Base64OutputStream bos = new Base64OutputStream(out)) {
-                            int len = -1;
-                            byte[] buf = new byte[8192];
-                            while ((len = in.read(buf)) > 0) {
-                                bos.write(buf, 0, len);
-                            }
-                            bos.flush();
-                        }
-                    }
-                });
-            } else {
-                flowFile = session.write(flowFile, new StreamCallback() {
-                    @Override
-                    public void process(InputStream in, OutputStream out) throws IOException {
-                        try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
-                            int len = -1;
-                            byte[] buf = new byte[8192];
-                            while ((len = bis.read(buf)) > 0) {
-                                out.write(buf, 0, len);
-                            }
-                            out.flush();
-                        }
-                    }
-                });
-            }
+		boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+		try {
+			final StopWatch stopWatch = new StopWatch(true);
+			if (encode) {
+				flowFile = session.write(flowFile, new StreamCallback() {
+					@Override
+					public void process(InputStream in, OutputStream out) throws IOException {
+						try (Base64OutputStream bos = new Base64OutputStream(out)) {
+							int len = -1;
+							byte[] buf = new byte[8192];
+							while ((len = in.read(buf)) > 0) {
+								bos.write(buf, 0, len);
+							}
+							bos.flush();
+						}
+					}
+				});
+			} else {
+				flowFile = session.write(flowFile, new StreamCallback() {
+					@Override
+					public void process(InputStream in, OutputStream out) throws IOException {
+						try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+							int len = -1;
+							byte[] buf = new byte[8192];
+							while ((len = bis.read(buf)) > 0) {
+								out.write(buf, 0, len);
+							}
+							out.flush();
+						}
+					}
+				});
+			}
 
-            logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (ProcessException e) {
-            logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
+			logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+			session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+			session.transfer(flowFile, REL_SUCCESS);
+		} catch (ProcessException e) {
+			logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+			session.transfer(flowFile, REL_FAILURE);
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 1b9b20c..593cf44 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -29,20 +29,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import lzma.sdk.lzma.Decoder;
-import lzma.streams.LzmaInputStream;
-import lzma.streams.LzmaOutputStream;
-
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -63,9 +61,14 @@ import org.tukaani.xz.LZMA2Options;
 import org.tukaani.xz.XZInputStream;
 import org.tukaani.xz.XZOutputStream;
 
+import lzma.sdk.lzma.Decoder;
+import lzma.streams.LzmaInputStream;
+import lzma.streams.LzmaOutputStream;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 2efc852..a45c211 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -31,6 +31,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -43,10 +49,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.timebuffer.EntityAccess;
@@ -54,344 +56,345 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
 
 @SideEffectFree
 @TriggerSerially
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"rate control", "throttle", "rate", "throughput"})
 @CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
 public class ControlRate extends AbstractProcessor {
 
-    public static final String DATA_RATE = "data rate";
-    public static final String FLOWFILE_RATE = "flowfile count";
-    public static final String ATTRIBUTE_RATE = "attribute value";
-
-    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
-            .name("Rate Control Criteria")
-            .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
-            .required(true)
-            .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
-            .defaultValue(DATA_RATE)
-            .build();
-    public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
-            .name("Maximum Rate")
-            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
-                    + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
-            .build();
-    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-            .name("Rate Controlled Attribute")
-            .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
-                    + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
-                    + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
-            .name("Time Duration")
-            .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
-            .required(true)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .defaultValue("1 min")
-            .build();
-    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-            .name("Grouping Attribute")
-            .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
-                    + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles are transferred to this relationship")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
-            .build();
-
-    private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
-    private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
-    private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-    private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(RATE_CONTROL_CRITERIA);
-        properties.add(MAX_RATE);
-        properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
-        properties.add(TIME_PERIOD);
-        properties.add(GROUPING_ATTRIBUTE_NAME);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
-        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
-
-        final Validator rateValidator;
-        switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-            case DATA_RATE:
-                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
-                break;
-            case ATTRIBUTE_RATE:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-                final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-                if (rateAttr == null) {
-                    validationResults.add(new ValidationResult.Builder()
-                            .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
-                            .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
-                            .build());
-                }
-                break;
-            case FLOWFILE_RATE:
-            default:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-                break;
-        }
-
-        final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
-        if (!rateResult.isValid()) {
-            validationResults.add(rateResult);
-        }
-
-        return validationResults;
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        super.onPropertyModified(descriptor, oldValue, newValue);
-
-        if (descriptor.equals(RATE_CONTROL_CRITERIA)
-                || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
-                || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
-                || descriptor.equals(TIME_PERIOD)) {
-            // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
-            throttleMap.clear();
-        } else if (descriptor.equals(MAX_RATE)) {
-            final long newRate;
-            if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
-                newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
-            } else {
-                newRate = Long.parseLong(newValue);
-            }
-
-            for (final Throttle throttle : throttleMap.values()) {
-                throttle.setMaxRate(newRate);
-            }
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final long lastClearTime = lastThrottleClearTime.get();
-        final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-        if (lastClearTime < throttleExpirationMillis) {
-            if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
-                final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
-                while (itr.hasNext()) {
-                    final Map.Entry<String, Throttle> entry = itr.next();
-                    final Throttle throttle = entry.getValue();
-                    if (throttle.tryLock()) {
-                        try {
-                            if (throttle.lastUpdateTime() < lastClearTime) {
-                                itr.remove();
-                            }
-                        } finally {
-                            throttle.unlock();
-                        }
-                    }
-                }
-            }
-        }
-
-        // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final ProcessorLog logger = getLogger();
-        final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
-        final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-        long rateValue;
-        switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-            case DATA_RATE:
-                rateValue = flowFile.getSize();
-                break;
-            case FLOWFILE_RATE:
-                rateValue = 1;
-                break;
-            case ATTRIBUTE_RATE:
-                final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
-                if (attributeValue == null) {
-                    logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-
-                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
-                            new Object[]{flowFile, rateControlAttributeName, attributeValue});
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-                rateValue = Long.parseLong(attributeValue);
-                break;
-            default:
-                throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
-        }
-
-        final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
-        final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
-        Throttle throttle = throttleMap.get(groupName);
-        if (throttle == null) {
-            throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
-
-            final String maxRateValue = context.getProperty(MAX_RATE).getValue();
-            final long newRate;
-            if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
-                newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
-            } else {
-                newRate = Long.parseLong(maxRateValue);
-            }
-            throttle.setMaxRate(newRate);
-
-            throttleMap.put(groupName, throttle);
-        }
-
-        throttle.lock();
-        try {
-            if (throttle.tryAdd(rateValue)) {
-                logger.info("transferring {} to 'success'", new Object[]{flowFile});
-                session.transfer(flowFile, REL_SUCCESS);
-            } else {
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile);
-            }
-        } finally {
-            throttle.unlock();
-        }
-    }
-
-    private static class TimestampedLong {
-
-        private final Long value;
-        private final long timestamp = System.currentTimeMillis();
-
-        public TimestampedLong(final Long value) {
-            this.value = value;
-        }
-
-        public Long getValue() {
-            return value;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
-
-    private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
-
-        @Override
-        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
-            if (oldValue == null && toAdd == null) {
-                return new TimestampedLong(0L);
-            } else if (oldValue == null) {
-                return toAdd;
-            } else if (toAdd == null) {
-                return oldValue;
-            }
-
-            return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
-        }
-
-        @Override
-        public TimestampedLong createNew() {
-            return new TimestampedLong(0L);
-        }
-
-        @Override
-        public long getTimestamp(TimestampedLong entity) {
-            return entity == null ? 0L : entity.getTimestamp();
-        }
-    }
-
-    private static class Throttle extends ReentrantLock {
-
-        private final AtomicLong maxRate = new AtomicLong(1L);
-        private final long timePeriodValue;
-        private final TimeUnit timePeriodUnit;
-        private final TimedBuffer<TimestampedLong> timedBuffer;
-        private final ProcessorLog logger;
-
-        private volatile long penalizationExpired;
-        private volatile long lastUpdateTime;
-
-        public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
-            this.timePeriodUnit = unit;
-            this.timePeriodValue = timePeriod;
-            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
-            this.logger = logger;
-        }
-
-        public void setMaxRate(final long maxRate) {
-            this.maxRate.set(maxRate);
-        }
-
-        public long lastUpdateTime() {
-            return lastUpdateTime;
-        }
-
-        public boolean tryAdd(final long value) {
-            final long now = System.currentTimeMillis();
-            if (penalizationExpired > now) {
-                return false;
-            }
-
-            final long maxRateValue = maxRate.get();
-
-            final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
-            if (sum != null && sum.getValue() >= maxRateValue) {
-                logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
-                return false;
-            }
-
-            logger.debug("current sum for throttle is {}, so allowing rate of {} through",
-                    new Object[]{sum == null ? 0 : sum.getValue(), value});
-
-            final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
-            if (transferred > maxRateValue) {
-                final long amountOver = transferred - maxRateValue;
-                // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
-                final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
-                final double pct = (double) amountOver / (double) maxRateValue;
-                final long penalizationPeriod = (long) (milliDuration * pct);
-                this.penalizationExpired = now + penalizationPeriod;
-                logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
-            }
-
-            lastUpdateTime = now;
-            return true;
-        }
-    }
+	public static final String DATA_RATE = "data rate";
+	public static final String FLOWFILE_RATE = "flowfile count";
+	public static final String ATTRIBUTE_RATE = "attribute value";
+
+	public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
+		.name("Rate Control Criteria")
+		.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
+		.required(true)
+		.allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+		.defaultValue(DATA_RATE)
+		.build();
+	public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
+		.name("Maximum Rate")
+		.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+			+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
+		.required(true)
+		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+		.build();
+	public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+		.name("Rate Controlled Attribute")
+		.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+			+ "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+			+ "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
+		.required(false)
+		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.build();
+	public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
+		.name("Time Duration")
+		.description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
+		.required(true)
+		.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+		.defaultValue("1 min")
+		.build();
+	public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+		.name("Grouping Attribute")
+		.description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+			+ "each value specified by the attribute with this name. Changing this value resets the rate counters.")
+		.required(false)
+		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.build();
+
+	public static final Relationship REL_SUCCESS = new Relationship.Builder()
+		.name("success")
+		.description("All FlowFiles are transferred to this relationship")
+		.build();
+	public static final Relationship REL_FAILURE = new Relationship.Builder()
+		.name("failure")
+		.description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
+		.build();
+
+	private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
+	private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+	private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+	private List<PropertyDescriptor> properties;
+	private Set<Relationship> relationships;
+	private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+
+	@Override
+	protected void init(final ProcessorInitializationContext context) {
+		final List<PropertyDescriptor> properties = new ArrayList<>();
+		properties.add(RATE_CONTROL_CRITERIA);
+		properties.add(MAX_RATE);
+		properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+		properties.add(TIME_PERIOD);
+		properties.add(GROUPING_ATTRIBUTE_NAME);
+		this.properties = Collections.unmodifiableList(properties);
+
+		final Set<Relationship> relationships = new HashSet<>();
+		relationships.add(REL_SUCCESS);
+		this.relationships = Collections.unmodifiableSet(relationships);
+	}
+
+	@Override
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		return properties;
+	}
+
+	@Override
+	public Set<Relationship> getRelationships() {
+		return relationships;
+	}
+
+	@Override
+	protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+		final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+		final Validator rateValidator;
+		switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+			case DATA_RATE:
+				rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+				break;
+			case ATTRIBUTE_RATE:
+				rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+				final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+				if (rateAttr == null) {
+					validationResults.add(new ValidationResult.Builder()
+						.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+						.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+						.build());
+				}
+				break;
+			case FLOWFILE_RATE:
+			default:
+				rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+				break;
+		}
+
+		final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
+		if (!rateResult.isValid()) {
+			validationResults.add(rateResult);
+		}
+
+		return validationResults;
+	}
+
+	@Override
+	public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+		super.onPropertyModified(descriptor, oldValue, newValue);
+
+		if (descriptor.equals(RATE_CONTROL_CRITERIA)
+			|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+			|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+			|| descriptor.equals(TIME_PERIOD)) {
+			// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
+			throttleMap.clear();
+		} else if (descriptor.equals(MAX_RATE)) {
+			final long newRate;
+			if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+				newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+			} else {
+				newRate = Long.parseLong(newValue);
+			}
+
+			for (final Throttle throttle : throttleMap.values()) {
+				throttle.setMaxRate(newRate);
+			}
+		}
+	}
+
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		final long lastClearTime = lastThrottleClearTime.get();
+		final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+		if (lastClearTime < throttleExpirationMillis) {
+			if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+				final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+				while (itr.hasNext()) {
+					final Map.Entry<String, Throttle> entry = itr.next();
+					final Throttle throttle = entry.getValue();
+					if (throttle.tryLock()) {
+						try {
+							if (throttle.lastUpdateTime() < lastClearTime) {
+								itr.remove();
+							}
+						} finally {
+							throttle.unlock();
+						}
+					}
+				}
+			}
+		}
+
+		// TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
+		FlowFile flowFile = session.get();
+		if (flowFile == null) {
+			return;
+		}
+
+		final ProcessorLog logger = getLogger();
+		final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+		final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+		long rateValue;
+		switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+			case DATA_RATE:
+				rateValue = flowFile.getSize();
+				break;
+			case FLOWFILE_RATE:
+				rateValue = 1;
+				break;
+			case ATTRIBUTE_RATE:
+				final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
+				if (attributeValue == null) {
+					logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
+					session.transfer(flowFile, REL_FAILURE);
+					return;
+				}
+
+				if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+					logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
+						new Object[]{flowFile, rateControlAttributeName, attributeValue});
+					session.transfer(flowFile, REL_FAILURE);
+					return;
+				}
+				rateValue = Long.parseLong(attributeValue);
+				break;
+			default:
+				throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+		}
+
+		final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+		final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+		Throttle throttle = throttleMap.get(groupName);
+		if (throttle == null) {
+			throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+
+			final String maxRateValue = context.getProperty(MAX_RATE).getValue();
+			final long newRate;
+			if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+				newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
+			} else {
+				newRate = Long.parseLong(maxRateValue);
+			}
+			throttle.setMaxRate(newRate);
+
+			throttleMap.put(groupName, throttle);
+		}
+
+		throttle.lock();
+		try {
+			if (throttle.tryAdd(rateValue)) {
+				logger.info("transferring {} to 'success'", new Object[]{flowFile});
+				session.transfer(flowFile, REL_SUCCESS);
+			} else {
+				flowFile = session.penalize(flowFile);
+				session.transfer(flowFile);
+			}
+		} finally {
+			throttle.unlock();
+		}
+	}
+
+	private static class TimestampedLong {
+
+		private final Long value;
+		private final long timestamp = System.currentTimeMillis();
+
+		public TimestampedLong(final Long value) {
+			this.value = value;
+		}
+
+		public Long getValue() {
+			return value;
+		}
+
+		public long getTimestamp() {
+			return timestamp;
+		}
+	}
+
+	private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
+
+		@Override
+		public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+			if (oldValue == null && toAdd == null) {
+				return new TimestampedLong(0L);
+			} else if (oldValue == null) {
+				return toAdd;
+			} else if (toAdd == null) {
+				return oldValue;
+			}
+
+			return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+		}
+
+		@Override
+		public TimestampedLong createNew() {
+			return new TimestampedLong(0L);
+		}
+
+		@Override
+		public long getTimestamp(TimestampedLong entity) {
+			return entity == null ? 0L : entity.getTimestamp();
+		}
+	}
+
+	private static class Throttle extends ReentrantLock {
+
+		private final AtomicLong maxRate = new AtomicLong(1L);
+		private final long timePeriodValue;
+		private final TimeUnit timePeriodUnit;
+		private final TimedBuffer<TimestampedLong> timedBuffer;
+		private final ProcessorLog logger;
+
+		private volatile long penalizationExpired;
+		private volatile long lastUpdateTime;
+
+		public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
+			this.timePeriodUnit = unit;
+			this.timePeriodValue = timePeriod;
+			this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
+			this.logger = logger;
+		}
+
+		public void setMaxRate(final long maxRate) {
+			this.maxRate.set(maxRate);
+		}
+
+		public long lastUpdateTime() {
+			return lastUpdateTime;
+		}
+
+		public boolean tryAdd(final long value) {
+			final long now = System.currentTimeMillis();
+			if (penalizationExpired > now) {
+				return false;
+			}
+
+			final long maxRateValue = maxRate.get();
+
+			final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
+			if (sum != null && sum.getValue() >= maxRateValue) {
+				logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
+				return false;
+			}
+
+			logger.debug("current sum for throttle is {}, so allowing rate of {} through",
+				new Object[]{sum == null ? 0 : sum.getValue(), value});
+
+			final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+			if (transferred > maxRateValue) {
+				final long amountOver = transferred - maxRateValue;
+				// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
+				final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+				final double pct = (double) amountOver / (double) maxRateValue;
+				final long penalizationPeriod = (long) (milliDuration * pct);
+				this.penalizationExpired = now + penalizationPeriod;
+				logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
+			}
+
+			lastUpdateTime = now;
+			return true;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
index a0a1364..7a99a59 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
@@ -33,8 +33,10 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
@@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit;
  */
 @EventDriven
 @SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @SupportsBatching
 @Tags({"text", "convert", "characterset", "character set"})
 @CapabilityDescription("Converts a FlowFile's content from one character set to another")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 7eda593..9591960 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -34,10 +34,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -64,6 +66,7 @@ import org.codehaus.jackson.node.JsonNodeFactory;
 @SideEffectFree
 @SupportsBatching
 @SeeAlso(PutSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
 @CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
         + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "