You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2019/11/22 14:16:45 UTC

[nifi] branch master updated: NIFI-6724 - Check for SQS API call result in case of failures

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6507b78  NIFI-6724 - Check for SQS API call result in case of failures
6507b78 is described below

commit 6507b789483f9de9faf8dabd8d668abab0b0db78
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Nov 20 18:07:06 2019 +0100

    NIFI-6724 - Check for SQS API call result in case of failures
    
    This closes #3897.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../apache/nifi/processors/aws/sqs/DeleteSQS.java  | 49 +++++++++++-----------
 .../org/apache/nifi/processors/aws/sqs/PutSQS.java |  9 +++-
 .../nifi/processors/aws/sqs/TestDeleteSQS.java     |  8 +++-
 nifi-nar-bundles/nifi-aws-bundle/pom.xml           |  2 +-
 4 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 71e4e02..a838fc0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -33,11 +33,13 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
 
 @SupportsBatching
 @SeeAlso({GetSQS.class, PutSQS.class})
@@ -66,42 +68,41 @@ public class DeleteSQS extends AbstractSQSProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        List<FlowFile> flowFiles = session.get(1);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
 
-        final FlowFile firstFlowFile = flowFiles.get(0);
-        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
+        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
 
         final AmazonSQSClient client = getClient();
         final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
         request.setQueueUrl(queueUrl);
 
-        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
-
-        for (final FlowFile flowFile : flowFiles) {
-            final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
-            String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
-            entry.setReceiptHandle(receiptHandle);
-            String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
-            entry.setId(entryId);
-            entries.add(entry);
-        }
-
+        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+        final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+        String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
+        entry.setReceiptHandle(receiptHandle);
+        String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
+        entry.setId(entryId);
+        entries.add(entry);
         request.setEntries(entries);
 
         try {
-            client.deleteMessageBatch(request);
-            getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
-            session.transfer(flowFiles, REL_SUCCESS);
-        } catch (final Exception e) {
-            getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
-            final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
-            for (final FlowFile flowFile : flowFiles) {
-                penalizedFlowFiles.add(session.penalize(flowFile));
+            DeleteMessageBatchResult response = client.deleteMessageBatch(request);
+
+            // check for errors
+            if (!response.getFailed().isEmpty()) {
+                throw new ProcessException(response.getFailed().get(0).toString());
             }
-            session.transfer(penalizedFlowFiles, REL_FAILURE);
+
+            getLogger().info("Successfully deleted message from SQS for {}", new Object[] { flowFile });
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            getLogger().error("Failed to delete message from SQS due to {}", new Object[] { e });
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index cfa32b4..35dce8c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -40,12 +40,14 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.SendMessageBatchResult;
 
 @SupportsBatching
 @SeeAlso({ GetSQS.class, DeleteSQS.class })
@@ -135,7 +137,12 @@ public class PutSQS extends AbstractSQSProcessor {
         request.setEntries(entries);
 
         try {
-            client.sendMessageBatch(request);
+            SendMessageBatchResult response = client.sendMessageBatch(request);
+
+            // check for errors
+            if (!response.getFailed().isEmpty()) {
+                throw new ProcessException(response.getFailed().get(0).toString());
+            }
         } catch (final Exception e) {
             getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
             flowFile = session.penalize(flowFile);
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
index feb0075..2bd769e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,6 +26,7 @@ import org.apache.nifi.util.TestRunners;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.AmazonSQSException;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -38,15 +40,17 @@ public class TestDeleteSQS {
 
     private TestRunner runner = null;
     private DeleteSQS mockDeleteSQS = null;
-    private AmazonSQSClient actualSQSClient = null;
     private AmazonSQSClient mockSQSClient = null;
 
     @Before
     public void setUp() {
         mockSQSClient = Mockito.mock(AmazonSQSClient.class);
+        DeleteMessageBatchResult mockResponse = Mockito.mock(DeleteMessageBatchResult.class);
+        Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse);
+        Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>());
         mockDeleteSQS = new DeleteSQS() {
+            @Override
             protected AmazonSQSClient getClient() {
-                actualSQSClient = client;
                 return mockSQSClient;
             }
         };
diff --git a/nifi-nar-bundles/nifi-aws-bundle/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/pom.xml
index ad065fd..fbdfa9b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/pom.xml
@@ -26,7 +26,7 @@
     <packaging>pom</packaging>
 
     <properties>
-        <aws-java-sdk-version>1.11.599</aws-java-sdk-version>
+        <aws-java-sdk-version>1.11.677</aws-java-sdk-version>
     </properties>
 
     <dependencyManagement>