You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/08/28 23:51:43 UTC
[nifi] branch master updated: NIFI-6367 - This closes #3563. more
error handling for FetchS3Object
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e2ca50e NIFI-6367 - This closes #3563. more error handling for FetchS3Object
e2ca50e is described below
commit e2ca50e66a3b1a7d810ea8eac256d21bca3fd07f
Author: Evan Reynolds <ev...@usermind.com>
AuthorDate: Mon Jul 1 17:26:05 2019 -0700
NIFI-6367 - This closes #3563. more error handling for FetchS3Object
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../nifi/processors/aws/s3/FetchS3Object.java | 13 ++++++++++
.../nifi/processors/aws/s3/TestFetchS3Object.java | 30 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 8dabeb9..aa6233d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.AmazonClientException;
@@ -129,6 +131,9 @@ public class FetchS3Object extends AbstractS3Processor {
}
try (final S3Object s3Object = client.getObject(request)) {
+ if (s3Object == null) {
+ throw new IOException("AWS refused to execute this request.");
+ }
flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
attributes.put("s3.bucket", s3Object.getBucketName());
@@ -174,6 +179,14 @@ public class FetchS3Object extends AbstractS3Processor {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
+ } catch (final FlowFileAccessException ffae) {
+ if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
+ getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ throw ffae;
}
if (!attributes.isEmpty()) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 6416d53..ed3fdfe 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -235,6 +236,35 @@ public class TestFetchS3Object {
}
@Test
+ public void testGetObjectReturnsNull() throws IOException {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+ Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
+
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testFlowFileAccessExceptionGoesToFailure() throws IOException {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ AmazonS3Exception amazonException = new AmazonS3Exception("testing");
+ Mockito.doThrow(new FlowFileAccessException("testing nested", amazonException)).when(mockS3Client).getObject(Mockito.any());
+
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
+ }
+ @Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();