You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/08/28 23:51:43 UTC

[nifi] branch master updated: NIFI-6367 - This closes #3563. more error handling for FetchS3Object

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ca50e  NIFI-6367 - This closes #3563. more error handling for FetchS3Object
e2ca50e is described below

commit e2ca50e66a3b1a7d810ea8eac256d21bca3fd07f
Author: Evan Reynolds <ev...@usermind.com>
AuthorDate: Mon Jul 1 17:26:05 2019 -0700

    NIFI-6367 - This closes #3563. more error handling for FetchS3Object
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../nifi/processors/aws/s3/FetchS3Object.java      | 13 ++++++++++
 .../nifi/processors/aws/s3/TestFetchS3Object.java  | 30 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 8dabeb9..aa6233d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.AmazonClientException;
@@ -129,6 +131,9 @@ public class FetchS3Object extends AbstractS3Processor {
         }
 
         try (final S3Object s3Object = client.getObject(request)) {
+            if (s3Object == null) {
+                throw new IOException("AWS refused to execute this request.");
+            }
             flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
             attributes.put("s3.bucket", s3Object.getBucketName());
 
@@ -174,6 +179,14 @@ public class FetchS3Object extends AbstractS3Processor {
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
+        } catch (final FlowFileAccessException ffae) {
+            if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
+                getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+            throw ffae;
         }
 
         if (!attributes.isEmpty()) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 6416d53..ed3fdfe 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -235,6 +236,35 @@ public class TestFetchS3Object {
     }
 
     @Test
+    public void testGetObjectReturnsNull() throws IOException {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+        Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testFlowFileAccessExceptionGoesToFailure() throws IOException {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        AmazonS3Exception amazonException = new AmazonS3Exception("testing");
+        Mockito.doThrow(new FlowFileAccessException("testing nested", amazonException)).when(mockS3Client).getObject(Mockito.any());
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
+    }
+    @Test
     public void testGetPropertyDescriptors() throws Exception {
         FetchS3Object processor = new FetchS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();