You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/12/22 04:35:54 UTC
[nifi] branch main updated: NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0d49371 NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
0d49371 is described below
commit 0d49371c53e1a6da80fdb33ecfbad13eb5b8cad1
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Wed Dec 15 18:54:28 2021 -0500
NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
This closes #5606
Signed-off-by: David Handermann <ex...@apache.org>
---
.../processors/aws/s3/AbstractS3Processor.java | 21 +++++++
.../nifi/processors/aws/s3/DeleteS3Object.java | 13 +++-
.../nifi/processors/aws/s3/FetchS3Object.java | 11 +++-
.../apache/nifi/processors/aws/s3/PutS3Object.java | 6 ++
.../apache/nifi/processors/aws/s3/TagS3Object.java | 10 +++-
.../nifi/processors/aws/s3/TestFetchS3Object.java | 70 ++++++++++++++++++++++
6 files changed, 125 insertions(+), 6 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 4bd0769..dc29b78 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;
+import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -24,6 +25,7 @@ import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.EmailAddressGrantee;
@@ -36,6 +38,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
@@ -315,6 +318,24 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
return acl;
}
+ protected FlowFile extractExceptionDetails(final Exception e, final ProcessSession session, FlowFile flowFile) {
+ flowFile = session.putAttribute(flowFile, "s3.exception", e.getClass().getName());
+ if (e instanceof AmazonS3Exception) {
+ flowFile = putAttribute(session, flowFile, "s3.additionalDetails", ((AmazonS3Exception) e).getAdditionalDetails());
+ }
+ if (e instanceof AmazonServiceException) {
+ final AmazonServiceException ase = (AmazonServiceException) e;
+ flowFile = putAttribute(session, flowFile, "s3.statusCode", ase.getStatusCode());
+ flowFile = putAttribute(session, flowFile, "s3.errorCode", ase.getErrorCode());
+ flowFile = putAttribute(session, flowFile, "s3.errorMessage", ase.getErrorMessage());
+ }
+ return flowFile;
+ }
+
+ private FlowFile putAttribute(final ProcessSession session, final FlowFile flowFile, final String key, final Object value) {
+ return (value == null) ? flowFile : session.putAttribute(flowFile, key, value.toString());
+ }
+
/**
* Create CannedAccessControlList if {@link #CANNED_ACL} property specified.
*
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7d2cce9..ff4249b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -41,6 +43,12 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
+@WritesAttributes({
+ @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -92,7 +100,8 @@ public class DeleteS3Object extends AbstractS3Processor {
s3.deleteVersion(r);
}
} catch (final AmazonServiceException ase) {
- getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+ flowFile = extractExceptionDetails(ase, session, flowFile);
+ getLogger().error("Failed to delete S3 Object for {}; routing to failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
@@ -100,6 +109,6 @@ public class DeleteS3Object extends AbstractS3Processor {
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+ getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", flowFile, transferMillis);
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index b9de5fe..f639b54 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -72,6 +72,11 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
@WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@@ -251,13 +256,15 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.put("s3.version", metadata.getVersionId());
}
} catch (final IOException | AmazonClientException ioe) {
- getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
+ flowFile = extractExceptionDetails(ioe, session, flowFile);
+ getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ioe);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final FlowFileAccessException ffae) {
if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
- getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
+ getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ffae);
+ flowFile = extractExceptionDetails(ffae, session, flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 0fed5ee..40f086c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -117,6 +117,11 @@ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
@WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
@@ -833,6 +838,7 @@ public class PutS3Object extends AbstractS3Processor {
new Object[]{cacheKey, e.getMessage()});
}
} catch (final ProcessException | AmazonClientException pe) {
+ extractExceptionDetails(pe, session, flowFile);
if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
getLogger().info(pe.getMessage());
session.rollback();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index 6c9d72a..2f01598 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -53,7 +53,12 @@ import java.util.stream.Collectors;
@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = "s3.tag.___", description = "The tags associated with the S3 object will be " +
- "written as part of the FlowFile attributes")})
+ "written as part of the FlowFile attributes"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -171,7 +176,8 @@ public class TagS3Object extends AbstractS3Processor {
}
s3.setObjectTagging(r);
} catch (final AmazonServiceException ase) {
- getLogger().error("Failed to tag S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+ flowFile = extractExceptionDetails(ase, session, flowFile);
+ getLogger().error("Failed to tag S3 Object for {}; routing to failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 71915a1..cf005c5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -17,11 +17,14 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.amazonaws.SdkClientException;
+import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -256,6 +259,72 @@ public class TestFetchS3Object {
}
@Test
+ public void testFetchObject_FailAdditionalAttributesBucketName() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final AmazonS3Exception exception = new AmazonS3Exception("The specified bucket does not exist");
+ exception.setAdditionalDetails(ImmutableMap.of("BucketName", "us-east-1", "Error", "ABC123"));
+ exception.setErrorCode("NoSuchBucket");
+ exception.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals("NoSuchBucket", flowFile.getAttribute("s3.errorCode"));
+ assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+ assertEquals("404", flowFile.getAttribute("s3.statusCode"));
+ assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
+ public void testFetchObject_FailAdditionalAttributesAuthentication() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final AmazonS3Exception exception = new AmazonS3Exception("signature");
+ exception.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB CC DD EE FF"));
+ exception.setErrorCode("SignatureDoesNotMatch");
+ exception.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals("SignatureDoesNotMatch", flowFile.getAttribute("s3.errorCode"));
+ assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+ assertEquals("403", flowFile.getAttribute("s3.statusCode"));
+ assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
+ public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final SdkClientException exception = new SdkClientException("message");
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals(exception.getClass().getName(), flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
public void testGetObjectReturnsNull() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
@@ -284,6 +353,7 @@ public class TestFetchS3Object {
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
}
+
@Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();