You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 16:25:10 UTC

[airflow] branch main updated: SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 47b72056c4 SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)
47b72056c4 is described below

commit 47b72056c46931aef09d63d6d80fbdd3d9128b09
Author: Daniel Barrundia Gonzalez <db...@gmail.com>
AuthorDate: Thu Jul 21 10:25:03 2022 -0600

    SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)
---
 airflow/providers/amazon/aws/hooks/sqs.py        | 19 +++++++++-----
 airflow/providers/amazon/aws/operators/sqs.py    | 13 +++++++++-
 tests/providers/amazon/aws/operators/test_sqs.py | 33 ++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py
index b94756f63a..d3bcd837c9 100644
--- a/airflow/providers/amazon/aws/hooks/sqs.py
+++ b/airflow/providers/amazon/aws/hooks/sqs.py
@@ -58,6 +58,7 @@ class SqsHook(AwsBaseHook):
         message_body: str,
         delay_seconds: int = 0,
         message_attributes: Optional[Dict] = None,
+        message_group_id: Optional[str] = None,
     ) -> Dict:
         """
         Send message to the queue
@@ -67,17 +68,23 @@ class SqsHook(AwsBaseHook):
         :param delay_seconds: seconds to delay the message
         :param message_attributes: additional attributes for the message (default: None)
             For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
+        :param message_group_id: This applies only to FIFO (first-in-first-out) queues. (default: None)
+            For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
 
         :return: dict with the information about the message sent
             For details of the returned value see :py:meth:`botocore.client.SQS.send_message`
         :rtype: dict
         """
-        return self.get_conn().send_message(
-            QueueUrl=queue_url,
-            MessageBody=message_body,
-            DelaySeconds=delay_seconds,
-            MessageAttributes=message_attributes or {},
-        )
+        params = {
+            'QueueUrl': queue_url,
+            'MessageBody': message_body,
+            'DelaySeconds': delay_seconds,
+            'MessageAttributes': message_attributes or {},
+        }
+        if message_group_id:
+            params['MessageGroupId'] = message_group_id
+
+        return self.get_conn().send_message(**params)
 
 
 class SQSHook(SqsHook):
diff --git a/airflow/providers/amazon/aws/operators/sqs.py b/airflow/providers/amazon/aws/operators/sqs.py
index 6eff54134a..c9874c4d1e 100644
--- a/airflow/providers/amazon/aws/operators/sqs.py
+++ b/airflow/providers/amazon/aws/operators/sqs.py
@@ -39,10 +39,18 @@ class SqsPublishOperator(BaseOperator):
     :param message_attributes: additional attributes for the message (default: None)
         For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
     :param delay_seconds: message delay (templated) (default: 1 second)
+    :param message_group_id: This parameter applies only to FIFO (first-in-first-out) queues. (default: None)
+        For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
     :param aws_conn_id: AWS connection id (default: aws_default)
     """
 
-    template_fields: Sequence[str] = ('sqs_queue', 'message_content', 'delay_seconds', 'message_attributes')
+    template_fields: Sequence[str] = (
+        'sqs_queue',
+        'message_content',
+        'delay_seconds',
+        'message_attributes',
+        'message_group_id',
+    )
     template_fields_renderers = {'message_attributes': 'json'}
     ui_color = '#6ad3fa'
 
@@ -53,6 +61,7 @@ class SqsPublishOperator(BaseOperator):
         message_content: str,
         message_attributes: Optional[dict] = None,
         delay_seconds: int = 0,
+        message_group_id: Optional[str] = None,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
@@ -62,6 +71,7 @@ class SqsPublishOperator(BaseOperator):
         self.message_content = message_content
         self.delay_seconds = delay_seconds
         self.message_attributes = message_attributes or {}
+        self.message_group_id = message_group_id
 
     def execute(self, context: 'Context'):
         """
@@ -79,6 +89,7 @@ class SqsPublishOperator(BaseOperator):
             message_body=self.message_content,
             delay_seconds=self.delay_seconds,
             message_attributes=self.message_attributes,
+            message_group_id=self.message_group_id,
         )
 
         self.log.info('send_message result: %s', result)
diff --git a/tests/providers/amazon/aws/operators/test_sqs.py b/tests/providers/amazon/aws/operators/test_sqs.py
index 915fca9bfb..1ce152bbf1 100644
--- a/tests/providers/amazon/aws/operators/test_sqs.py
+++ b/tests/providers/amazon/aws/operators/test_sqs.py
@@ -20,6 +20,8 @@
 import unittest
 from unittest.mock import MagicMock
 
+import pytest
+from botocore.exceptions import ClientError
 from moto import mock_sqs
 
 from airflow.models.dag import DAG
@@ -32,6 +34,9 @@ DEFAULT_DATE = timezone.datetime(2019, 1, 1)
 QUEUE_NAME = 'test-queue'
 QUEUE_URL = f'https://{QUEUE_NAME}'
 
+FIFO_QUEUE_NAME = 'test-queue.fifo'
+FIFO_QUEUE_URL = f'https://{FIFO_QUEUE_NAME}'
+
 
 class TestSqsPublishOperator(unittest.TestCase):
     def setUp(self):
@@ -66,3 +71,31 @@ class TestSqsPublishOperator(unittest.TestCase):
         context_calls = []
 
         assert self.mock_context['ti'].method_calls == context_calls, "context call  should be same"
+
+    @mock_sqs
+    def test_execute_failure_fifo_queue(self):
+        self.operator.sqs_queue = FIFO_QUEUE_URL
+        self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
+        with pytest.raises(ClientError) as ctx:
+            self.operator.execute(self.mock_context)
+        err_msg = (
+            "An error occurred (MissingParameter) when calling the SendMessage operation: The request must "
+            "contain the parameter MessageGroupId."
+        )
+        assert err_msg == str(ctx.value)
+
+    @mock_sqs
+    def test_execute_success_fifo_queue(self):
+        self.operator.sqs_queue = FIFO_QUEUE_URL
+        self.operator.message_group_id = "abc"
+        self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
+        result = self.operator.execute(self.mock_context)
+        assert 'MD5OfMessageBody' in result
+        assert 'MessageId' in result
+        message = self.sqs_hook.get_conn().receive_message(
+            QueueUrl=FIFO_QUEUE_URL, AttributeNames=['MessageGroupId']
+        )
+        assert len(message['Messages']) == 1
+        assert message['Messages'][0]['MessageId'] == result['MessageId']
+        assert message['Messages'][0]['Body'] == 'hello'
+        assert message['Messages'][0]['Attributes']['MessageGroupId'] == 'abc'