Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/14 02:46:19 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #16904: Improve AWS SQS Sensor (#16880)

uranusjr commented on a change in pull request #16904:
URL: https://github.com/apache/airflow/pull/16904#discussion_r669238131



##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))

Review comment:
       ```suggestion
           self.log.info("received %d messages", num_messages)
   ```
   
   The string conversion is redundant, since the logger accepts an int directly. Passing the int also keeps its original type on the log record, which can be useful for people using custom log handlers and formatters. (I know this was done in existing code; it is wrong.)
   
   Same for other logs below.
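
   To illustrate the point about custom handlers, here is a minimal sketch using only the standard library. `CapturingHandler` and the logger name are hypothetical and are not part of this PR; the handler just stores records so their `args` can be inspected.

```python
import logging


class CapturingHandler(logging.Handler):
    """Hypothetical handler that keeps every record for later inspection."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


log = logging.getLogger("sqs_sensor_demo")  # hypothetical logger name
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output away from the root logger
handler = CapturingHandler()
log.addHandler(handler)

num_messages = 3
# Pre-converted argument: the handler only ever sees a str.
log.info("received %s messages", str(num_messages))
# Argument passed through unchanged: the handler sees the original int.
log.info("received %d messages", num_messages)

converted, direct = handler.records
```

   Both records render the same text, but only the second one still carries an int in `record.args`, so a structured formatter could emit it as a number.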

##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))
 
-        if 'Messages' in messages and messages['Messages']:
-            entries = [
-                {'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']}
-                for message in messages['Messages']
-            ]
+        if num_messages == 0:

Review comment:
       ```suggestion
           if not num_messages:
   ```
   
   Same for other checks below.
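
   As a quick sanity check that the suggested form behaves the same for a count returned by `len()`, here is a small sketch; the two helper names are hypothetical and exist only for the comparison.

```python
def is_empty_by_comparison(num_messages: int) -> bool:
    # The form used in the PR.
    return num_messages == 0


def is_empty_by_truthiness(num_messages: int) -> bool:
    # The suggested form: 0 is falsy, any positive count is truthy.
    return not num_messages


# len() never returns a negative number, so these cover the relevant cases.
counts = [0, 1, 10]
agree = all(is_empty_by_comparison(n) == is_empty_by_truthiness(n) for n in counts)

# An empty list is falsy too, so the list itself could be tested directly.
empty_list_is_falsy = not []
```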

##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -69,31 +89,48 @@ def poke(self, context):
 
         self.log.info('SQSSensor checking for message on queue: %s', self.sqs_queue)
 
-        messages = sqs_conn.receive_message(
-            QueueUrl=self.sqs_queue,
-            MaxNumberOfMessages=self.max_messages,
-            WaitTimeSeconds=self.wait_time_seconds,
-        )
+        receive_message_kwargs = {
+            'QueueUrl': self.sqs_queue,
+            'MaxNumberOfMessages': self.max_messages,
+            'WaitTimeSeconds': self.wait_time_seconds,
+        }
+        if self.visibility_timeout is not None:
+            receive_message_kwargs['VisibilityTimeout'] = self.visibility_timeout
+
+        response = sqs_conn.receive_message(**receive_message_kwargs)
+
+        if "Messages" not in response:
+            return False
 
-        self.log.info("received message %s", str(messages))
+        messages = response['Messages']
+        num_messages = len(messages)
+        self.log.info("received %s messages", str(num_messages))
 
-        if 'Messages' in messages and messages['Messages']:
-            entries = [
-                {'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']}
-                for message in messages['Messages']
-            ]
+        if num_messages == 0:
+            return False
 
-            result = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)
+        if self.message_filtering:
+            messages = self.filter_messages(messages)
+            num_messages = len(messages)
+            self.log.info("filtered %s messages", str(num_messages))

Review comment:
       This message is quite ambiguous. Does the count indicate how many messages were filtered out (i.e. dropped), or how many are left after the filter is applied?
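
   One possible way to remove the ambiguity, offered only as a sketch: report both counts explicitly. `filter_and_report`, its `keep` predicate, and the logger name are hypothetical stand-ins, not the sensor's actual `filter_messages` implementation.

```python
import logging

log = logging.getLogger("sqs_sensor_demo")  # hypothetical logger name


def filter_and_report(messages, keep):
    """Hypothetical helper: filter messages and log unambiguous counts."""
    kept = [message for message in messages if keep(message)]
    dropped = len(messages) - len(kept)
    # State both numbers so the reader never has to guess which one is meant.
    log.info("%d messages kept after filtering, %d dropped", len(kept), dropped)
    return kept


# Example predicate (an assumption for the demo): keep bodies longer than one character.
kept = filter_and_report(["a", "bb", "ccc"], keep=lambda body: len(body) > 1)
```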



