You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/07 23:53:31 UTC
[11/17] nifi git commit: NIFI-810: Created RequiresInput annotation
and ensure that processors are invalid if connections do not agree
NIFI-810: Created RequiresInput annotation and ensure that processors are invalid if connections do not agree
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4afd8f88
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4afd8f88
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4afd8f88
Branch: refs/heads/NIFI-810-InputRequirement
Commit: 4afd8f88f8a34cf87f2a06221667166a54c99a15
Parents: 31fba6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Sep 25 11:39:28 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 7 17:26:14 2015 -0400
----------------------------------------------------------------------
.../annotation/behavior/InputRequirement.java | 51 +
.../src/main/asciidoc/developer-guide.adoc | 11 +
.../nifi/processors/avro/ConvertAvroToJSON.java | 3 +
.../processors/avro/ExtractAvroMetadata.java | 29 +-
.../apache/nifi/processors/avro/SplitAvro.java | 27 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 3 +
.../nifi/processors/aws/s3/PutS3Object.java | 6 +-
.../apache/nifi/processors/aws/sns/PutSNS.java | 3 +
.../nifi/processors/aws/sqs/DeleteSQS.java | 3 +
.../apache/nifi/processors/aws/sqs/GetSQS.java | 5 +-
.../apache/nifi/processors/aws/sqs/PutSQS.java | 5 +-
.../nifi/processors/flume/ExecuteFlumeSink.java | 14 +-
.../processors/flume/ExecuteFlumeSource.java | 14 +-
.../apache/nifi/controller/ProcessorNode.java | 89 +-
.../nifi/controller/StandardProcessorNode.java | 2440 +++++++++---------
.../org/apache/nifi/processors/GeoEnrichIP.java | 3 +
.../hadoop/CreateHadoopSequenceFile.java | 4 +-
.../nifi/processors/hadoop/FetchHDFS.java | 3 +
.../apache/nifi/processors/hadoop/GetHDFS.java | 3 +
.../apache/nifi/processors/hadoop/ListHDFS.java | 3 +
.../apache/nifi/processors/hadoop/PutHDFS.java | 3 +
.../processors/hl7/ExtractHL7Attributes.java | 3 +
.../apache/nifi/processors/hl7/RouteHL7.java | 3 +
.../processors/image/ExtractImageMetadata.java | 36 +-
.../nifi/processors/image/ResizeImage.java | 38 +-
.../apache/nifi/processors/kafka/GetKafka.java | 21 +-
.../apache/nifi/processors/kafka/PutKafka.java | 10 +-
.../nifi/processors/kite/ConvertCSVToAvro.java | 16 +-
.../nifi/processors/kite/ConvertJSONToAvro.java | 14 +-
.../processors/kite/StoreInKiteDataset.java | 9 +-
.../nifi/processors/yandex/YandexTranslate.java | 3 +
.../nifi-pcap-processors/.gitignore | 1 +
.../nifi/processors/twitter/GetTwitter.java | 5 +-
.../apache/nifi/processors/solr/GetSolr.java | 43 +-
.../processors/solr/PutSolrContentStream.java | 33 +-
.../standard/Base64EncodeContent.java | 171 +-
.../processors/standard/CompressContent.java | 15 +-
.../nifi/processors/standard/ControlRate.java | 683 ++---
.../standard/ConvertCharacterSet.java | 3 +
.../processors/standard/ConvertJSONToSQL.java | 3 +
.../processors/standard/DetectDuplicate.java | 3 +
.../processors/standard/DistributeLoad.java | 3 +
.../processors/standard/DuplicateFlowFile.java | 3 +
.../nifi/processors/standard/EncodeContent.java | 15 +-
.../processors/standard/EncryptContent.java | 3 +
.../processors/standard/EvaluateJsonPath.java | 38 +-
.../nifi/processors/standard/EvaluateXPath.java | 29 +-
.../processors/standard/EvaluateXQuery.java | 25 +-
.../processors/standard/ExecuteProcess.java | 3 +
.../nifi/processors/standard/ExecuteSQL.java | 3 +
.../standard/ExecuteStreamCommand.java | 7 +-
.../nifi/processors/standard/ExtractText.java | 3 +
.../processors/standard/GenerateFlowFile.java | 11 +-
.../apache/nifi/processors/standard/GetFTP.java | 13 +-
.../nifi/processors/standard/GetFile.java | 7 +-
.../nifi/processors/standard/GetHTTP.java | 3 +
.../nifi/processors/standard/GetJMSQueue.java | 3 +
.../nifi/processors/standard/GetJMSTopic.java | 3 +
.../nifi/processors/standard/GetSFTP.java | 7 +-
.../processors/standard/HandleHttpRequest.java | 7 +-
.../processors/standard/HandleHttpResponse.java | 5 +-
.../nifi/processors/standard/HashAttribute.java | 5 +-
.../nifi/processors/standard/HashContent.java | 5 +-
.../processors/standard/IdentifyMimeType.java | 5 +-
.../nifi/processors/standard/InvokeHTTP.java | 3 +
.../nifi/processors/standard/ListenHTTP.java | 16 +-
.../nifi/processors/standard/ListenUDP.java | 18 +-
.../nifi/processors/standard/LogAttribute.java | 16 +-
.../nifi/processors/standard/MergeContent.java | 11 +-
.../nifi/processors/standard/ModifyBytes.java | 14 +-
.../processors/standard/MonitorActivity.java | 31 +-
.../nifi/processors/standard/PostHTTP.java | 3 +
.../nifi/processors/standard/PutEmail.java | 3 +
.../apache/nifi/processors/standard/PutFTP.java | 3 +
.../nifi/processors/standard/PutFile.java | 3 +
.../apache/nifi/processors/standard/PutJMS.java | 5 +-
.../nifi/processors/standard/PutSFTP.java | 3 +
.../apache/nifi/processors/standard/PutSQL.java | 3 +
.../nifi/processors/standard/ReplaceText.java | 54 +-
.../standard/ReplaceTextWithMapping.java | 18 +-
.../processors/standard/RouteOnAttribute.java | 3 +
.../processors/standard/RouteOnContent.java | 19 +-
.../nifi/processors/standard/ScanAttribute.java | 19 +-
.../nifi/processors/standard/ScanContent.java | 5 +-
.../processors/standard/SegmentContent.java | 7 +-
.../nifi/processors/standard/SplitContent.java | 7 +-
.../nifi/processors/standard/SplitJson.java | 32 +-
.../nifi/processors/standard/SplitText.java | 53 +-
.../nifi/processors/standard/SplitXml.java | 18 +-
.../nifi/processors/standard/TransformXml.java | 3 +
.../nifi/processors/standard/UnpackContent.java | 9 +-
.../nifi/processors/standard/ValidateXml.java | 16 +-
.../processors/attributes/UpdateAttribute.java | 9 +-
93 files changed, 2418 insertions(+), 2027 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
new file mode 100644
index 0000000..97e6b88
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
@@ -0,0 +1,51 @@
+package org.apache.nifi.annotation.behavior;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <p>
+ * Annotation that a Processor can use to indicate whether it accepts, requires, or forbids
+ * input from other Processors. This information is used by the framework in order to ensure that
+ * a Processor is marked as invalid if it is missing necessary input or has input that will be ignored.
+ * This information is also used by the NiFi UI in order to prevent users from creating connections
+ * that don't make sense for the destination Processor.
+ * </p>
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface InputRequirement {
+ Requirement value();
+
+ public static enum Requirement {
+ /**
+ * This value is used to indicate that the Processor requires input from other Processors
+ * in order to run. As a result, the Processor will not be valid if it does not have any
+ * incoming connections.
+ */
+ INPUT_REQUIRED,
+
+ /**
+ * This value is used to indicate that the Processor will consume data from an incoming
+ * connection if one is present, but does not require one in order to perform its task.
+ * If the {@link InputRequirement} annotation is not present, this is the default value
+ * that is used.
+ */
+ INPUT_ALLOWED,
+
+ /**
+ * This value is used to indicate that the Processor is a "Source Processor" and does
+ * not accept incoming connections. Because the Processor does not pull FlowFiles from
+ * an incoming connection, creating one can be very confusing for users. As a result,
+ * this value can be used in order to make clear that incoming connections will not be
+ * used. This prevents the user from even creating such a connection.
+ */
+ INPUT_FORBIDDEN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index f9950d5..28df5c2 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -1633,6 +1633,17 @@ will handle your Processor:
not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run
periodically to time out a network connection.
+ - `InputRequirement`: By default, all Processors will allow users to create incoming connections for the Processor, but
+ if the user does not create an incoming connection, the Processor is still valid and can be scheduled to run. For Processors
+ that are expected to be used as a "Source Processor," though, this can be confusing to the user, and the user may attempt to
+ send FlowFiles to that Processor, only for the FlowFiles to queue up without being processed. Conversely, if the Processor
+ expects incoming FlowFiles but does not have an input queue, the Processor will be scheduled to run but will perform no work,
+ as it will receive no FlowFile, and this leads to confusion as well. As a result, we can use the `@InputRequirement` annotation
+ and provide it a value of `INPUT_REQUIRED`, `INPUT_ALLOWED`, or `INPUT_FORBIDDEN`. This provides information to the framework
+ about when the Processor should be made invalid, or whether or not the user should even be able to draw a Connection to the
+ Processor. For instance, if a Processor is annotated with `@InputRequirement(Requirement.INPUT_FORBIDDEN)`, then the user will
+ not even be able to create a Connection with that Processor as the destination.
+
=== Data Buffering
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8832a73..b214427 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -29,6 +29,8 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,6 +48,7 @@ import org.apache.nifi.processor.io.StreamCallback;
@SideEffectFree
@SupportsBatching
@Tags({ "json", "avro", "binary" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
index 48aad7d..4cf5289 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
@@ -16,6 +16,19 @@
*/
package org.apache.nifi.processors.avro;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.file.DataFileStream;
@@ -23,6 +36,8 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -41,22 +56,10 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "schema", "metadata" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Extracts metadata from the header of an Avro datafile.")
@WritesAttributes({
@WritesAttribute(attribute = "schema.type", description = "The type of the schema (i.e. record, enum, etc.)."),
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index 3b344b5..dbf5778 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -16,6 +16,18 @@
*/
package org.apache.nifi.processors.avro;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
@@ -26,6 +38,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -45,21 +59,10 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "split" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " +
"the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
public class SplitAvro extends AbstractProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 2406b67..131e671 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -45,6 +47,7 @@ import com.amazonaws.services.s3.model.S3Object;
@SupportsBatching
@SeeAlso({PutS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
@WritesAttributes({
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 24c82dd..7398c4e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -54,6 +56,7 @@ import com.amazonaws.services.s3.model.StorageClass;
@SupportsBatching
@SeeAlso({FetchS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
@@ -101,7 +104,8 @@ public class PutS3Object extends AbstractS3Processor {
.build();
}
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 7d42703..e571ff4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -41,6 +43,7 @@ import com.amazonaws.services.sns.model.PublishRequest;
@SupportsBatching
@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
public class PutSNS extends AbstractSNSProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 65e020d..f88aa71 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -37,6 +39,7 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
@SupportsBatching
@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
public class DeleteSQS extends AbstractSQSProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 7c2dd2d..a140999 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -51,8 +53,9 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@SupportsBatching
+@SeeAlso({ PutSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
-@SeeAlso({PutSQS.class, DeleteSQS.class})
@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
@WritesAttributes({
@WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index 3961f32..0af508e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -28,6 +28,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -45,8 +47,9 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
@SupportsBatching
+@SeeAlso({ GetSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
-@SeeAlso({GetSQS.class, DeleteSQS.class})
@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
index 57e0278..f93b215 100644
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
@@ -16,20 +16,19 @@
*/
package org.apache.nifi.processors.flume;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
+
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurables;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
@@ -40,12 +39,17 @@ import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
/**
* This processor runs a Flume sink
*/
+@TriggerSerially
@Tags({"flume", "hadoop", "put", "sink"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
-@TriggerSerially
public class ExecuteFlumeSink extends AbstractFlumeProcessor {
public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
index 600f4b1..3aad6b7 100644
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
@@ -16,12 +16,10 @@
*/
package org.apache.nifi.processors.flume;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.PollableSource;
@@ -29,12 +27,13 @@ import org.apache.flume.Source;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
@@ -46,12 +45,17 @@ import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
/**
* This processor runs a Flume source
*/
+@TriggerSerially
@Tags({"flume", "hadoop", "get", "source"})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
-@TriggerSerially
public class ExecuteFlumeSource extends AbstractFlumeProcessor {
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4afd8f88/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index f2a83d0..2f72d0f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -30,70 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
- public ProcessorNode(final Processor processor, final String id,
- final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
- super(processor, id, validationContextFactory, serviceProvider);
- }
+ public ProcessorNode(final Processor processor, final String id,
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+ super(processor, id, validationContextFactory, serviceProvider);
+ }
- public abstract boolean isIsolated();
+ public abstract boolean isIsolated();
- public abstract boolean isTriggerWhenAnyDestinationAvailable();
+ public abstract boolean isTriggerWhenAnyDestinationAvailable();
- @Override
- public abstract boolean isSideEffectFree();
+ @Override
+ public abstract boolean isSideEffectFree();
- public abstract boolean isTriggeredSerially();
+ public abstract boolean isTriggeredSerially();
- public abstract boolean isEventDrivenSupported();
+ public abstract boolean isEventDrivenSupported();
- public abstract boolean isHighThroughputSupported();
+ public abstract boolean isHighThroughputSupported();
- @Override
- public abstract boolean isValid();
+ public abstract Requirement getInputRequirement();
- public abstract void setScheduledState(ScheduledState scheduledState);
+ @Override
+ public abstract boolean isValid();
- public abstract void setBulletinLevel(LogLevel bulletinLevel);
+ public abstract void setScheduledState(ScheduledState scheduledState);
- public abstract LogLevel getBulletinLevel();
+ public abstract void setBulletinLevel(LogLevel bulletinLevel);
- public abstract Processor getProcessor();
+ public abstract LogLevel getBulletinLevel();
- public abstract void yield(long period, TimeUnit timeUnit);
+ public abstract Processor getProcessor();
- public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
+ public abstract void yield(long period, TimeUnit timeUnit);
- public abstract Set<Relationship> getAutoTerminatedRelationships();
+ public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
- public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+ public abstract Set<Relationship> getAutoTerminatedRelationships();
- @Override
- public abstract SchedulingStrategy getSchedulingStrategy();
+ public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
- public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+ @Override
+ public abstract SchedulingStrategy getSchedulingStrategy();
- public abstract long getRunDuration(TimeUnit timeUnit);
+ public abstract void setRunDuration(long duration, TimeUnit timeUnit);
- public abstract Map<String, String> getStyle();
+ public abstract long getRunDuration(TimeUnit timeUnit);
- public abstract void setStyle(Map<String, String> style);
+ public abstract Map<String, String> getStyle();
- /**
- * @return the number of threads (concurrent tasks) currently being used by
- * this Processor
- */
- public abstract int getActiveThreadCount();
+ public abstract void setStyle(Map<String, String> style);
- /**
- * Verifies that this Processor can be started if the provided set of
- * services are enabled. This is introduced because we need to verify that
- * all components can be started before starting any of them. In order to do
- * that, we need to know that this component can be started if the given
- * services are enabled, as we will then enable the given services before
- * starting this component.
- *
- * @param ignoredReferences to ignore
- */
- public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+ /**
+ * @return the number of threads (concurrent tasks) currently being used by
+ * this Processor
+ */
+ public abstract int getActiveThreadCount();
+
+ /**
+ * Verifies that this Processor can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that
+ * all components can be started before starting any of them. In order to do
+ * that, we need to know that this component can be started if the given
+ * services are enabled, as we will then enable the given services before
+ * starting this component.
+ *
+ * @param ignoredReferences to ignore
+ */
+ public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}