You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/09/18 13:08:10 UTC

[nifi] branch main updated: NIFI-8087 Adds MessageGroupID and MessageDeduplicationId to PutSNS; updated topic ARN assert to match dummy topic name with .fifo suffix

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 50e60c8  NIFI-8087 Adds MessageGroupID and MessageDeduplicationId to PutSNS; updated topic ARN assert to match dummy topic name with .fifo suffix
50e60c8 is described below

commit 50e60c812d6b73137a1ffb178c4c903f66203f49
Author: Alasdair Brown <me...@alasdairb.com>
AuthorDate: Sat Sep 4 14:12:49 2021 +0100

    NIFI-8087 Adds MessageGroupID and MessageDeduplicationId to PutSNS
    Updated topic arn assert to match dummy topic name with .fifo suffix
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5367.
---
 .../org/apache/nifi/processors/aws/sns/PutSNS.java | 33 +++++++++++++++++++-
 .../apache/nifi/processors/aws/sns/TestPutSNS.java | 35 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 2acc38b..f4c3c04 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -75,9 +75,28 @@ public class PutSNS extends AbstractSNSProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder() // optional message group id, copied onto the PublishRequest when set
+            .name("Message Group ID")
+            .displayName("Message Group ID")
+            .description("If using FIFO, the message group to which the flowFile belongs")
+            .required(false) // not required; the processor only applies it when the property is set
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) // value is evaluated per flowfile, against its attributes
+            .build();
+
+    public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder() // optional deduplication id, copied onto the PublishRequest when set
+            .name("Deduplication Message ID")
+            .displayName("Deduplication Message ID")
+            .description("The token used for deduplication of sent messages")
+            .required(false) // not required; the processor only applies it when the property is set
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) // value is evaluated per flowfile, against its attributes
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
             Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
-                    USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+                    USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
+                    MESSAGEGROUPID, MESSAGEDEDUPLICATIONID));
 
     public static final int MAX_SIZE = 256 * 1024;
 
@@ -120,6 +139,18 @@ public class PutSNS extends AbstractSNSProcessor {
         final PublishRequest request = new PublishRequest();
         request.setMessage(message);
 
+        if (context.getProperty(MESSAGEGROUPID).isSet()) {
+            request.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
+                    .evaluateAttributeExpressions(flowFile)
+                    .getValue());
+        }
+
+        if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
+            request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
+                    .evaluateAttributeExpressions(flowFile)
+                    .getValue());
+        }
+
         if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
             request.setMessageStructure("json");
         }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
index 33eba23..25cbde4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java
@@ -90,6 +90,41 @@ public class TestPutSNS {
     }
 
     @Test
+    public void testPublishFIFO() throws IOException { // checks that group id and deduplication id reach the PublishRequest for a .fifo topic ARN
+        runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo"); // dummy topic ARN with the .fifo suffix
+        runner.setProperty(PutSNS.SUBJECT, "${eval.subject}"); // resolved from the "eval.subject" attribute enqueued below
+        runner.setProperty(PutSNS.MESSAGEDEDUPLICATIONID, "${myuuid}"); // resolved from the "myuuid" attribute enqueued below
+        runner.setProperty(PutSNS.MESSAGEGROUPID, "test1234"); // literal value, no expression
+        assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); // dynamic property; asserted below as a message attribute
+        final Map<String, String> ffAttributes = new HashMap<>();
+        ffAttributes.put("filename", "1.txt");
+        ffAttributes.put("eval.subject", "test-subject");
+        ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
+        runner.enqueue("Test Message Content", ffAttributes);
+
+        PublishResult mockPublishResult = new PublishResult();
+        Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult); // stub publish() on the mocked client
+
+        runner.run();
+
+        ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
+        Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); // exactly one publish call; capture its request
+        PublishRequest request = captureRequest.getValue();
+        assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn());
+        assertEquals("Test Message Content", request.getMessage());
+        assertEquals("test-subject", request.getSubject());
+        assertEquals("test1234", request.getMessageGroupId()); // literal group id passed through unchanged
+        assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId()); // expression evaluated against the flowfile attribute
+        assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
+
+        runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt"); // filename attribute is unchanged on the routed flowfile
+    }
+
+    @Test
     public void testPublishFailure() throws IOException {
         runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");
         final Map<String, String> ffAttributes = new HashMap<>();