You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/21 16:25:10 UTC
[airflow] branch main updated: SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 47b72056c4 SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)
47b72056c4 is described below
commit 47b72056c46931aef09d63d6d80fbdd3d9128b09
Author: Daniel Barrundia Gonzalez <db...@gmail.com>
AuthorDate: Thu Jul 21 10:25:03 2022 -0600
SQSPublishOperator should allow sending messages to a FIFO Queue (#25171)
---
airflow/providers/amazon/aws/hooks/sqs.py | 19 +++++++++-----
airflow/providers/amazon/aws/operators/sqs.py | 13 +++++++++-
tests/providers/amazon/aws/operators/test_sqs.py | 33 ++++++++++++++++++++++++
3 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py
index b94756f63a..d3bcd837c9 100644
--- a/airflow/providers/amazon/aws/hooks/sqs.py
+++ b/airflow/providers/amazon/aws/hooks/sqs.py
@@ -58,6 +58,7 @@ class SqsHook(AwsBaseHook):
message_body: str,
delay_seconds: int = 0,
message_attributes: Optional[Dict] = None,
+ message_group_id: Optional[str] = None,
) -> Dict:
"""
Send message to the queue
@@ -67,17 +68,23 @@ class SqsHook(AwsBaseHook):
:param delay_seconds: seconds to delay the message
:param message_attributes: additional attributes for the message (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
+ :param message_group_id: Message group ID; applies only to FIFO (first-in-first-out) queues. (default: None)
+ For details of this parameter see :py:meth:`botocore.client.SQS.send_message`
:return: dict with the information about the message sent
For details of the returned value see :py:meth:`botocore.client.SQS.send_message`
:rtype: dict
"""
- return self.get_conn().send_message(
- QueueUrl=queue_url,
- MessageBody=message_body,
- DelaySeconds=delay_seconds,
- MessageAttributes=message_attributes or {},
- )
+ params = {
+ 'QueueUrl': queue_url,
+ 'MessageBody': message_body,
+ 'DelaySeconds': delay_seconds,
+ 'MessageAttributes': message_attributes or {},
+ }
+ if message_group_id:
+ params['MessageGroupId'] = message_group_id
+
+ return self.get_conn().send_message(**params)
class SQSHook(SqsHook):
diff --git a/airflow/providers/amazon/aws/operators/sqs.py b/airflow/providers/amazon/aws/operators/sqs.py
index 6eff54134a..c9874c4d1e 100644
--- a/airflow/providers/amazon/aws/operators/sqs.py
+++ b/airflow/providers/amazon/aws/operators/sqs.py
@@ -39,10 +39,18 @@ class SqsPublishOperator(BaseOperator):
:param message_attributes: additional attributes for the message (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
:param delay_seconds: message delay in seconds (templated) (default: 0)
+ :param message_group_id: Message group ID; applies only to FIFO (first-in-first-out) queues. (default: None)
+ For details of this parameter see :py:meth:`botocore.client.SQS.send_message`
:param aws_conn_id: AWS connection id (default: aws_default)
"""
- template_fields: Sequence[str] = ('sqs_queue', 'message_content', 'delay_seconds', 'message_attributes')
+ template_fields: Sequence[str] = (
+ 'sqs_queue',
+ 'message_content',
+ 'delay_seconds',
+ 'message_attributes',
+ 'message_group_id',
+ )
template_fields_renderers = {'message_attributes': 'json'}
ui_color = '#6ad3fa'
@@ -53,6 +61,7 @@ class SqsPublishOperator(BaseOperator):
message_content: str,
message_attributes: Optional[dict] = None,
delay_seconds: int = 0,
+ message_group_id: Optional[str] = None,
aws_conn_id: str = 'aws_default',
**kwargs,
):
@@ -62,6 +71,7 @@ class SqsPublishOperator(BaseOperator):
self.message_content = message_content
self.delay_seconds = delay_seconds
self.message_attributes = message_attributes or {}
+ self.message_group_id = message_group_id
def execute(self, context: 'Context'):
"""
@@ -79,6 +89,7 @@ class SqsPublishOperator(BaseOperator):
message_body=self.message_content,
delay_seconds=self.delay_seconds,
message_attributes=self.message_attributes,
+ message_group_id=self.message_group_id,
)
self.log.info('send_message result: %s', result)
diff --git a/tests/providers/amazon/aws/operators/test_sqs.py b/tests/providers/amazon/aws/operators/test_sqs.py
index 915fca9bfb..1ce152bbf1 100644
--- a/tests/providers/amazon/aws/operators/test_sqs.py
+++ b/tests/providers/amazon/aws/operators/test_sqs.py
@@ -20,6 +20,8 @@
import unittest
from unittest.mock import MagicMock
+import pytest
+from botocore.exceptions import ClientError
from moto import mock_sqs
from airflow.models.dag import DAG
@@ -32,6 +34,9 @@ DEFAULT_DATE = timezone.datetime(2019, 1, 1)
QUEUE_NAME = 'test-queue'
QUEUE_URL = f'https://{QUEUE_NAME}'
+FIFO_QUEUE_NAME = 'test-queue.fifo'
+FIFO_QUEUE_URL = f'https://{FIFO_QUEUE_NAME}'
+
class TestSqsPublishOperator(unittest.TestCase):
def setUp(self):
@@ -66,3 +71,31 @@ class TestSqsPublishOperator(unittest.TestCase):
context_calls = []
assert self.mock_context['ti'].method_calls == context_calls, "context call should be same"
+
+ @mock_sqs
+ def test_execute_failure_fifo_queue(self):
+ self.operator.sqs_queue = FIFO_QUEUE_URL
+ self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
+ with pytest.raises(ClientError) as ctx:
+ self.operator.execute(self.mock_context)
+ err_msg = (
+ "An error occurred (MissingParameter) when calling the SendMessage operation: The request must "
+ "contain the parameter MessageGroupId."
+ )
+ assert err_msg == str(ctx.value)
+
+ @mock_sqs
+ def test_execute_success_fifo_queue(self):
+ self.operator.sqs_queue = FIFO_QUEUE_URL
+ self.operator.message_group_id = "abc"
+ self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
+ result = self.operator.execute(self.mock_context)
+ assert 'MD5OfMessageBody' in result
+ assert 'MessageId' in result
+ message = self.sqs_hook.get_conn().receive_message(
+ QueueUrl=FIFO_QUEUE_URL, AttributeNames=['MessageGroupId']
+ )
+ assert len(message['Messages']) == 1
+ assert message['Messages'][0]['MessageId'] == result['MessageId']
+ assert message['Messages'][0]['Body'] == 'hello'
+ assert message['Messages'][0]['Attributes']['MessageGroupId'] == 'abc'