You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/19 07:21:02 UTC
[19/24] nifi git commit: NIFI-1054: Fixing Line endings of source code
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index b03d85d..744bc31 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -1,184 +1,184 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.s3;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AccessControlList;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.StorageClass;
-
-@SupportsBatching
-@SeeAlso({FetchS3Object.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
-@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
- description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
-@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
-@WritesAttributes({
- @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
- @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
- @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
-})
-public class PutS3Object extends AbstractS3Processor {
-
- public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
- .name("Expiration Time Rule")
- .required(false)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
- .name("Storage Class")
- .required(true)
- .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
- .defaultValue(StorageClass.Standard.name())
- .build();
-
- public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
- FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .dynamic(true)
- .build();
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final long startNanos = System.nanoTime();
-
- final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
- final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-
- final AmazonS3 s3 = getClient();
- final FlowFile ff = flowFile;
- final Map<String, String> attributes = new HashMap<>();
- try {
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- try (final InputStream in = new BufferedInputStream(rawIn)) {
- final ObjectMetadata objectMetadata = new ObjectMetadata();
- objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
- objectMetadata.setContentLength(ff.getSize());
-
- final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
- if (expirationRule != null) {
- objectMetadata.setExpirationTimeRuleId(expirationRule);
- }
-
- final Map<String, String> userMetadata = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
- userMetadata.put(entry.getKey().getName(), value);
- }
- }
-
- if (!userMetadata.isEmpty()) {
- objectMetadata.setUserMetadata(userMetadata);
- }
-
- final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
- request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
- final AccessControlList acl = createACL(context, ff);
- if (acl != null) {
- request.setAccessControlList(acl);
- }
-
- final PutObjectResult result = s3.putObject(request);
- if (result.getVersionId() != null) {
- attributes.put("s3.version", result.getVersionId());
- }
-
- attributes.put("s3.etag", result.getETag());
-
- final Date expiration = result.getExpirationTime();
- if (expiration != null) {
- attributes.put("s3.expiration", expiration.toString());
- }
- }
- }
- });
-
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
- session.transfer(flowFile, REL_SUCCESS);
-
- final String url = getUrlForObject(bucket, key);
- final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile, url, millis);
-
- getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
- } catch (final ProcessException | AmazonClientException pe) {
- getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.StorageClass;
+
+@SupportsBatching
+@SeeAlso({FetchS3Object.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
+@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
+ description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
+@WritesAttributes({
+ @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+ @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
+ @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
+})
+public class PutS3Object extends AbstractS3Processor {
+
+ public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
+ .name("Expiration Time Rule")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+ .name("Storage Class")
+ .required(true)
+ .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+ .defaultValue(StorageClass.Standard.name())
+ .build();
+
+ public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+ Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+ FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final AmazonS3 s3 = getClient();
+ final FlowFile ff = flowFile;
+ final Map<String, String> attributes = new HashMap<>();
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ try (final InputStream in = new BufferedInputStream(rawIn)) {
+ final ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
+ objectMetadata.setContentLength(ff.getSize());
+
+ final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
+ if (expirationRule != null) {
+ objectMetadata.setExpirationTimeRuleId(expirationRule);
+ }
+
+ final Map<String, String> userMetadata = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ userMetadata.put(entry.getKey().getName(), value);
+ }
+ }
+
+ if (!userMetadata.isEmpty()) {
+ objectMetadata.setUserMetadata(userMetadata);
+ }
+
+ final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+ request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ final AccessControlList acl = createACL(context, ff);
+ if (acl != null) {
+ request.setAccessControlList(acl);
+ }
+
+ final PutObjectResult result = s3.putObject(request);
+ if (result.getVersionId() != null) {
+ attributes.put("s3.version", result.getVersionId());
+ }
+
+ attributes.put("s3.etag", result.getETag());
+
+ final Date expiration = result.getExpirationTime();
+ if (expiration != null) {
+ attributes.put("s3.expiration", expiration.toString());
+ }
+ }
+ }
+ });
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final String url = getUrlForObject(bucket, key);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url, millis);
+
+ getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
+ } catch (final ProcessException | AmazonClientException pe) {
+ getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
index 5b57647..109c92f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -1,58 +1,58 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sns;
-
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.sns.AmazonSNSClient;
-
-public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
-
- protected static final AllowableValue ARN_TYPE_TOPIC
- = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
- protected static final AllowableValue ARN_TYPE_TARGET
- = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
-
- public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
- .name("Amazon Resource Name (ARN)")
- .description("The name of the resource to which notifications should be published")
- .expressionLanguageSupported(true)
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
- .name("ARN Type")
- .description("The type of Amazon Resource Name that is being used.")
- .expressionLanguageSupported(false)
- .required(true)
- .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
- .defaultValue(ARN_TYPE_TOPIC.getValue())
- .build();
-
- @Override
- protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
- return new AmazonSNSClient(credentials, config);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sns.AmazonSNSClient;
+
+public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
+
+ protected static final AllowableValue ARN_TYPE_TOPIC
+ = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
+ protected static final AllowableValue ARN_TYPE_TARGET
+ = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
+
+ public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
+ .name("Amazon Resource Name (ARN)")
+ .description("The name of the resource to which notifications should be published")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
+ .name("ARN Type")
+ .description("The type of Amazon Resource Name that is being used.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
+ .defaultValue(ARN_TYPE_TOPIC.getValue())
+ .build();
+
+ @Override
+ protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+ return new AmazonSNSClient(credentials, config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index b8c5830..f58babc 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -1,159 +1,159 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sns;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.sqs.GetSQS;
-import org.apache.nifi.processors.aws.sqs.PutSQS;
-
-import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.MessageAttributeValue;
-import com.amazonaws.services.sns.model.PublishRequest;
-
-@SupportsBatching
-@SeeAlso({GetSQS.class, PutSQS.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
-@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
-public class PutSNS extends AbstractSNSProcessor {
-
- public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
- .name("Character Set")
- .description("The character set in which the FlowFile's content is encoded")
- .defaultValue("UTF-8")
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .required(true)
- .build();
- public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
- .name("Use JSON Structure")
- .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'."
- + " Additional elements can be used to send different messages to different protocols. See the Amazon"
- + " SNS Documentation for more information.")
- .defaultValue("false")
- .allowableValues("true", "false")
- .required(true)
- .build();
- public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
- .name("E-mail Subject")
- .description("The optional subject to use for any subscribers that are subscribed via E-mail")
- .expressionLanguageSupported(true)
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
- USE_JSON_STRUCTURE, CHARACTER_ENCODING));
-
- public static final int MAX_SIZE = 256 * 1024;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .required(false)
- .dynamic(true)
- .build();
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- if (flowFile.getSize() > MAX_SIZE) {
- getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- session.exportTo(flowFile, baos);
- final String message = new String(baos.toByteArray(), charset);
-
- final AmazonSNSClient client = getClient();
- final PublishRequest request = new PublishRequest();
- request.setMessage(message);
-
- if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
- request.setMessageStructure("json");
- }
-
- final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
- final String arnType = context.getProperty(ARN_TYPE).getValue();
- if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
- request.setTopicArn(arn);
- } else {
- request.setTargetArn(arn);
- }
-
- final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
- if (subject != null) {
- request.setSubject(subject);
- }
-
- for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
- final MessageAttributeValue value = new MessageAttributeValue();
- value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
- value.setDataType("String");
- request.addMessageAttributesEntry(entry.getKey().getName(), value);
- }
- }
-
- try {
- client.publish(request);
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, arn);
- getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
- } catch (final Exception e) {
- getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.sqs.GetSQS;
+import org.apache.nifi.processors.aws.sqs.PutSQS;
+
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.MessageAttributeValue;
+import com.amazonaws.services.sns.model.PublishRequest;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
+@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
+public class PutSNS extends AbstractSNSProcessor {
+
+ public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("The character set in which the FlowFile's content is encoded")
+ .defaultValue("UTF-8")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
+ .name("Use JSON Structure")
+ .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'."
+ + " Additional elements can be used to send different messages to different protocols. See the Amazon"
+ + " SNS Documentation for more information.")
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
+ .name("E-mail Subject")
+ .description("The optional subject to use for any subscribers that are subscribed via E-mail")
+ .expressionLanguageSupported(true)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+ Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
+ USE_JSON_STRUCTURE, CHARACTER_ENCODING));
+
+ public static final int MAX_SIZE = 256 * 1024;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ if (flowFile.getSize() > MAX_SIZE) {
+ getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ session.exportTo(flowFile, baos);
+ final String message = new String(baos.toByteArray(), charset);
+
+ final AmazonSNSClient client = getClient();
+ final PublishRequest request = new PublishRequest();
+ request.setMessage(message);
+
+ if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
+ request.setMessageStructure("json");
+ }
+
+ final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
+ final String arnType = context.getProperty(ARN_TYPE).getValue();
+ if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
+ request.setTopicArn(arn);
+ } else {
+ request.setTargetArn(arn);
+ }
+
+ final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
+ if (subject != null) {
+ request.setSubject(subject);
+ }
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
+ final MessageAttributeValue value = new MessageAttributeValue();
+ value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
+ value.setDataType("String");
+ request.addMessageAttributesEntry(entry.getKey().getName(), value);
+ }
+ }
+
+ try {
+ client.publish(request);
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, arn);
+ getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
+ } catch (final Exception e) {
+ getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index 3cee02d..bf8058f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -1,51 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-
-public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
-
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of messages to send in a single network request")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("25")
- .build();
-
- public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
- .name("Queue URL")
- .description("The URL of the queue to act upon")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .required(true)
- .build();
-
- @Override
- protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
- return new AmazonSQSClient(credentials, config);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+
+public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of messages to send in a single network request")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("25")
+ .build();
+
+ public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
+ .name("Queue URL")
+ .description("The URL of the queue to act upon")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ @Override
+ protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+ return new AmazonSQSClient(credentials, config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index bd0dfa5..73e3715 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -1,102 +1,102 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-
-@SupportsBatching
-@SeeAlso({GetSQS.class, PutSQS.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
-@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
-public class DeleteSQS extends AbstractSQSProcessor {
-
- public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
- .name("Receipt Handle")
- .description("The identifier that specifies the receipt of the message")
- .expressionLanguageSupported(true)
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue("${sqs.receipt.handle}")
- .build();
-
- public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- List<FlowFile> flowFiles = session.get(1);
- if (flowFiles.isEmpty()) {
- return;
- }
-
- final FlowFile firstFlowFile = flowFiles.get(0);
- final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
-
- final AmazonSQSClient client = getClient();
- final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
- request.setQueueUrl(queueUrl);
-
- final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
-
- for (final FlowFile flowFile : flowFiles) {
- final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
- entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
- entries.add(entry);
- }
-
- request.setEntries(entries);
-
- try {
- client.deleteMessageBatch(request);
- getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
- session.transfer(flowFiles, REL_SUCCESS);
- } catch (final Exception e) {
- getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
- final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
- for (final FlowFile flowFile : flowFiles) {
- penalizedFlowFiles.add(session.penalize(flowFile));
- }
- session.transfer(penalizedFlowFiles, REL_FAILURE);
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
+@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
+public class DeleteSQS extends AbstractSQSProcessor {
+
+ public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
+ .name("Receipt Handle")
+ .description("The identifier that specifies the receipt of the message")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("${sqs.receipt.handle}")
+ .build();
+
+ public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+ Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ List<FlowFile> flowFiles = session.get(1);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final FlowFile firstFlowFile = flowFiles.get(0);
+ final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
+
+ final AmazonSQSClient client = getClient();
+ final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
+ request.setQueueUrl(queueUrl);
+
+ final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
+
+ for (final FlowFile flowFile : flowFiles) {
+ final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+ entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
+ entries.add(entry);
+ }
+
+ request.setEntries(entries);
+
+ try {
+ client.deleteMessageBatch(request);
+ getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
+ session.transfer(flowFiles, REL_SUCCESS);
+ } catch (final Exception e) {
+ getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
+ final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
+ for (final FlowFile flowFile : flowFiles) {
+ penalizedFlowFiles.add(session.penalize(flowFile));
+ }
+ session.transfer(penalizedFlowFiles, REL_FAILURE);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 10b17e9..cf73ace 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -1,208 +1,208 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.sqs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-
-@SupportsBatching
-@SeeAlso({ PutSQS.class, DeleteSQS.class })
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
-@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
-@WritesAttributes({
- @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
- @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
- @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
- @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
-})
-public class GetSQS extends AbstractSQSProcessor {
-
- public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("Character Set")
- .description("The Character Set that should be used to encode the textual content of the SQS message")
- .required(true)
- .defaultValue("UTF-8")
- .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
- .build();
-
- public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
- .name("Auto Delete Messages")
- .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
-
- public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Visibility Timeout")
- .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
- .expressionLanguageSupported(false)
- .required(true)
- .defaultValue("15 mins")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of messages to send in a single network request")
- .required(true)
- .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
- .defaultValue("10")
- .build();
-
- public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(QUEUE_URL)
- .expressionLanguageSupported(false)
- .build();
-
- public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return Collections.singleton(REL_SUCCESS);
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
-
- final AmazonSQSClient client = getClient();
-
- final ReceiveMessageRequest request = new ReceiveMessageRequest();
- request.setAttributeNames(Collections.singleton("All"));
- request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
- request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
- request.setQueueUrl(queueUrl);
-
- final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
-
- final ReceiveMessageResult result;
- try {
- result = client.receiveMessage(request);
- } catch (final Exception e) {
- getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
- context.yield();
- return;
- }
-
- final List<Message> messages = result.getMessages();
- if (messages.isEmpty()) {
- context.yield();
- return;
- }
-
- final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
-
- for (final Message message : messages) {
- FlowFile flowFile = session.create();
-
- final Map<String, String> attributes = new HashMap<>();
- for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
- attributes.put("sqs." + entry.getKey(), entry.getValue());
- }
-
- for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
- attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
- }
-
- attributes.put("hash.value", message.getMD5OfBody());
- attributes.put("hash.algorithm", "md5");
- attributes.put("sqs.message.id", message.getMessageId());
- attributes.put("sqs.receipt.handle", message.getReceiptHandle());
-
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(message.getBody().getBytes(charset));
- }
- });
-
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().receive(flowFile, queueUrl);
-
- getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
- }
-
- if (autoDelete) {
- // If we want to auto-delete messages, we must fist commit the session to ensure that the data
- // is persisted in NiFi's repositories.
- session.commit();
-
- final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
- deleteRequest.setQueueUrl(queueUrl);
- final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
- for (final Message message : messages) {
- final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
- entry.setId(message.getMessageId());
- entry.setReceiptHandle(message.getReceiptHandle());
- deleteRequestEntries.add(entry);
- }
-
- deleteRequest.setEntries(deleteRequestEntries);
-
- try {
- client.deleteMessageBatch(deleteRequest);
- } catch (final Exception e) {
- getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
- + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
- }
- }
-
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
+@SupportsBatching
+@SeeAlso({ PutSQS.class, DeleteSQS.class })
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
+ @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
+ @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
+ @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
+})
+public class GetSQS extends AbstractSQSProcessor {
+
+ public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("The Character Set that should be used to encode the textual content of the SQS message")
+ .required(true)
+ .defaultValue("UTF-8")
+ .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
+ .build();
+
+ public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+ .name("Auto Delete Messages")
+ .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Visibility Timeout")
+ .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .defaultValue("15 mins")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of messages to send in a single network request")
+ .required(true)
+ .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
+ .defaultValue("10")
+ .build();
+
+ public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(QUEUE_URL)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+ Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
+
+ final AmazonSQSClient client = getClient();
+
+ final ReceiveMessageRequest request = new ReceiveMessageRequest();
+ request.setAttributeNames(Collections.singleton("All"));
+ request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
+ request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
+ request.setQueueUrl(queueUrl);
+
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+ final ReceiveMessageResult result;
+ try {
+ result = client.receiveMessage(request);
+ } catch (final Exception e) {
+ getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
+ context.yield();
+ return;
+ }
+
+ final List<Message> messages = result.getMessages();
+ if (messages.isEmpty()) {
+ context.yield();
+ return;
+ }
+
+ final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+ for (final Message message : messages) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+ for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
+ attributes.put("sqs." + entry.getKey(), entry.getValue());
+ }
+
+ for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
+ attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
+ }
+
+ attributes.put("hash.value", message.getMD5OfBody());
+ attributes.put("hash.algorithm", "md5");
+ attributes.put("sqs.message.id", message.getMessageId());
+ attributes.put("sqs.receipt.handle", message.getReceiptHandle());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(message.getBody().getBytes(charset));
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, queueUrl);
+
+ getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
+ }
+
+ if (autoDelete) {
+ // If we want to auto-delete messages, we must fist commit the session to ensure that the data
+ // is persisted in NiFi's repositories.
+ session.commit();
+
+ final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
+ deleteRequest.setQueueUrl(queueUrl);
+ final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
+ for (final Message message : messages) {
+ final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+ entry.setId(message.getMessageId());
+ entry.setReceiptHandle(message.getReceiptHandle());
+ deleteRequestEntries.add(entry);
+ }
+
+ deleteRequest.setEntries(deleteRequestEntries);
+
+ try {
+ client.deleteMessageBatch(deleteRequest);
+ } catch (final Exception e) {
+ getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
+ + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
+ }
+ }
+
+ }
+
+}