You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/12/22 04:35:54 UTC

[nifi] branch main updated: NIFI-7371 Added FlowFile attributes for exceptions in S3 processors

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d49371  NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
0d49371 is described below

commit 0d49371c53e1a6da80fdb33ecfbad13eb5b8cad1
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Wed Dec 15 18:54:28 2021 -0500

    NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
    
    This closes #5606
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../processors/aws/s3/AbstractS3Processor.java     | 21 +++++++
 .../nifi/processors/aws/s3/DeleteS3Object.java     | 13 +++-
 .../nifi/processors/aws/s3/FetchS3Object.java      | 11 +++-
 .../apache/nifi/processors/aws/s3/PutS3Object.java |  6 ++
 .../apache/nifi/processors/aws/s3/TagS3Object.java | 10 +++-
 .../nifi/processors/aws/s3/TestFetchS3Object.java  | 70 ++++++++++++++++++++++
 6 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 4bd0769..dc29b78 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -24,6 +25,7 @@ import com.amazonaws.regions.Region;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CanonicalGrantee;
 import com.amazonaws.services.s3.model.EmailAddressGrantee;
@@ -36,6 +38,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 
@@ -315,6 +318,24 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
         return acl;
     }
 
+    protected FlowFile extractExceptionDetails(final Exception e, final ProcessSession session, FlowFile flowFile) { // copies failure details from e onto flowFile as "s3.*" attributes
+        flowFile = session.putAttribute(flowFile, "s3.exception", e.getClass().getName()); // written for every exception type
+        if (e instanceof AmazonS3Exception) { // getAdditionalDetails() is reached through the AmazonS3Exception cast
+            flowFile = putAttribute(session, flowFile, "s3.additionalDetails", ((AmazonS3Exception) e).getAdditionalDetails());
+        }
+        if (e instanceof AmazonServiceException) { // NOTE(review): only e itself is type-checked here; its cause chain is not inspected
+            final AmazonServiceException ase = (AmazonServiceException) e;
+            flowFile = putAttribute(session, flowFile, "s3.statusCode", ase.getStatusCode());
+            flowFile = putAttribute(session, flowFile, "s3.errorCode", ase.getErrorCode());
+            flowFile = putAttribute(session, flowFile, "s3.errorMessage", ase.getErrorMessage());
+        }
+        return flowFile; // the FlowFile reference produced by the last attribute update above
+    }
+
+    private FlowFile putAttribute(final ProcessSession session, final FlowFile flowFile, final String key, final Object value) { // null-tolerant wrapper around session.putAttribute
+        return (value == null) ? flowFile : session.putAttribute(flowFile, key, value.toString()); // null value: flowFile is returned as-is and no attribute is written
+    }
+
     /**
      * Create CannedAccessControlList if {@link #CANNED_ACL} property specified.
      *
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7d2cce9..ff4249b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.s3.model.DeleteVersionRequest;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -41,6 +43,12 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 
 @SupportsBatching
+@WritesAttributes({
+        @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+        @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+        @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+        @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
 @SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -92,7 +100,8 @@ public class DeleteS3Object extends AbstractS3Processor {
                 s3.deleteVersion(r);
             }
         } catch (final AmazonServiceException ase) {
-            getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+            flowFile = extractExceptionDetails(ase, session, flowFile);
+            getLogger().error("Failed to delete S3 Object for {}; routing to failure", flowFile, ase);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
@@ -100,6 +109,6 @@ public class DeleteS3Object extends AbstractS3Processor {
 
         session.transfer(flowFile, REL_SUCCESS);
         final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+        getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", flowFile, transferMillis);
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index b9de5fe..f639b54 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -72,6 +72,11 @@ import java.util.concurrent.TimeUnit;
     @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
     @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+    @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+    @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+    @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+    @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
     @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
     @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
     @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@@ -251,13 +256,15 @@ public class FetchS3Object extends AbstractS3Processor {
                 attributes.put("s3.version", metadata.getVersionId());
             }
         } catch (final IOException | AmazonClientException ioe) {
-            getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
+            flowFile = extractExceptionDetails(ioe, session, flowFile);
+            getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ioe);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         } catch (final FlowFileAccessException ffae) {
             if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
-                getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
+                getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ffae);
+                flowFile = extractExceptionDetails(ffae, session, flowFile);
                 flowFile = session.penalize(flowFile);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 0fed5ee..40f086c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -117,6 +117,11 @@ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
     @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
     @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+    @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+    @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+    @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+    @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
     @WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
@@ -833,6 +838,7 @@ public class PutS3Object extends AbstractS3Processor {
                         new Object[]{cacheKey, e.getMessage()});
             }
         } catch (final ProcessException | AmazonClientException pe) {
+            extractExceptionDetails(pe, session, flowFile);
             if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
                 getLogger().info(pe.getMessage());
                 session.rollback();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index 6c9d72a..2f01598 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -53,7 +53,12 @@ import java.util.stream.Collectors;
 @SupportsBatching
 @WritesAttributes({
         @WritesAttribute(attribute = "s3.tag.___", description = "The tags associated with the S3 object will be " +
-                "written as part of the FlowFile attributes")})
+                "written as part of the FlowFile attributes"),
+        @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+        @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+        @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+        @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
 @SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -171,7 +176,8 @@ public class TagS3Object extends AbstractS3Processor {
             }
             s3.setObjectTagging(r);
         } catch (final AmazonServiceException ase) {
-            getLogger().error("Failed to tag S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+            flowFile = extractExceptionDetails(ase, session, flowFile);
+            getLogger().error("Failed to tag S3 Object for {}; routing to failure", flowFile, ase);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 71915a1..cf005c5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -17,11 +17,14 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.amazonaws.SdkClientException;
+import com.google.common.collect.ImmutableMap;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -256,6 +259,72 @@ public class TestFetchS3Object {
     }
 
     @Test
+    public void testFetchObject_FailAdditionalAttributesBucketName() { // failure path: getObject throws an AmazonS3Exception with error code NoSuchBucket and HTTP_NOT_FOUND
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final AmazonS3Exception exception = new AmazonS3Exception("The specified bucket does not exist");
+        exception.setAdditionalDetails(ImmutableMap.of("BucketName", "us-east-1", "Error", "ABC123")); // set on the exception, but s3.additionalDetails is not asserted below
+        exception.setErrorCode("NoSuchBucket");
+        exception.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any()); // getObject on the mock client is stubbed to throw for any argument
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size()); // exactly one FlowFile is expected on the failure relationship
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals("NoSuchBucket", flowFile.getAttribute("s3.errorCode"));
+        assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage"))); // prefix check: the attribute must be a leading substring of getMessage()
+        assertEquals("404", flowFile.getAttribute("s3.statusCode")); // the status code is compared in its string form
+        assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception"));
+    }
+
+    @Test
+    public void testFetchObject_FailAdditionalAttributesAuthentication() { // failure path: getObject throws an AmazonS3Exception with error code SignatureDoesNotMatch and HTTP_FORBIDDEN
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final AmazonS3Exception exception = new AmazonS3Exception("signature");
+        exception.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB CC DD EE FF")); // set on the exception, but s3.additionalDetails is not asserted below
+        exception.setErrorCode("SignatureDoesNotMatch");
+        exception.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any()); // getObject on the mock client is stubbed to throw for any argument
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size()); // exactly one FlowFile is expected on the failure relationship
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals("SignatureDoesNotMatch", flowFile.getAttribute("s3.errorCode"));
+        assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage"))); // prefix check: the attribute must be a leading substring of getMessage()
+        assertEquals("403", flowFile.getAttribute("s3.statusCode")); // the status code is compared in its string form
+        assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception"));
+    }
+
+    @Test
+    public void testFetchObject_FailAdditionalAttributesNetworkFailure() { // failure path: getObject throws an SdkClientException built from a message only
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final SdkClientException exception = new SdkClientException("message");
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any()); // getObject on the mock client is stubbed to throw for any argument
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size()); // exactly one FlowFile is expected on the failure relationship
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception")); // s3.exception is the only attribute asserted in this test
+    }
+
+    @Test
     public void testGetObjectReturnsNull() throws IOException {
         runner.setProperty(FetchS3Object.REGION, "us-east-1");
         runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
@@ -284,6 +353,7 @@ public class TestFetchS3Object {
 
         runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
     }
+
     @Test
     public void testGetPropertyDescriptors() throws Exception {
         FetchS3Object processor = new FetchS3Object();