Posted to commits@nifi.apache.org by tk...@apache.org on 2015/12/02 04:51:25 UTC

[19/24] nifi git commit: NIFI-1054: Fixed DOS line endings in xml, java and js source files
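
Because this commit only converts CRLF (DOS) line endings to LF, git reports every line of each file as removed and re-added, so the "-" and "+" halves of each hunk below are textually identical. A minimal sketch of how such a conversion could be performed (the class name and command-line usage here are illustrative, not part of the commit):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class CrlfToLf {
        public static void main(String[] args) throws IOException {
            // Path of the source file to normalize, passed on the command line.
            Path path = Paths.get(args[0]);
            String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            // Replace each CRLF pair with a bare LF; files already using LF are unchanged.
            String normalized = content.replace("\r\n", "\n");
            Files.write(path, normalized.getBytes(StandardCharsets.UTF_8));
        }
    }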

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 3f88a74..6755c13 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -1,214 +1,214 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.s3;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.AccessControlList;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.StorageClass;
-
-@SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
-@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
-    description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
-@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
-@WritesAttributes({
-    @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
-    @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
-    @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
-    @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
-    @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"),
-    @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
-    @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set")
-})
-public class PutS3Object extends AbstractS3Processor {
-
-    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
-        .name("Expiration Time Rule")
-        .required(false)
-        .expressionLanguageSupported(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
-        .name("Storage Class")
-        .required(true)
-        .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
-        .defaultValue(StorageClass.Standard.name())
-        .build();
-
-    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
-            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
-
-    final static String S3_BUCKET_KEY = "s3.bucket";
-    final static String S3_OBJECT_KEY = "s3.key";
-    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
-    final static String S3_VERSION_ATTR_KEY = "s3.version";
-    final static String S3_ETAG_ATTR_KEY = "s3.etag";
-    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
-    final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
-    final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
-    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-            .name(propertyDescriptorName)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .dynamic(true)
-            .build();
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-
-        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-
-        final AmazonS3Client s3 = getClient();
-        final FlowFile ff = flowFile;
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(S3_BUCKET_KEY, bucket);
-        attributes.put(S3_OBJECT_KEY, key);
-
-        try {
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream rawIn) throws IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn)) {
-                        final ObjectMetadata objectMetadata = new ObjectMetadata();
-                        objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
-                        objectMetadata.setContentLength(ff.getSize());
-
-                        final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
-                        if (expirationRule != null) {
-                            objectMetadata.setExpirationTimeRuleId(expirationRule);
-                        }
-
-                        final Map<String, String> userMetadata = new HashMap<>();
-                        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
-                            if (entry.getKey().isDynamic()) {
-                                final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
-                                userMetadata.put(entry.getKey().getName(), value);
-                            }
-                        }
-
-                        if (!userMetadata.isEmpty()) {
-                            objectMetadata.setUserMetadata(userMetadata);
-                        }
-
-                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
-                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
-                        final AccessControlList acl = createACL(context, ff);
-                        if (acl != null) {
-                            request.setAccessControlList(acl);
-                        }
-
-                        final PutObjectResult result = s3.putObject(request);
-                        if (result.getVersionId() != null) {
-                            attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
-                        }
-
-                        attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
-
-                        final Date expiration = result.getExpirationTime();
-                        if (expiration != null) {
-                            attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString());
-                        }
-                        if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
-                            attributes.put(S3_STORAGECLASS_ATTR_KEY,
-                                    result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
-                        }
-                        if (userMetadata.size() > 0) {
-                            List<String> pairs = new ArrayList<String>();
-                            for (String userKey : userMetadata.keySet()) {
-                                pairs.add(userKey + "=" + userMetadata.get(userKey));
-                            }
-                            attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", "));
-                        }
-                    }
-                }
-            });
-
-            if (!attributes.isEmpty()) {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-            }
-            session.transfer(flowFile, REL_SUCCESS);
-
-            final String url = s3.getResourceUrl(bucket, key);
-            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            session.getProvenanceReporter().send(flowFile, url, millis);
-
-            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
-        } catch (final ProcessException | AmazonClientException pe) {
-            getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.StorageClass;
+
+@SupportsBatching
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
+@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
+    description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
+@WritesAttributes({
+    @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
+    @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
+    @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+    @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
+    @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"),
+    @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
+    @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set")
+})
+public class PutS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
+        .name("Expiration Time Rule")
+        .required(false)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+        .name("Storage Class")
+        .required(true)
+        .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+        .defaultValue(StorageClass.Standard.name())
+        .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
+
+    final static String S3_BUCKET_KEY = "s3.bucket";
+    final static String S3_OBJECT_KEY = "s3.key";
+    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
+    final static String S3_VERSION_ATTR_KEY = "s3.version";
+    final static String S3_ETAG_ATTR_KEY = "s3.etag";
+    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
+    final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
+    final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
+    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .dynamic(true)
+            .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3Client s3 = getClient();
+        final FlowFile ff = flowFile;
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(S3_BUCKET_KEY, bucket);
+        attributes.put(S3_OBJECT_KEY, key);
+
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        final ObjectMetadata objectMetadata = new ObjectMetadata();
+                        objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
+                        objectMetadata.setContentLength(ff.getSize());
+
+                        final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
+                        if (expirationRule != null) {
+                            objectMetadata.setExpirationTimeRuleId(expirationRule);
+                        }
+
+                        final Map<String, String> userMetadata = new HashMap<>();
+                        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+                            if (entry.getKey().isDynamic()) {
+                                final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+                                userMetadata.put(entry.getKey().getName(), value);
+                            }
+                        }
+
+                        if (!userMetadata.isEmpty()) {
+                            objectMetadata.setUserMetadata(userMetadata);
+                        }
+
+                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                        final AccessControlList acl = createACL(context, ff);
+                        if (acl != null) {
+                            request.setAccessControlList(acl);
+                        }
+
+                        final PutObjectResult result = s3.putObject(request);
+                        if (result.getVersionId() != null) {
+                            attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+                        }
+
+                        attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+
+                        final Date expiration = result.getExpirationTime();
+                        if (expiration != null) {
+                            attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString());
+                        }
+                        if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
+                            attributes.put(S3_STORAGECLASS_ATTR_KEY,
+                                    result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+                        }
+                        if (userMetadata.size() > 0) {
+                            List<String> pairs = new ArrayList<String>();
+                            for (String userKey : userMetadata.keySet()) {
+                                pairs.add(userKey + "=" + userMetadata.get(userKey));
+                            }
+                            attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", "));
+                        }
+                    }
+                }
+            });
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+            final String url = s3.getResourceUrl(bucket, key);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, url, millis);
+
+            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
+        } catch (final ProcessException | AmazonClientException pe) {
+            getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
index 5b57647..109c92f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -1,58 +1,58 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sns;
-
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.sns.AmazonSNSClient;
-
-public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
-
-    protected static final AllowableValue ARN_TYPE_TOPIC
-            = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
-    protected static final AllowableValue ARN_TYPE_TARGET
-            = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
-
-    public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
-            .name("Amazon Resource Name (ARN)")
-            .description("The name of the resource to which notifications should be published")
-            .expressionLanguageSupported(true)
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
-            .name("ARN Type")
-            .description("The type of Amazon Resource Name that is being used.")
-            .expressionLanguageSupported(false)
-            .required(true)
-            .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
-            .defaultValue(ARN_TYPE_TOPIC.getValue())
-            .build();
-
-    @Override
-    protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
-        return new AmazonSNSClient(credentials, config);
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sns.AmazonSNSClient;
+
+public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
+
+    protected static final AllowableValue ARN_TYPE_TOPIC
+            = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
+    protected static final AllowableValue ARN_TYPE_TARGET
+            = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
+
+    public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
+            .name("Amazon Resource Name (ARN)")
+            .description("The name of the resource to which notifications should be published")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
+            .name("ARN Type")
+            .description("The type of Amazon Resource Name that is being used.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
+            .defaultValue(ARN_TYPE_TOPIC.getValue())
+            .build();
+
+    @Override
+    protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return new AmazonSNSClient(credentials, config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index b8c5830..f58babc 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -1,159 +1,159 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sns;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.sqs.GetSQS;
-import org.apache.nifi.processors.aws.sqs.PutSQS;
-
-import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.MessageAttributeValue;
-import com.amazonaws.services.sns.model.PublishRequest;
-
-@SupportsBatching
-@SeeAlso({GetSQS.class, PutSQS.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
-@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
-public class PutSNS extends AbstractSNSProcessor {
-
-    public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("The character set in which the FlowFile's content is encoded")
-            .defaultValue("UTF-8")
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .required(true)
-            .build();
-    public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
-            .name("Use JSON Structure")
-            .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'."
-                    + " Additional elements can be used to send different messages to different protocols. See the Amazon"
-                    + " SNS Documentation for more information.")
-            .defaultValue("false")
-            .allowableValues("true", "false")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
-            .name("E-mail Subject")
-            .description("The optional subject to use for any subscribers that are subscribed via E-mail")
-            .expressionLanguageSupported(true)
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
-                    USE_JSON_STRUCTURE, CHARACTER_ENCODING));
-
-    public static final int MAX_SIZE = 256 * 1024;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .required(false)
-                .dynamic(true)
-                .build();
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        if (flowFile.getSize() > MAX_SIZE) {
-            getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        session.exportTo(flowFile, baos);
-        final String message = new String(baos.toByteArray(), charset);
-
-        final AmazonSNSClient client = getClient();
-        final PublishRequest request = new PublishRequest();
-        request.setMessage(message);
-
-        if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
-            request.setMessageStructure("json");
-        }
-
-        final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
-        final String arnType = context.getProperty(ARN_TYPE).getValue();
-        if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
-            request.setTopicArn(arn);
-        } else {
-            request.setTargetArn(arn);
-        }
-
-        final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
-        if (subject != null) {
-            request.setSubject(subject);
-        }
-
-        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
-            if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
-                final MessageAttributeValue value = new MessageAttributeValue();
-                value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
-                value.setDataType("String");
-                request.addMessageAttributesEntry(entry.getKey().getName(), value);
-            }
-        }
-
-        try {
-            client.publish(request);
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().send(flowFile, arn);
-            getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
-        } catch (final Exception e) {
-            getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.sqs.GetSQS;
+import org.apache.nifi.processors.aws.sqs.PutSQS;
+
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.MessageAttributeValue;
+import com.amazonaws.services.sns.model.PublishRequest;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
+@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
+public class PutSNS extends AbstractSNSProcessor {
+
+    public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("The character set in which the FlowFile's content is encoded")
+            .defaultValue("UTF-8")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
+            .name("Use JSON Structure")
+            .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'."
+                    + " Additional elements can be used to send different messages to different protocols. See the Amazon"
+                    + " SNS Documentation for more information.")
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
+            .name("E-mail Subject")
+            .description("The optional subject to use for any subscribers that are subscribed via E-mail")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
+                    USE_JSON_STRUCTURE, CHARACTER_ENCODING));
+
+    public static final int MAX_SIZE = 256 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (flowFile.getSize() > MAX_SIZE) {
+            getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        final String message = new String(baos.toByteArray(), charset);
+
+        final AmazonSNSClient client = getClient();
+        final PublishRequest request = new PublishRequest();
+        request.setMessage(message);
+
+        if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
+            request.setMessageStructure("json");
+        }
+
+        final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
+        final String arnType = context.getProperty(ARN_TYPE).getValue();
+        if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
+            request.setTopicArn(arn);
+        } else {
+            request.setTargetArn(arn);
+        }
+
+        final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
+        if (subject != null) {
+            request.setSubject(subject);
+        }
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
+                final MessageAttributeValue value = new MessageAttributeValue();
+                value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
+                value.setDataType("String");
+                request.addMessageAttributesEntry(entry.getKey().getName(), value);
+            }
+        }
+
+        try {
+            client.publish(request);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, arn);
+            getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
+        } catch (final Exception e) {
+            getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index 3cee02d..bf8058f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -1,51 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-
-public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
-
-    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of messages to send in a single network request")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("25")
-            .build();
-
-    public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
-            .name("Queue URL")
-            .description("The URL of the queue to act upon")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(true)
-            .build();
-
-    @Override
-    protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
-        return new AmazonSQSClient(credentials, config);
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+
+public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of messages to send in a single network request")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("25")
+            .build();
+
+    public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
+            .name("Queue URL")
+            .description("The URL of the queue to act upon")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    @Override
+    protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return new AmazonSQSClient(credentials, config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index bd0dfa5..73e3715 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -1,102 +1,102 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-
-@SupportsBatching
-@SeeAlso({GetSQS.class, PutSQS.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
-@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
-public class DeleteSQS extends AbstractSQSProcessor {
-
-    public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
-            .name("Receipt Handle")
-            .description("The identifier that specifies the receipt of the message")
-            .expressionLanguageSupported(true)
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("${sqs.receipt.handle}")
-            .build();
-
-    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        List<FlowFile> flowFiles = session.get(1);
-        if (flowFiles.isEmpty()) {
-            return;
-        }
-
-        final FlowFile firstFlowFile = flowFiles.get(0);
-        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
-
-        final AmazonSQSClient client = getClient();
-        final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
-        request.setQueueUrl(queueUrl);
-
-        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
-
-        for (final FlowFile flowFile : flowFiles) {
-            final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
-            entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
-            entries.add(entry);
-        }
-
-        request.setEntries(entries);
-
-        try {
-            client.deleteMessageBatch(request);
-            getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
-            session.transfer(flowFiles, REL_SUCCESS);
-        } catch (final Exception e) {
-            getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
-            final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
-            for (final FlowFile flowFile : flowFiles) {
-                penalizedFlowFiles.add(session.penalize(flowFile));
-            }
-            session.transfer(penalizedFlowFiles, REL_FAILURE);
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
+@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
+public class DeleteSQS extends AbstractSQSProcessor {
+
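+    // The default value below pairs with the sqs.receipt.handle attribute written by GetSQS,
+    // so the two processors can be chained together without any extra configuration.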
+    public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
+            .name("Receipt Handle")
+            .description("The identifier that specifies the receipt of the message")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("${sqs.receipt.handle}")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(1);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final FlowFile firstFlowFile = flowFiles.get(0);
+        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
+
+        final AmazonSQSClient client = getClient();
+        final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
+        request.setQueueUrl(queueUrl);
+
+        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
+
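+        // SQS requires every entry in a DeleteMessageBatch request to carry a unique Id
+        // (used to correlate per-entry results); the FlowFile's uuid attribute serves here.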
+        for (final FlowFile flowFile : flowFiles) {
+            final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+            entry.setId(flowFile.getAttribute("uuid"));
+            entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
+            entries.add(entry);
+        }
+
+        request.setEntries(entries);
+
+        try {
+            client.deleteMessageBatch(request);
+            getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
+            session.transfer(flowFiles, REL_SUCCESS);
+        } catch (final Exception e) {
+            getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
+            final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
+            for (final FlowFile flowFile : flowFiles) {
+                penalizedFlowFiles.add(session.penalize(flowFile));
+            }
+            session.transfer(penalizedFlowFiles, REL_FAILURE);
+        }
+    }
+
+}
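
A minimal sketch of exercising DeleteSQS with NiFi's mock test framework (nifi-mock). The queue URL, credentials, and receipt handle are hypothetical placeholders; the final assertion would only pass against a live, reachable queue:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.processors.aws.sqs.DeleteSQS;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class DeleteSQSSketch {
        public static void main(final String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());
            runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue");
            runner.setProperty(DeleteSQS.REGION, "us-west-2");
            runner.setProperty(DeleteSQS.ACCESS_KEY, "example-access-key");
            runner.setProperty(DeleteSQS.SECRET_KEY, "example-secret-key");

            // GetSQS would normally populate sqs.receipt.handle on the incoming FlowFile;
            // the Receipt Handle property's default expression picks it up from there.
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("sqs.receipt.handle", "example-receipt-handle");
            runner.enqueue(new byte[0], attributes);

            runner.run();
            runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
        }
    }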

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index f395e67..91166a2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -1,219 +1,223 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-
-@SupportsBatching
-@SeeAlso({ PutSQS.class, DeleteSQS.class })
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
-@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
-@WritesAttributes({
-    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
-    @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
-    @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
-    @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
-})
-public class GetSQS extends AbstractSQSProcessor {
-
-    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("The Character Set that should be used to encode the textual content of the SQS message")
-            .required(true)
-            .defaultValue("UTF-8")
-            .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
-            .build();
-
-    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
-            .name("Auto Delete Messages")
-            .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
-
-    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Visibility Timeout")
-            .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
-            .expressionLanguageSupported(false)
-            .required(true)
-            .defaultValue("15 mins")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of messages to send in a single network request")
-            .required(true)
-            .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
-            .defaultValue("10")
-            .build();
-
-    public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(QUEUE_URL)
-            .expressionLanguageSupported(false)
-            .build();
-
-    public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder()
-            .name("Receive Message Wait Time")
-            .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will "
-                + "reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.")
-            .expressionLanguageSupported(false)
-            .required(true)
-            .defaultValue("0 sec")
-            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS))  // 20 seconds is the maximum allowed by SQS
-            .build();
-
-    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return Collections.singleton(REL_SUCCESS);
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
-
-        final AmazonSQSClient client = getClient();
-
-        final ReceiveMessageRequest request = new ReceiveMessageRequest();
-        request.setAttributeNames(Collections.singleton("All"));
-        request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
-        request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
-        request.setQueueUrl(queueUrl);
-        request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
-
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
-
-        final ReceiveMessageResult result;
-        try {
-            result = client.receiveMessage(request);
-        } catch (final Exception e) {
-            getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
-            context.yield();
-            return;
-        }
-
-        final List<Message> messages = result.getMessages();
-        if (messages.isEmpty()) {
-            context.yield();
-            return;
-        }
-
-        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
-
-        for (final Message message : messages) {
-            FlowFile flowFile = session.create();
-
-            final Map<String, String> attributes = new HashMap<>();
-            for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
-                attributes.put("sqs." + entry.getKey(), entry.getValue());
-            }
-
-            for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
-                attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
-            }
-
-            attributes.put("hash.value", message.getMD5OfBody());
-            attributes.put("hash.algorithm", "md5");
-            attributes.put("sqs.message.id", message.getMessageId());
-            attributes.put("sqs.receipt.handle", message.getReceiptHandle());
-
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    out.write(message.getBody().getBytes(charset));
-                }
-            });
-
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().receive(flowFile, queueUrl);
-
-            getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
-        }
-
-        if (autoDelete) {
-            // If we want to auto-delete messages, we must first commit the session to ensure that the data
-            // is persisted in NiFi's repositories.
-            session.commit();
-
-            final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
-            deleteRequest.setQueueUrl(queueUrl);
-            final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
-            for (final Message message : messages) {
-                final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
-                entry.setId(message.getMessageId());
-                entry.setReceiptHandle(message.getReceiptHandle());
-                deleteRequestEntries.add(entry);
-            }
-
-            deleteRequest.setEntries(deleteRequestEntries);
-
-            try {
-                client.deleteMessageBatch(deleteRequest);
-            } catch (final Exception e) {
-                getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
-                        + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
-            }
-        }
-
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
+@SupportsBatching
+@SeeAlso({ PutSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
+    @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
+    @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
+    @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
+})
+public class GetSQS extends AbstractSQSProcessor {
+
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("The Character Set that should be used to encode the textual content of the SQS message")
+            .required(true)
+            .defaultValue("UTF-8")
+            .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
+            .build();
+
+    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+            .name("Auto Delete Messages")
+            .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Visibility Timeout")
+            .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("15 mins")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of messages to send in a single network request")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
+            .defaultValue("10")
+            .build();
+
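+    // Expression Language is disabled on the queue URL because GetSQS is a source processor
+    // (INPUT_FORBIDDEN): there is no incoming FlowFile to evaluate attribute expressions against.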
+    public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(QUEUE_URL)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("Receive Message Wait Time")
+            .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will "
+                + "reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("0 sec")
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS))  // 20 seconds is the maximum allowed by SQS
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
+
+        final AmazonSQSClient client = getClient();
+
+        final ReceiveMessageRequest request = new ReceiveMessageRequest();
+        request.setAttributeNames(Collections.singleton("All"));
+        request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
+        request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
+        request.setQueueUrl(queueUrl);
+        request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
+
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        final ReceiveMessageResult result;
+        try {
+            result = client.receiveMessage(request);
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
+            context.yield();
+            return;
+        }
+
+        final List<Message> messages = result.getMessages();
+        if (messages.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+        for (final Message message : messages) {
+            FlowFile flowFile = session.create();
+
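+            // Flatten both system attributes and user-defined message attributes onto the
+            // FlowFile, namespaced under an "sqs." prefix to keep them distinct from core attributes.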
+            final Map<String, String> attributes = new HashMap<>();
+            for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), entry.getValue());
+            }
+
+            for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
+            }
+
+            attributes.put("hash.value", message.getMD5OfBody());
+            attributes.put("hash.algorithm", "md5");
+            attributes.put("sqs.message.id", message.getMessageId());
+            attributes.put("sqs.receipt.handle", message.getReceiptHandle());
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(message.getBody().getBytes(charset));
+                }
+            });
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, queueUrl);
+
+            getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
+        }
+
+        if (autoDelete) {
+            // If we want to auto-delete messages, we must first commit the session to ensure that the data
+            // is persisted in NiFi's repositories.
+            session.commit();
+
+            final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
+            deleteRequest.setQueueUrl(queueUrl);
+            final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
+            for (final Message message : messages) {
+                final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+                entry.setId(message.getMessageId());
+                entry.setReceiptHandle(message.getReceiptHandle());
+                deleteRequestEntries.add(entry);
+            }
+
+            deleteRequest.setEntries(deleteRequestEntries);
+
+            try {
+                client.deleteMessageBatch(deleteRequest);
+            } catch (final Exception e) {
+                getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
+                        + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
+            }
+        }
+
+    }
+
+}
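
As background for the Receive Message Wait Time property above, a minimal sketch of SQS long polling using the same AWS SDK v1 client that GetSQS builds on. The queue URL is a hypothetical placeholder, and 20 seconds is the maximum wait SQS accepts (hence the validator's upper bound):

    import com.amazonaws.services.sqs.AmazonSQSClient;
    import com.amazonaws.services.sqs.model.Message;
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

    public class LongPollSketch {
        public static void main(final String[] args) {
            final AmazonSQSClient client = new AmazonSQSClient(); // default credential provider chain
            final ReceiveMessageRequest request = new ReceiveMessageRequest()
                    .withQueueUrl("https://sqs.us-west-2.amazonaws.com/123456789012/example-queue")
                    .withMaxNumberOfMessages(10) // the SQS maximum per request, matching Batch Size above
                    .withWaitTimeSeconds(20);    // long poll: block up to 20s rather than returning immediately
            for (final Message message : client.receiveMessage(request).getMessages()) {
                System.out.println(message.getMessageId() + ": " + message.getBody());
            }
        }
    }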