You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/07 23:53:29 UTC
[09/17] nifi git commit: NIFI-810: Created RequiresInput annotation
and ensure that processors are invalid if connections do not agree
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index cbcc54d..385ac73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -23,7 +23,8 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -57,6 +58,7 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
*
*/
@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "sequence file", "create", "sequencefile"})
@CapabilityDescription("Creates Hadoop Sequence Files from incoming flow files")
@SeeAlso(PutHDFS.class)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 4a52fb7..aa03e73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -44,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
+ "The file in HDFS is left intact without any changes being made to it.")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index de776d4..4c9deea 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
@WritesAttributes({
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 151cbf2..563bda8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -66,6 +68,7 @@ import org.codehaus.jackson.map.ObjectMapper;
@TriggerSerially
@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
+ "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 901159b..bedf1b9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -59,6 +61,7 @@ import org.apache.nifi.util.StopWatch;
/**
* This processor copies FlowFiles to HDFS.
*/
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
index 574fb2d..3a6ac79 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,6 +59,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
@SideEffectFree
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"})
@CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. "
+ "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
index 53e7e69..26e8bb6 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -29,6 +29,8 @@ import java.util.Set;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -63,6 +65,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
@EventDriven
@SideEffectFree
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"HL7", "healthcare", "route", "Health Level 7"})
@DynamicProperties({
@DynamicProperty(name = "Name of a Relationship", value = "An HL7 Query Language query",
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
index 7fe6195..b44eccd 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
@@ -16,11 +16,18 @@
*/
package org.apache.nifi.processors.image;
-import com.drew.imaging.ImageMetadataReader;
-import com.drew.imaging.ImageProcessingException;
-import com.drew.metadata.Directory;
-import com.drew.metadata.Metadata;
-import com.drew.metadata.Tag;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -30,25 +37,22 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.HashMap;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.Tag;
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Exif", "Exchangeable", "image", "file", "format", "JPG", "GIF", "PNG", "BMP", "metadata","IPTC", "XMP"})
@CapabilityDescription("Extract the image metadata from flowfiles containing images. This processor relies on this "
+ "metadata extractor library https://github.com/drewnoakes/metadata-extractor. It extracts a long list of "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
index c085b5f..176561f 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
@@ -17,7 +17,27 @@
package org.apache.nifi.processors.image;
+import java.awt.Graphics2D;
+import java.awt.Image;
+import java.awt.image.BufferedImage;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReader;
+import javax.imageio.stream.ImageInputStream;
+
import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,25 +53,9 @@ import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
-import javax.imageio.ImageIO;
-import javax.imageio.ImageReader;
-import javax.imageio.stream.ImageInputStream;
-import java.awt.Image;
-import java.awt.Graphics2D;
-import java.awt.image.BufferedImage;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
@EventDriven
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "resize", "image", "jpg", "jpeg", "png", "bmp", "wbmp", "gif" })
@CapabilityDescription("Resizes an image to user-specified dimensions. This Processor uses the image codecs registered with the "
+ "environment that NiFi is running in. By default, this includes JPEG, PNG, BMP, WBMP, and GIF images.")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 26590df..e10977b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -32,18 +32,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -58,7 +53,15 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
@SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Fetches messages from Apache Kafka")
@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
@WritesAttributes({
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index d83c7bf..cff285c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -58,9 +56,13 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
import scala.actors.threadpool.Arrays;
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6c20a8f..6f126aa 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -18,18 +18,20 @@
*/
package org.apache.nifi.processors.kite;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -53,11 +55,13 @@ import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.CSVFileReader;
import org.kitesdk.data.spi.filesystem.CSVProperties;
-import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
@Tags({"kite", "csv", "avro"})
-@CapabilityDescription(
- "Converts CSV files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index ec1503c..af120bf 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -18,18 +18,18 @@
*/
package org.apache.nifi.processors.kite;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@@ -47,9 +47,13 @@ import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.JSONFileReader;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
@Tags({"kite", "json", "avro"})
-@CapabilityDescription(
- "Converts JSON files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {
private static final Relationship SUCCESS = new Relationship.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 7a30db1..1986f0b 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -18,16 +18,17 @@
*/
package org.apache.nifi.processors.kite;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@@ -46,6 +47,10 @@ import org.kitesdk.data.ValidationException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.SchemaValidationUtil;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
index 8398152..5f58781 100644
--- a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
+++ b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
@@ -33,6 +33,8 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -70,6 +72,7 @@ import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.core.util.MultivaluedMapImpl;
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"yandex", "translate", "translation", "language"})
@CapabilityDescription("Translates content and attributes from one language to another")
@WritesAttributes({
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index a78b112..e41b583 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -32,6 +32,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,8 +59,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.Location.Coordinate ;
import com.twitter.hbc.core.endpoint.Location ;
+import com.twitter.hbc.core.endpoint.Location.Coordinate ;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
@@ -69,6 +71,7 @@ import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
@SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription("Pulls status changes from Twitter's streaming API")
@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index ff264a1..a85aa0f 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -18,7 +18,29 @@
*/
package org.apache.nifi.processors.solr;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -41,27 +63,8 @@ import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
@Tags({"Apache", "Solr", "Get", "Pull"})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
public class GetSolr extends SolrProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 560ad34..df034c9 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -18,7 +18,24 @@
*/
package org.apache.nifi.processors.solr;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@@ -40,22 +57,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
@Tags({"Apache", "Solr", "Put", "Send"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 9887e38..816b407 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -51,101 +53,102 @@ import org.apache.nifi.util.StopWatch;
@SupportsBatching
@Tags({"encode", "base64"})
@CapabilityDescription("Encodes or decodes content to and from base64")
+@InputRequirement(Requirement.INPUT_REQUIRED)
public class Base64EncodeContent extends AbstractProcessor {
- public static final String ENCODE_MODE = "Encode";
- public static final String DECODE_MODE = "Decode";
+ public static final String ENCODE_MODE = "Encode";
+ public static final String DECODE_MODE = "Decode";
- public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
- .name("Mode")
- .description("Specifies whether the content should be encoded or decoded")
- .required(true)
- .allowableValues(ENCODE_MODE, DECODE_MODE)
- .defaultValue(ENCODE_MODE)
- .build();
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
- .build();
+ public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+ .name("Mode")
+ .description("Specifies whether the content should be encoded or decoded")
+ .required(true)
+ .allowableValues(ENCODE_MODE, DECODE_MODE)
+ .defaultValue(ENCODE_MODE)
+ .build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
+ .build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(MODE);
- this.properties = Collections.unmodifiableList(properties);
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(MODE);
+ this.properties = Collections.unmodifiableList(properties);
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
- final ProcessorLog logger = getLogger();
+ final ProcessorLog logger = getLogger();
- boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
- try {
- final StopWatch stopWatch = new StopWatch(true);
- if (encode) {
- flowFile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (Base64OutputStream bos = new Base64OutputStream(out)) {
- int len = -1;
- byte[] buf = new byte[8192];
- while ((len = in.read(buf)) > 0) {
- bos.write(buf, 0, len);
- }
- bos.flush();
- }
- }
- });
- } else {
- flowFile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
- int len = -1;
- byte[] buf = new byte[8192];
- while ((len = bis.read(buf)) > 0) {
- out.write(buf, 0, len);
- }
- out.flush();
- }
- }
- });
- }
+ boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+ try {
+ final StopWatch stopWatch = new StopWatch(true);
+ if (encode) {
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out) throws IOException {
+ try (Base64OutputStream bos = new Base64OutputStream(out)) {
+ int len = -1;
+ byte[] buf = new byte[8192];
+ while ((len = in.read(buf)) > 0) {
+ bos.write(buf, 0, len);
+ }
+ bos.flush();
+ }
+ }
+ });
+ } else {
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out) throws IOException {
+ try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+ int len = -1;
+ byte[] buf = new byte[8192];
+ while ((len = bis.read(buf)) > 0) {
+ out.write(buf, 0, len);
+ }
+ out.flush();
+ }
+ }
+ });
+ }
- logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
- session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
- } catch (ProcessException e) {
- logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- }
- }
+ logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+ session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (ProcessException e) {
+ logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 1b9b20c..593cf44 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -29,20 +29,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import lzma.sdk.lzma.Decoder;
-import lzma.streams.LzmaInputStream;
-import lzma.streams.LzmaOutputStream;
-
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -63,9 +61,14 @@ import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
+import lzma.sdk.lzma.Decoder;
+import lzma.streams.LzmaInputStream;
+import lzma.streams.LzmaOutputStream;
+
@EventDriven
@SideEffectFree
@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 2efc852..a45c211 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -31,6 +31,12 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -43,10 +49,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.timebuffer.EntityAccess;
@@ -54,344 +56,345 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
@SideEffectFree
@TriggerSerially
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"rate control", "throttle", "rate", "throughput"})
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
public class ControlRate extends AbstractProcessor {
- public static final String DATA_RATE = "data rate";
- public static final String FLOWFILE_RATE = "flowfile count";
- public static final String ATTRIBUTE_RATE = "attribute value";
-
- public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
- .name("Rate Control Criteria")
- .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
- .required(true)
- .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
- .defaultValue(DATA_RATE)
- .build();
- public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
- .name("Maximum Rate")
- .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
- + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
- .build();
- public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
- .name("Rate Controlled Attribute")
- .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
- + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
- + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
- .build();
- public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
- .name("Time Duration")
- .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
- .required(true)
- .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
- .defaultValue("1 min")
- .build();
- public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
- .name("Grouping Attribute")
- .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
- + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles are transferred to this relationship")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
- .build();
-
- private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
- private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
- private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
- private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(RATE_CONTROL_CRITERIA);
- properties.add(MAX_RATE);
- properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
- properties.add(TIME_PERIOD);
- properties.add(GROUPING_ATTRIBUTE_NAME);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final ValidationContext context) {
- final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
-
- final Validator rateValidator;
- switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
- case DATA_RATE:
- rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
- break;
- case ATTRIBUTE_RATE:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
- final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
- if (rateAttr == null) {
- validationResults.add(new ValidationResult.Builder()
- .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
- .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
- .build());
- }
- break;
- case FLOWFILE_RATE:
- default:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
- break;
- }
-
- final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
- if (!rateResult.isValid()) {
- validationResults.add(rateResult);
- }
-
- return validationResults;
- }
-
- @Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- super.onPropertyModified(descriptor, oldValue, newValue);
-
- if (descriptor.equals(RATE_CONTROL_CRITERIA)
- || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
- || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
- || descriptor.equals(TIME_PERIOD)) {
- // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
- throttleMap.clear();
- } else if (descriptor.equals(MAX_RATE)) {
- final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
- newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
- } else {
- newRate = Long.parseLong(newValue);
- }
-
- for (final Throttle throttle : throttleMap.values()) {
- throttle.setMaxRate(newRate);
- }
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final long lastClearTime = lastThrottleClearTime.get();
- final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
- if (lastClearTime < throttleExpirationMillis) {
- if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
- final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
- while (itr.hasNext()) {
- final Map.Entry<String, Throttle> entry = itr.next();
- final Throttle throttle = entry.getValue();
- if (throttle.tryLock()) {
- try {
- if (throttle.lastUpdateTime() < lastClearTime) {
- itr.remove();
- }
- } finally {
- throttle.unlock();
- }
- }
- }
- }
- }
-
- // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final ProcessorLog logger = getLogger();
- final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
- final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
- long rateValue;
- switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
- case DATA_RATE:
- rateValue = flowFile.getSize();
- break;
- case FLOWFILE_RATE:
- rateValue = 1;
- break;
- case ATTRIBUTE_RATE:
- final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
- if (attributeValue == null) {
- logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
- logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
- new Object[]{flowFile, rateControlAttributeName, attributeValue});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- rateValue = Long.parseLong(attributeValue);
- break;
- default:
- throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
- }
-
- final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
- final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
- Throttle throttle = throttleMap.get(groupName);
- if (throttle == null) {
- throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
-
- final String maxRateValue = context.getProperty(MAX_RATE).getValue();
- final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
- newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
- } else {
- newRate = Long.parseLong(maxRateValue);
- }
- throttle.setMaxRate(newRate);
-
- throttleMap.put(groupName, throttle);
- }
-
- throttle.lock();
- try {
- if (throttle.tryAdd(rateValue)) {
- logger.info("transferring {} to 'success'", new Object[]{flowFile});
- session.transfer(flowFile, REL_SUCCESS);
- } else {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile);
- }
- } finally {
- throttle.unlock();
- }
- }
-
- private static class TimestampedLong {
-
- private final Long value;
- private final long timestamp = System.currentTimeMillis();
-
- public TimestampedLong(final Long value) {
- this.value = value;
- }
-
- public Long getValue() {
- return value;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
- }
-
- private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
-
- @Override
- public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
- if (oldValue == null && toAdd == null) {
- return new TimestampedLong(0L);
- } else if (oldValue == null) {
- return toAdd;
- } else if (toAdd == null) {
- return oldValue;
- }
-
- return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
- }
-
- @Override
- public TimestampedLong createNew() {
- return new TimestampedLong(0L);
- }
-
- @Override
- public long getTimestamp(TimestampedLong entity) {
- return entity == null ? 0L : entity.getTimestamp();
- }
- }
-
- private static class Throttle extends ReentrantLock {
-
- private final AtomicLong maxRate = new AtomicLong(1L);
- private final long timePeriodValue;
- private final TimeUnit timePeriodUnit;
- private final TimedBuffer<TimestampedLong> timedBuffer;
- private final ProcessorLog logger;
-
- private volatile long penalizationExpired;
- private volatile long lastUpdateTime;
-
- public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
- this.timePeriodUnit = unit;
- this.timePeriodValue = timePeriod;
- this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
- this.logger = logger;
- }
-
- public void setMaxRate(final long maxRate) {
- this.maxRate.set(maxRate);
- }
-
- public long lastUpdateTime() {
- return lastUpdateTime;
- }
-
- public boolean tryAdd(final long value) {
- final long now = System.currentTimeMillis();
- if (penalizationExpired > now) {
- return false;
- }
-
- final long maxRateValue = maxRate.get();
-
- final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
- if (sum != null && sum.getValue() >= maxRateValue) {
- logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
- return false;
- }
-
- logger.debug("current sum for throttle is {}, so allowing rate of {} through",
- new Object[]{sum == null ? 0 : sum.getValue(), value});
-
- final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
- if (transferred > maxRateValue) {
- final long amountOver = transferred - maxRateValue;
- // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
- final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
- final double pct = (double) amountOver / (double) maxRateValue;
- final long penalizationPeriod = (long) (milliDuration * pct);
- this.penalizationExpired = now + penalizationPeriod;
- logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
- }
-
- lastUpdateTime = now;
- return true;
- }
- }
+ public static final String DATA_RATE = "data rate";
+ public static final String FLOWFILE_RATE = "flowfile count";
+ public static final String ATTRIBUTE_RATE = "attribute value";
+
+ public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
+ .name("Rate Control Criteria")
+ .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
+ .required(true)
+ .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+ .defaultValue(DATA_RATE)
+ .build();
+ public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
+ .name("Maximum Rate")
+ .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+ .build();
+ public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("Rate Controlled Attribute")
+ .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+ + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+ + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
+ .name("Time Duration")
+ .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
+ .required(true)
+ .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+ .defaultValue("1 min")
+ .build();
+ public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("Grouping Attribute")
+ .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+ + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles are transferred to this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
+ .build();
+
+ private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
+ private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+ private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RATE_CONTROL_CRITERIA);
+ properties.add(MAX_RATE);
+ properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+ properties.add(TIME_PERIOD);
+ properties.add(GROUPING_ATTRIBUTE_NAME);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+ final Validator rateValidator;
+ switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+ case DATA_RATE:
+ rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+ break;
+ case ATTRIBUTE_RATE:
+ rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+ final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+ if (rateAttr == null) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+ .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+ .build());
+ }
+ break;
+ case FLOWFILE_RATE:
+ default:
+ rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+ break;
+ }
+
+ final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
+ if (!rateResult.isValid()) {
+ validationResults.add(rateResult);
+ }
+
+ return validationResults;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+
+ if (descriptor.equals(RATE_CONTROL_CRITERIA)
+ || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+ || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+ || descriptor.equals(TIME_PERIOD)) {
+ // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
+ throttleMap.clear();
+ } else if (descriptor.equals(MAX_RATE)) {
+ final long newRate;
+ if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+ newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+ } else {
+ newRate = Long.parseLong(newValue);
+ }
+
+ for (final Throttle throttle : throttleMap.values()) {
+ throttle.setMaxRate(newRate);
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final long lastClearTime = lastThrottleClearTime.get();
+ final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ if (lastClearTime < throttleExpirationMillis) {
+ if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+ final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+ while (itr.hasNext()) {
+ final Map.Entry<String, Throttle> entry = itr.next();
+ final Throttle throttle = entry.getValue();
+ if (throttle.tryLock()) {
+ try {
+ if (throttle.lastUpdateTime() < lastClearTime) {
+ itr.remove();
+ }
+ } finally {
+ throttle.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ProcessorLog logger = getLogger();
+ final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+ final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+ long rateValue;
+ switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+ case DATA_RATE:
+ rateValue = flowFile.getSize();
+ break;
+ case FLOWFILE_RATE:
+ rateValue = 1;
+ break;
+ case ATTRIBUTE_RATE:
+ final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
+ if (attributeValue == null) {
+ logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+ logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
+ new Object[]{flowFile, rateControlAttributeName, attributeValue});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ rateValue = Long.parseLong(attributeValue);
+ break;
+ default:
+ throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+ }
+
+ final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+ final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+ Throttle throttle = throttleMap.get(groupName);
+ if (throttle == null) {
+ throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+
+ final String maxRateValue = context.getProperty(MAX_RATE).getValue();
+ final long newRate;
+ if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+ newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
+ } else {
+ newRate = Long.parseLong(maxRateValue);
+ }
+ throttle.setMaxRate(newRate);
+
+ throttleMap.put(groupName, throttle);
+ }
+
+ throttle.lock();
+ try {
+ if (throttle.tryAdd(rateValue)) {
+ logger.info("transferring {} to 'success'", new Object[]{flowFile});
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile);
+ }
+ } finally {
+ throttle.unlock();
+ }
+ }
+
+ private static class TimestampedLong {
+
+ private final Long value;
+ private final long timestamp = System.currentTimeMillis();
+
+ public TimestampedLong(final Long value) {
+ this.value = value;
+ }
+
+ public Long getValue() {
+ return value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
+
+ @Override
+ public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+ if (oldValue == null && toAdd == null) {
+ return new TimestampedLong(0L);
+ } else if (oldValue == null) {
+ return toAdd;
+ } else if (toAdd == null) {
+ return oldValue;
+ }
+
+ return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+ }
+
+ @Override
+ public TimestampedLong createNew() {
+ return new TimestampedLong(0L);
+ }
+
+ @Override
+ public long getTimestamp(TimestampedLong entity) {
+ return entity == null ? 0L : entity.getTimestamp();
+ }
+ }
+
+ private static class Throttle extends ReentrantLock {
+
+ private final AtomicLong maxRate = new AtomicLong(1L);
+ private final long timePeriodValue;
+ private final TimeUnit timePeriodUnit;
+ private final TimedBuffer<TimestampedLong> timedBuffer;
+ private final ProcessorLog logger;
+
+ private volatile long penalizationExpired;
+ private volatile long lastUpdateTime;
+
+ public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
+ this.timePeriodUnit = unit;
+ this.timePeriodValue = timePeriod;
+ this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
+ this.logger = logger;
+ }
+
+ public void setMaxRate(final long maxRate) {
+ this.maxRate.set(maxRate);
+ }
+
+ public long lastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public boolean tryAdd(final long value) {
+ final long now = System.currentTimeMillis();
+ if (penalizationExpired > now) {
+ return false;
+ }
+
+ final long maxRateValue = maxRate.get();
+
+ final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
+ if (sum != null && sum.getValue() >= maxRateValue) {
+ logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
+ return false;
+ }
+
+ logger.debug("current sum for throttle is {}, so allowing rate of {} through",
+ new Object[]{sum == null ? 0 : sum.getValue(), value});
+
+ final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+ if (transferred > maxRateValue) {
+ final long amountOver = transferred - maxRateValue;
+ // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
+ final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+ final double pct = (double) amountOver / (double) maxRateValue;
+ final long penalizationPeriod = (long) (milliDuration * pct);
+ this.penalizationExpired = now + penalizationPeriod;
+ logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
+ }
+
+ lastUpdateTime = now;
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
index a0a1364..7a99a59 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
@@ -33,8 +33,10 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
@@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit;
*/
@EventDriven
@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"text", "convert", "characterset", "character set"})
@CapabilityDescription("Converts a FlowFile's content from one character set to another")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 7eda593..9591960 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -34,10 +34,12 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -64,6 +66,7 @@ import org.codehaus.jackson.node.JsonNodeFactory;
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "