You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:35 UTC
[45/49] incubator-nifi git commit: NIFI-537 fixed identified
licensing issue with several of the new nars
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 9a4fc5b..24c82dd 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -56,35 +56,35 @@ import com.amazonaws.services.s3.model.StorageClass;
@SeeAlso({FetchS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name="The name of a User-Defined Metadata field to add to the S3 Object",
- value="The value of a User-Defined Metadata field to add to the S3 Object",
- description="Allows user-defined metadata to be added to the S3 object as key/value pairs",
- supportsExpressionLanguage=true)
-@ReadsAttribute(attribute="filename", description="Uses the FlowFile's filename as the filename for the S3 object")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
+ value = "The value of a User-Defined Metadata field to add to the S3 Object",
+ description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
+ supportsExpressionLanguage = true)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
- @WritesAttribute(attribute="s3.version", description="The version of the S3 Object that was put to S3"),
- @WritesAttribute(attribute="s3.etag", description="The ETag of the S3 Object"),
- @WritesAttribute(attribute="s3.expiration", description="A human-readable form of the expiration date of the S3 object, if one is set")
+ @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+ @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
+ @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
})
public class PutS3Object extends AbstractS3Processor {
+
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
- .name("Expiration Time Rule")
- .required(false)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Expiration Time Rule")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
- .name("Storage Class")
- .required(true)
- .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
- .defaultValue(StorageClass.Standard.name())
- .build();
+ .name("Storage Class")
+ .required(true)
+ .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+ .defaultValue(StorageClass.Standard.name())
+ .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
- FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER) );
-
+ Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+ FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -94,21 +94,21 @@ public class PutS3Object extends AbstractS3Processor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .dynamic(true)
- .build();
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
}
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
- if ( flowFile == null ) {
+ if (flowFile == null) {
return;
}
-
+
final long startNanos = System.nanoTime();
-
+
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
@@ -123,59 +123,59 @@ public class PutS3Object extends AbstractS3Processor {
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
objectMetadata.setContentLength(ff.getSize());
-
+
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
- if ( expirationRule != null ) {
+ if (expirationRule != null) {
objectMetadata.setExpirationTimeRuleId(expirationRule);
}
-
+
final Map<String, String> userMetadata = new HashMap<>();
- for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
- if ( entry.getKey().isDynamic() ) {
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
userMetadata.put(entry.getKey().getName(), value);
}
}
-
- if ( !userMetadata.isEmpty() ) {
+
+ if (!userMetadata.isEmpty()) {
objectMetadata.setUserMetadata(userMetadata);
}
-
+
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
- if ( acl != null ) {
+ if (acl != null) {
request.setAccessControlList(acl);
}
-
+
final PutObjectResult result = s3.putObject(request);
- if ( result.getVersionId() != null ) {
+ if (result.getVersionId() != null) {
attributes.put("s3.version", result.getVersionId());
}
-
+
attributes.put("s3.etag", result.getETag());
-
+
final Date expiration = result.getExpirationTime();
- if ( expiration != null ) {
+ if (expiration != null) {
attributes.put("s3.expiration", expiration.toString());
}
}
}
});
-
- if ( !attributes.isEmpty() ) {
+
+ if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
-
+
final String url = "http://" + bucket + ".s3.amazonaws.com/" + key;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
-
- getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
+
+ getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
- getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
+ getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
index 5447169..5b57647 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -28,30 +28,28 @@ import com.amazonaws.services.sns.AmazonSNSClient;
public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
- protected static final AllowableValue ARN_TYPE_TOPIC =
- new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
- protected static final AllowableValue ARN_TYPE_TARGET =
- new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
-
+ protected static final AllowableValue ARN_TYPE_TOPIC
+ = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
+ protected static final AllowableValue ARN_TYPE_TARGET
+ = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
+
public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
- .name("Amazon Resource Name (ARN)")
- .description("The name of the resource to which notifications should be published")
- .expressionLanguageSupported(true)
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
+ .name("Amazon Resource Name (ARN)")
+ .description("The name of the resource to which notifications should be published")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
- .name("ARN Type")
- .description("The type of Amazon Resource Name that is being used.")
- .expressionLanguageSupported(false)
- .required(true)
- .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
- .defaultValue(ARN_TYPE_TOPIC.getValue())
- .build();
-
-
-
+ .name("ARN Type")
+ .description("The type of Amazon Resource Name that is being used.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
+ .defaultValue(ARN_TYPE_TOPIC.getValue())
+ .build();
+
@Override
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonSNSClient(credentials, config);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 1de3251..b1a604f 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -46,34 +46,34 @@ import com.amazonaws.services.sns.model.PublishRequest;
public class PutSNS extends AbstractSNSProcessor {
public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
- .name("Character Set")
- .description("The character set in which the FlowFile's content is encoded")
- .defaultValue("UTF-8")
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .required(true)
- .build();
+ .name("Character Set")
+ .description("The character set in which the FlowFile's content is encoded")
+ .defaultValue("UTF-8")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .required(true)
+ .build();
public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
- .name("Use JSON Structure")
- .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.")
- .defaultValue("false")
- .allowableValues("true", "false")
- .required(true)
- .build();
+ .name("Use JSON Structure")
+ .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.")
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .required(true)
+ .build();
public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
- .name("E-mail Subject")
- .description("The optional subject to use for any subscribers that are subscribed via E-mail")
- .expressionLanguageSupported(true)
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
+ .name("E-mail Subject")
+ .description("The optional subject to use for any subscribers that are subscribed via E-mail")
+ .expressionLanguageSupported(true)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
- USE_JSON_STRUCTURE, CHARACTER_ENCODING) );
+ Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
+ USE_JSON_STRUCTURE, CHARACTER_ENCODING));
public static final int MAX_SIZE = 256 * 1024;
-
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
@@ -82,71 +82,70 @@ public class PutSNS extends AbstractSNSProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .required(false)
- .dynamic(true)
- .build();
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .dynamic(true)
+ .build();
}
-
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
- if ( flowFile == null ) {
+ if (flowFile == null) {
return;
}
- if ( flowFile.getSize() > MAX_SIZE ) {
- getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[] {flowFile});
+ if (flowFile.getSize() > MAX_SIZE) {
+ getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
-
+
final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
-
+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String message = new String(baos.toByteArray(), charset);
-
+
final AmazonSNSClient client = getClient();
final PublishRequest request = new PublishRequest();
request.setMessage(message);
-
- if ( context.getProperty(USE_JSON_STRUCTURE).asBoolean() ) {
+
+ if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
request.setMessageStructure("json");
}
-
+
final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
final String arnType = context.getProperty(ARN_TYPE).getValue();
- if ( arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue()) ) {
+ if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
request.setTopicArn(arn);
} else {
request.setTargetArn(arn);
}
-
+
final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
- if ( subject != null ) {
+ if (subject != null) {
request.setSubject(subject);
}
- for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
- if ( entry.getKey().isDynamic() && !isEmpty(entry.getValue()) ) {
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
final MessageAttributeValue value = new MessageAttributeValue();
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
value.setDataType("String");
request.addMessageAttributesEntry(entry.getKey().getName(), value);
}
}
-
+
try {
client.publish(request);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, arn);
- getLogger().info("Successfully published notification for {}", new Object[] {flowFile});
+ getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
} catch (final Exception e) {
- getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[] {flowFile, e});
+ getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index 2ef749f..3cee02d 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -28,20 +28,20 @@ import com.amazonaws.services.sqs.AmazonSQSClient;
public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of messages to send in a single network request")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("25")
- .build();
+ .name("Batch Size")
+ .description("The maximum number of messages to send in a single network request")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("25")
+ .build();
public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
- .name("Queue URL")
- .description("The URL of the queue to act upon")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .required(true)
- .build();
+ .name("Queue URL")
+ .description("The URL of the queue to act upon")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
@Override
protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 2416044..65e020d 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -40,54 +40,54 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
public class DeleteSQS extends AbstractSQSProcessor {
+
public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
- .name("Receipt Handle")
- .description("The identifier that specifies the receipt of the message")
- .expressionLanguageSupported(true)
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue("${sqs.receipt.handle}")
- .build();
-
+ .name("Receipt Handle")
+ .description("The identifier that specifies the receipt of the message")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("${sqs.receipt.handle}")
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT) );
+ Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(1);
- if ( flowFiles.isEmpty() ) {
+ if (flowFiles.isEmpty()) {
return;
}
-
+
final FlowFile firstFlowFile = flowFiles.get(0);
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
-
+
final AmazonSQSClient client = getClient();
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
request.setQueueUrl(queueUrl);
-
+
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
-
- for ( final FlowFile flowFile : flowFiles ) {
+
+ for (final FlowFile flowFile : flowFiles) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
entries.add(entry);
}
-
+
request.setEntries(entries);
-
+
try {
client.deleteMessageBatch(request);
- getLogger().info("Successfully deleted {} objects from SQS", new Object[] {flowFiles.size()});
+ getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
session.transfer(flowFiles, REL_SUCCESS);
} catch (final Exception e) {
- getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[] {flowFiles.size(), e});
+ getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
session.transfer(flowFiles, REL_FAILURE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 6c0257b..929a437 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -51,116 +51,116 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@SupportsBatching
-@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
@SeeAlso({PutSQS.class, DeleteSQS.class})
@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
@WritesAttributes({
- @WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"),
- @WritesAttribute(attribute="hash.algorithm", description="MD5"),
- @WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"),
- @WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue")
+ @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
+ @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
+ @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
+ @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
})
public class GetSQS extends AbstractSQSProcessor {
+
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("Character Set")
- .description("The Character Set that should be used to encode the textual content of the SQS message")
- .required(true)
- .defaultValue("UTF-8")
- .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
- .build();
-
+ .name("Character Set")
+ .description("The Character Set that should be used to encode the textual content of the SQS message")
+ .required(true)
+ .defaultValue("UTF-8")
+ .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
+ .build();
+
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
- .name("Auto Delete Messages")
- .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
-
+ .name("Auto Delete Messages")
+ .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Visibility Timeout")
- .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
- .expressionLanguageSupported(false)
- .required(true)
- .defaultValue("15 mins")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
+ .name("Visibility Timeout")
+ .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .defaultValue("15 mins")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of messages to send in a single network request")
- .required(true)
- .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
- .defaultValue("10")
- .build();
-
-
+ .name("Batch Size")
+ .description("The maximum number of messages to send in a single network request")
+ .required(true)
+ .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
+ .defaultValue("10")
+ .build();
+
public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(QUEUE_URL)
- .expressionLanguageSupported(false)
- .build();
-
+ .fromPropertyDescriptor(QUEUE_URL)
+ .expressionLanguageSupported(false)
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT) );
+ Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
-
+
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
-
+
final AmazonSQSClient client = getClient();
-
+
final ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setAttributeNames(Collections.singleton("All"));
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
request.setQueueUrl(queueUrl);
-
+
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
-
+
final ReceiveMessageResult result;
try {
result = client.receiveMessage(request);
} catch (final Exception e) {
- getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[] {e});
+ getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
context.yield();
return;
}
-
+
final List<Message> messages = result.getMessages();
- if ( messages.isEmpty() ) {
+ if (messages.isEmpty()) {
context.yield();
return;
}
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
-
- for ( final Message message : messages ) {
+
+ for (final Message message : messages) {
FlowFile flowFile = session.create();
-
+
final Map<String, String> attributes = new HashMap<>();
- for ( final Map.Entry<String, String> entry : message.getAttributes().entrySet() ) {
+ for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue());
}
-
- for ( final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet() ) {
+
+ for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
}
-
+
attributes.put("hash.value", message.getMD5OfBody());
attributes.put("hash.algorithm", "md5");
attributes.put("sqs.message.id", message.getMessageId());
attributes.put("sqs.receipt.handle", message.getReceiptHandle());
-
+
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
@@ -168,37 +168,37 @@ public class GetSQS extends AbstractSQSProcessor {
out.write(message.getBody().getBytes(charset));
}
});
-
+
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, queueUrl);
-
- getLogger().info("Successfully received {} from Amazon SQS", new Object[] {flowFile});
+
+ getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
}
-
- if ( autoDelete ) {
+
+ if (autoDelete) {
// If we want to auto-delete messages, we must fist commit the session to ensure that the data
// is persisted in NiFi's repositories.
session.commit();
-
+
final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
deleteRequest.setQueueUrl(queueUrl);
final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
- for ( final Message message : messages ) {
+ for (final Message message : messages) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setId(message.getMessageId());
entry.setReceiptHandle(message.getReceiptHandle());
deleteRequestEntries.add(entry);
}
-
+
deleteRequest.setEntries(deleteRequestEntries);
-
+
try {
client.deleteMessageBatch(deleteRequest);
} catch (final Exception e) {
- getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[] {messages.size(), e});
+ getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
}
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index 81268fe..3961f32 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -44,29 +44,28 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
-
@SupportsBatching
@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
@SeeAlso({GetSQS.class, DeleteSQS.class})
@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
-@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute",
- description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
- + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true)
+@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
+ description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+ + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage = true)
public class PutSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
- .name("Delay")
- .description("The amount of time to delay the message before it becomes available to consumers")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("0 secs")
- .build();
+ .name("Delay")
+ .description("The amount of time to delay the message before it becomes available to consumers")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("0 secs")
+ .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT) );
+ Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT));
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
-
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
@@ -75,70 +74,70 @@ public class PutSQS extends AbstractSQSProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .expressionLanguageSupported(true)
- .required(false)
- .dynamic(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
}
-
+
@OnScheduled
public void setup(final ProcessContext context) {
userDefinedProperties = new ArrayList<>();
- for ( final PropertyDescriptor descriptor : context.getProperties().keySet() ) {
- if ( descriptor.isDynamic() ) {
+ for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+ if (descriptor.isDynamic()) {
userDefinedProperties.add(descriptor);
}
}
}
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
- if ( flowFile == null ) {
+ if (flowFile == null) {
return;
}
-
+
final long startNanos = System.nanoTime();
final AmazonSQSClient client = getClient();
final SendMessageBatchRequest request = new SendMessageBatchRequest();
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
request.setQueueUrl(queueUrl);
-
+
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
-
+
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
entry.setId(flowFile.getAttribute("uuid"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString();
entry.setMessageBody(flowFileContent);
-
+
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
-
- for ( final PropertyDescriptor descriptor : userDefinedProperties ) {
+
+ for (final PropertyDescriptor descriptor : userDefinedProperties) {
final MessageAttributeValue mav = new MessageAttributeValue();
mav.setDataType("String");
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
messageAttributes.put(descriptor.getName(), mav);
}
-
+
entry.setMessageAttributes(messageAttributes);
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
entries.add(entry);
-
+
request.setEntries(entries);
-
+
try {
client.sendMessageBatch(request);
} catch (final Exception e) {
- getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[] {e});
+ getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
session.transfer(flowFile, REL_FAILURE);
return;
}
-
- getLogger().info("Successfully published message to Amazon SQS for {}", new Object[] {flowFile});
+
+ getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 40f9515..0321514 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
@@ -15,30 +31,31 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestFetchS3Object {
+
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
-
+
@Test
public void testGet() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000");
runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.KEY, "folder/1.txt");
-
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("start", "0");
-
+
runner.enqueue(new byte[0], attrs);
runner.run(1);
-
+
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
-
+
final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
out.assertContentEquals(new String(expectedBytes));
- for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) {
+ for (final Map.Entry<String, String> entry : out.getAttributes().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 0a019f3..de7816d 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
@@ -17,55 +33,54 @@ import com.amazonaws.services.s3.model.StorageClass;
public class TestPutS3Object {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
-
+
@Test
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
- Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
-
- for (int i=0; i < 3; i++) {
+ Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
+ for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
}
runner.run(3);
-
+
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
}
-
+
@Test
public void testPutInFolder() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
- Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
-
+ Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
-
+
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
-
@Test
public void testStorageClass() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
- Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
-
+ Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/2.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
-
+
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@@ -75,12 +90,12 @@ public class TestPutS3Object {
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
-
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/4.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
-
+
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
index b505622..1e914c7 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.aws.sns;
import static org.junit.Assert.assertTrue;
@@ -14,20 +30,21 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSNS {
+
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
-
+
@Test
public void testPublish() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
- assertTrue( runner.setProperty("DynamicProperty", "hello!").isValid() );
-
+ assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
-
+
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
index de4a5d9..0e70e7b 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.aws.sqs;
import java.util.List;
@@ -11,6 +27,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestGetSQS {
+
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@@ -19,11 +36,11 @@ public class TestGetSQS {
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
-
+
runner.run(1);
-
+
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
- for ( final MockFlowFile mff : flowFiles ) {
+ for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
index a90a4ce..1f9851a 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.aws.sqs;
import java.io.IOException;
@@ -14,6 +30,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSQS {
+
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@@ -22,13 +39,13 @@ public class TestPutSQS {
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
- Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
-
+ Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run(1);
-
+
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
index 117d7dd..4435327 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
@@ -30,14 +30,14 @@
<module>nifi-aws-nar</module>
</modules>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk</artifactId>
- <version>1.9.24</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.9.24</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..bde2a66
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,68 @@
+nifi-geo-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpClient
+ Copyright 1999-2014 The Apache Software Foundation
+
+ Apache HttpCore
+ Copyright 2005-2014 The Apache Software Foundation
+
+ This project contains annotations derived from JCIP-ANNOTATIONS
+ Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons Logging
+ The following NOTICE information applies:
+ Apache Commons Logging
+ Copyright 2003-2013 The Apache Software Foundation
+
+ (ASLv2) GeoIP2 Java API
+ The following NOTICE information applies:
+ GeoIP2 Java API
+ This software is Copyright (c) 2013 by MaxMind, Inc.
+
+************************
+Creative Commons Attribution-ShareAlike 3.0
+************************
+
+The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
+
+ (CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..30b099f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,29 @@
+nifi-hl7-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+*****************
+Mozilla Public License v1.1
+*****************
+
+The following binary components are provided under the Mozilla Public License v1.1. See project link for details.
+
+ (MPL 1.1) HAPI Base (ca.uhn.hapi:hapi-base:2.2 - http://hl7api.sourceforge.net/)
+ (MPL 1.1) HAPI Structures (ca.uhn.hapi:hapi-structures-v*:2.2 - http://hl7api.sourceforge.net/)
+
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..33bcc0d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,57 @@
+nifi-social-media-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Apache Commons Logging
+ The following NOTICE information applies:
+ Apache Commons Logging
+ Copyright 2003-2013 The Apache Software Foundation
+
+ (ASLv2) Twitter4J
+ The following NOTICE information applies:
+ Copyright 2007 Yusuke Yamamoto
+
+ Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html
+
+ (ASLv2) JOAuth
+ The following NOTICE information applies:
+ JOAuth
+ Copyright 2010-2013 Twitter, Inc
+
+ (ASLv2) Hosebird Client
+ The following NOTICE information applies:
+ Hosebird Client (hbc)
+ Copyright 2013 Twitter, Inc.
\ No newline at end of file