You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/09/18 13:08:10 UTC
[nifi] branch main updated: NIFI-8087 Adds MessageGroupID and
MessageDeduplicationId to PutSNS. Updated topic ARN assert to match dummy
topic name with .fifo suffix
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 50e60c8 NIFI-8087 Adds MessageGroupID and MessageDeduplicationId to PutSNS. Updated topic ARN assert to match dummy topic name with .fifo suffix
50e60c8 is described below
commit 50e60c812d6b73137a1ffb178c4c903f66203f49
Author: Alasdair Brown <me...@alasdairb.com>
AuthorDate: Sat Sep 4 14:12:49 2021 +0100
NIFI-8087 Adds MessageGroupID and MessageDeduplicationId to PutSNS
Updated topic ARN assert to match dummy topic name with .fifo suffix
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5367.
---
.../org/apache/nifi/processors/aws/sns/PutSNS.java | 33 +++++++++++++++++++-
.../apache/nifi/processors/aws/sns/TestPutSNS.java | 35 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 2acc38b..f4c3c04 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -75,9 +75,28 @@ public class PutSNS extends AbstractSNSProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder()
+ .name("Message Group ID")
+ .displayName("Message Group ID")
+ .description("If using FIFO, the message group to which the flowFile belongs")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder()
+ .name("Deduplication Message ID")
+ .displayName("Deduplication Message ID")
+ .description("The token used for deduplication of sent messages")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
- USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+ USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
+ MESSAGEGROUPID, MESSAGEDEDUPLICATIONID));
public static final int MAX_SIZE = 256 * 1024;
@@ -120,6 +139,18 @@ public class PutSNS extends AbstractSNSProcessor {
final PublishRequest request = new PublishRequest();
request.setMessage(message);
+ if (context.getProperty(MESSAGEGROUPID).isSet()) {
+ request.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue());
+ }
+
+ if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
+ request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue());
+ }
+
if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
request.setMessageStructure("json");
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
index 33eba23..25cbde4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
@@ -90,6 +90,41 @@ public class TestPutSNS {
}
@Test
+ public void testPublishFIFO() throws IOException {
+ runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+ runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo");
+ runner.setProperty(PutSNS.SUBJECT, "${eval.subject}");
+ runner.setProperty(PutSNS.MESSAGEDEDUPLICATIONID, "${myuuid}");
+ runner.setProperty(PutSNS.MESSAGEGROUPID, "test1234");
+ assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
+ final Map<String, String> ffAttributes = new HashMap<>();
+ ffAttributes.put("filename", "1.txt");
+ ffAttributes.put("eval.subject", "test-subject");
+ ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
+ runner.enqueue("Test Message Content", ffAttributes);
+
+ PublishResult mockPublishResult = new PublishResult();
+ Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult);
+
+ runner.run();
+
+ ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
+ Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
+ PublishRequest request = captureRequest.getValue();
+ assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn());
+ assertEquals("Test Message Content", request.getMessage());
+ assertEquals("test-subject", request.getSubject());
+ assertEquals("test1234", request.getMessageGroupId());
+ assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId());
+ assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
+
+ runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
+ MockFlowFile ff0 = flowFiles.get(0);
+ ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt");
+ }
+
+ @Test
public void testPublishFailure() throws IOException {
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");
final Map<String, String> ffAttributes = new HashMap<>();