Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/03 14:43:38 UTC

[GitHub] [airflow] ANiteckiP opened a new pull request #7613: [GCP Operators] Add PubSubPullOperator

ANiteckiP opened a new pull request #7613: [GCP Operators] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613
 
 
   Add PubSubPullOperator - a non-blocking equivalent to PubSubPullSensor.
   
   These two share most of their functionality but ultimately serve different purposes: while the existing PubSubPullSensor blocks the pipeline until it receives a message, the new PubSubPullOperator can be used to check for new messages opportunistically.
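A minimal, self-contained sketch of the difference, using a hypothetical in-memory queue in place of a real Pub/Sub subscription. The names `pull_once` and `wait_for_messages` are illustrative only, not Airflow APIs. The operator-style call returns whatever is available right now (possibly an empty list), while the sensor-style loop keeps polling until at least one message arrives or a timeout expires.

```python
import time
from collections import deque
from typing import Deque, List


def pull_once(queue: Deque[str], max_messages: int) -> List[str]:
    """Operator-style pull: take up to max_messages that are available now.

    Never waits; returns an empty list when the queue is empty.
    """
    pulled: List[str] = []
    while queue and len(pulled) < max_messages:
        pulled.append(queue.popleft())
    return pulled


def wait_for_messages(
    queue: Deque[str], max_messages: int, poke_interval: float, timeout: float
) -> List[str]:
    """Sensor-style pull: keep poking until at least one message is returned."""
    deadline = time.monotonic() + timeout
    while True:
        pulled = pull_once(queue, max_messages)
        if pulled:  # the sensor's success criterion
            return pulled
        if time.monotonic() >= deadline:
            raise TimeoutError("no messages received before timeout")
        time.sleep(poke_interval)


if __name__ == "__main__":
    empty: Deque[str] = deque()
    print(pull_once(empty, 5))  # [] : opportunistic check, does not block

    queue = deque(["a", "b", "c"])
    print(wait_for_messages(queue, 2, poke_interval=0.01, timeout=1.0))  # ['a', 'b']
```

A real sensor would delegate the sleeping and timeout handling to Airflow's poke machinery; the loop here only mirrors that behaviour.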
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387646253
 
 

 ##########
 File path: airflow/providers/google/cloud/hooks/pubsub.py
 ##########
 @@ -545,8 +549,20 @@ def acknowledge(
         :param metadata: (Optional) Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]]
         """
+
         if not project_id:
             raise ValueError("Project ID should be set.")
+
+        if ack_ids is not None and messages is None:
 
 Review comment:
   Same with `messages`, I think. The condition should be that either of the two is non-empty, rather than not None.


[GitHub] [airflow] ANiteckiP commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
ANiteckiP commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r388240353
 
 

 ##########
 File path: airflow/providers/google/cloud/hooks/pubsub.py
 ##########
 @@ -545,8 +549,20 @@ def acknowledge(
         :param metadata: (Optional) Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]]
         """
+
         if not project_id:
             raise ValueError("Project ID should be set.")
+
+        if ack_ids is not None and messages is None:
 
 Review comment:
   The idea is that passing both of those arguments in a single call is prohibited. Defaulting to an empty list/tuple and checking for non-emptiness rather than `is not None` would miss cases where users pass both arguments, one of which happens to be an empty collection, when they are not supposed to do that in the first place.
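A sketch of the two validation styles under discussion, assuming the hook's `acknowledge` accepts either `ack_ids` or `messages` but not both. The helper names `resolve_ack_ids` / `resolve_ack_ids_truthy` and the `Msg` stand-in for `ReceivedMessage` are hypothetical, and the truthiness variant is only one possible reading of the reviewer's suggestion. The difference shows up in exactly the case described here: passing both arguments where one is an empty list.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Msg:
    """Hypothetical stand-in for a pulled ReceivedMessage."""
    ack_id: str


def resolve_ack_ids(
    ack_ids: Optional[Sequence[str]] = None,
    messages: Optional[Sequence[Msg]] = None,
) -> List[str]:
    """`is not None` style: exactly one argument must be passed; empty lists are allowed."""
    if ack_ids is not None and messages is None:
        return list(ack_ids)
    if ack_ids is None and messages is not None:
        return [m.ack_id for m in messages]
    raise ValueError("Pass either ack_ids or messages, but not both.")


def resolve_ack_ids_truthy(
    ack_ids: Optional[Sequence[str]] = None,
    messages: Optional[Sequence[Msg]] = None,
) -> List[str]:
    """Truthiness style: empty collections are treated the same as None."""
    if ack_ids and not messages:
        return list(ack_ids)
    if messages and not ack_ids:
        return [m.ack_id for m in messages]
    raise ValueError("Exactly one of ack_ids or messages must be non-empty.")


# Both arguments passed, one of them empty:
# the truthiness check accepts the call, the `is not None` check rejects it.
print(resolve_ack_ids_truthy(ack_ids=["a"], messages=[]))  # ['a']
try:
    resolve_ack_ids(ack_ids=["a"], messages=[])
except ValueError as exc:
    print("rejected:", exc)
```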


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387671628
 
 

 ##########
 File path: airflow/providers/google/cloud/sensors/pubsub.py
 ##########
 @@ -30,11 +31,16 @@
 
 class PubSubPullSensor(BaseSensorOperator):
     """Pulls messages from a PubSub subscription and passes them through XCom.
+    Always waits for at least one message to be returned from the subscription.
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:PubSubPullSensor`
 
+    .. seealso::
+        If you don't want to wait for at least one message to come, use Operator instead:
 
 Review comment:
   Good comment


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387672702
 
 

 ##########
 File path: airflow/providers/google/cloud/sensors/pubsub.py
 ##########
 @@ -94,6 +114,18 @@ def __init__(
                 "the project_id parameter.", DeprecationWarning, stacklevel=2)
             project_id = project
 
+        if not return_immediately:
+            warnings.warn(
+                "The return_immediately parameter is deprecated.\n"
 
 Review comment:
   :+1: 
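The deprecation pattern in this hunk can be exercised in isolation. A minimal sketch, assuming (as the `if not return_immediately:` check suggests) that the default is `True` and the warning fires only when a caller explicitly passes `False`. `SensorSketch` is a hypothetical class, not the real sensor, and the message text is shortened.

```python
import warnings


class SensorSketch:
    """Hypothetical class mirroring the deprecation check in the hunk above."""

    def __init__(self, return_immediately: bool = True) -> None:
        if not return_immediately:
            warnings.warn(
                "The return_immediately parameter is deprecated.",
                DeprecationWarning,
                stacklevel=2,  # point the warning at the caller's line, not this one
            )
        self.return_immediately = return_immediately


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record every warning regardless of default filters
    SensorSketch()  # default value: no warning
    SensorSketch(return_immediately=False)  # explicit False: one DeprecationWarning

print([str(w.message) for w in caught])  # ['The return_immediately parameter is deprecated.']
```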


[GitHub] [airflow] turbaszek commented on issue #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#issuecomment-601611035
 
 
   Done in https://github.com/apache/airflow/pull/7766


[GitHub] [airflow] turbaszek closed pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
turbaszek closed pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613
 
 
   


[GitHub] [airflow] michalslowikowski00 commented on issue #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on issue #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#issuecomment-595325977
 
 
   Will you squash all commits?


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r388248783
 
 

 ##########
 File path: airflow/providers/google/cloud/hooks/pubsub.py
 ##########
 @@ -545,8 +549,20 @@ def acknowledge(
         :param metadata: (Optional) Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]]
         """
+
         if not project_id:
             raise ValueError("Project ID should be set.")
+
+        if ack_ids is not None and messages is None:
 
 Review comment:
   Yep, I understand that, so you are saying that passing an empty list is OK. How should it behave, then, if you pass an empty `ack_ids` or `messages`? I think this should fail as well.


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387645733
 
 

 ##########
 File path: airflow/providers/google/cloud/hooks/pubsub.py
 ##########
 @@ -545,8 +549,20 @@ def acknowledge(
         :param metadata: (Optional) Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]]
         """
+
         if not project_id:
             raise ValueError("Project ID should be set.")
+
+        if ack_ids is not None and messages is None:
 
 Review comment:
   What if `ack_ids` is `[]`? Should we allow this case, or should it be treated as None? I have a slight preference for treating it as None: the code will be simpler (no `is not None` checks) and we prevent the user from making the mistake of passing an empty ID list.


[GitHub] [airflow] ANiteckiP commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
ANiteckiP commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r388363326
 
 

 ##########
 File path: airflow/providers/google/cloud/hooks/pubsub.py
 ##########
 @@ -545,8 +549,20 @@ def acknowledge(
         :param metadata: (Optional) Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]]
         """
+
         if not project_id:
             raise ValueError("Project ID should be set.")
+
+        if ack_ids is not None and messages is None:
 
 Review comment:
   I think we should allow passing an empty list, because the `pull` method can return an empty list as well - `acknowledge` should be able to gracefully accept any valid `pull` return value. We could check for empty list and return early, or just pass the list to the underlying API and expose its behaviour without additional checks (as it is now).
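Of the two options mentioned, the early-return variant could look roughly like this. `FakeSubscriber` and its `acknowledge_calls` list are hypothetical stand-ins for the underlying client; the point is that acknowledging the (possibly empty) result of a pull never fails, and no request is sent when there is nothing to acknowledge.

```python
from typing import List, Sequence


class FakeSubscriber:
    """Hypothetical stand-in for the Pub/Sub subscriber client."""

    def __init__(self) -> None:
        self.acknowledge_calls: List[List[str]] = []  # one entry per API request

    def acknowledge(self, subscription: str, ack_ids: Sequence[str]) -> None:
        self.acknowledge_calls.append(list(ack_ids))


def acknowledge(client: FakeSubscriber, subscription: str, ack_ids: Sequence[str]) -> None:
    """Accept any value a pull can return, including an empty list."""
    if not ack_ids:
        # Nothing was pulled, so there is nothing to acknowledge: return early
        # instead of sending an empty request to the API.
        return
    client.acknowledge(subscription=subscription, ack_ids=ack_ids)


client = FakeSubscriber()
acknowledge(client, "my-subscription", [])          # empty pull result: no request
acknowledge(client, "my-subscription", ["1", "2"])  # one request with both IDs
print(client.acknowledge_calls)  # [['1', '2']]
```

The pass-through alternative (the current behaviour) would simply drop the `if not ack_ids` guard and leave the handling of an empty list to the underlying API.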


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387672039
 
 

 ##########
 File path: airflow/providers/google/cloud/sensors/pubsub.py
 ##########
 @@ -55,8 +61,15 @@ class PubSubPullSensor(BaseSensorOperator):
     :param max_messages: The maximum number of messages to retrieve per
         PubSub pull request
     :type max_messages: int
-    :param return_immediately: If True, instruct the PubSub API to return
-        immediately if no messages are available for delivery.
+    :param return_immediately:
 
 Review comment:
   Here too! I love the detailed explanations!


[GitHub] [airflow] potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #7613: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r387671400
 
 

 ##########
 File path: airflow/providers/google/cloud/operators/pubsub.py
 ##########
 @@ -674,3 +675,124 @@ def execute(self, context):
         self.log.info("Publishing to topic %s", self.topic)
         hook.publish(project_id=self.project_id, topic=self.topic, messages=self.messages)
         self.log.info("Published to topic %s", self.topic)
+
+
+class PubSubPullOperator(BaseOperator):
+    """Pulls messages from a PubSub subscription and passes them through XCom.
+    If the queue is empty, returns an empty list - never waits for messages.
+    If you do need to wait, please use
+    :class:`airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor`
+    instead.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PubSubPullSensor`
+
+    This operator will pull up to ``max_messages`` messages from the
+    specified PubSub subscription. The pulled messages (possibly none) will be
+    returned from the operator and passed through XCom for downstream tasks.
+
+    If ``ack_messages`` is set to True, messages will be immediately
+    acknowledged before being returned; otherwise, downstream tasks will be
+    responsible for acknowledging them.
+
+    ``project_id`` and ``subscription`` are templated so you can use
+    variables in them.
+
+    :param project_id: the GCP project ID for the subscription (templated)
+    :type project_id: str
+    :param subscription: the Pub/Sub subscription name. Do not include the
+        full subscription path.
+    :type subscription: str
+    :param max_messages: The maximum number of messages to retrieve per
+        PubSub pull request
+    :type max_messages: int
+    :param ack_messages: If True, each message will be acknowledged
+        immediately rather than by any downstream tasks
+    :type ack_messages: bool
+    :param gcp_conn_id: The connection ID to use when connecting to
+        Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request
+        must have domain-wide delegation enabled.
+    :type delegate_to: str
+    :param messages_callback: (Optional) Callback to process received messages.
+        Its return value will be saved to XCom.
+        If you are pulling large messages, you probably want to provide a custom callback.
+        If not provided, the default implementation will convert `ReceivedMessage` objects
+        into JSON-serializable dicts using the `google.protobuf.json_format.MessageToDict` function.
+    :type messages_callback: Optional[Callable[[List[ReceivedMessage], Dict[str, Any]], Any]]
+    """
+    template_fields = ['project_id', 'subscription']
+
+    @apply_defaults
+    def __init__(
+            self,
+            project_id: str,
+            subscription: str,
+            max_messages: int = 5,
+            ack_messages: bool = False,
+            messages_callback: Optional[Callable[[List[ReceivedMessage], Dict[str, Any]], Any]] = None,
+            gcp_conn_id: str = 'google_cloud_default',
+            delegate_to: Optional[str] = None,
+            *args,
+            **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.project_id = project_id
+        self.subscription = subscription
+        self.max_messages = max_messages
+        self.ack_messages = ack_messages
+        self.messages_callback = messages_callback
+
+    def execute(self, context):
+        hook = PubSubHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+        )
+
+        pulled_messages = hook.pull(
+            project_id=self.project_id,
+            subscription=self.subscription,
+            max_messages=self.max_messages,
+            return_immediately=True,
+        )
+
+        handle_messages = self.messages_callback or self._default_message_callback
+
+        ret = handle_messages(pulled_messages, context)
+
+        if pulled_messages and self.ack_messages:
+            hook.acknowledge(
+                project_id=self.project_id,
+                subscription=self.subscription,
+                messages=pulled_messages,
+            )
+
+        return ret
+
+    def _default_message_callback(
 
 Review comment:
   Nice!
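The control flow of `execute()` above can be modelled without Airflow or GCP. In this sketch `FakeHook`, `FakeMessage`, `default_callback` and `execute_sketch` are hypothetical, and `dataclasses.asdict` stands in for `google.protobuf.json_format.MessageToDict`, which the docstring names for the real default callback. It mirrors the three decisions in the diff: use the custom callback if one is given, otherwise the default; acknowledge only when something was pulled and `ack_messages` is True; return the callback's value.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a pulled ReceivedMessage."""
    ack_id: str
    data: str


@dataclass
class FakeHook:
    """Hypothetical stand-in for PubSubHook, serving canned messages."""
    queued: List[FakeMessage]
    acked: List[str] = field(default_factory=list)

    def pull(self, max_messages: int, return_immediately: bool) -> List[FakeMessage]:
        # The fake never blocks, so return_immediately is ignored.
        pulled, self.queued = self.queued[:max_messages], self.queued[max_messages:]
        return pulled

    def acknowledge(self, messages: List[FakeMessage]) -> None:
        self.acked.extend(m.ack_id for m in messages)


def default_callback(messages: List[FakeMessage], context: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Plain dicts; the real default uses MessageToDict on ReceivedMessage objects.
    return [asdict(m) for m in messages]


def execute_sketch(
    hook: FakeHook,
    max_messages: int = 5,
    ack_messages: bool = False,
    messages_callback: Optional[Callable[[List[FakeMessage], Dict[str, Any]], Any]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> Any:
    pulled = hook.pull(max_messages=max_messages, return_immediately=True)
    handle_messages = messages_callback or default_callback
    ret = handle_messages(pulled, context or {})
    # Acknowledge only when something was pulled and acking was requested.
    if pulled and ack_messages:
        hook.acknowledge(messages=pulled)
    return ret  # in Airflow, an operator's return value is pushed to XCom by default


hook = FakeHook([FakeMessage("1", "a"), FakeMessage("2", "b")])
print(execute_sketch(hook, ack_messages=True))  # [{'ack_id': '1', 'data': 'a'}, {'ack_id': '2', 'data': 'b'}]
print(execute_sketch(hook, ack_messages=True))  # [] : nothing left, nothing acknowledged
print(hook.acked)  # ['1', '2']
print(execute_sketch(FakeHook([FakeMessage("3", "c")]), messages_callback=lambda msgs, ctx: len(msgs)))  # 1
```

One consequence of the ordering in the diff: because the callback runs before `acknowledge`, a callback that raises leaves the messages unacknowledged, so Pub/Sub can redeliver them later.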


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r389243564
 
 

 ##########
 File path: tests/providers/google/cloud/hooks/test_pubsub.py
 ##########
 @@ -58,6 +60,21 @@ def setUp(self):
                         new=mock_init):
             self.pubsub_hook = PubSubHook(gcp_conn_id='test')
 
+    def _generate_messages(self, count):
+        return [
+            ParseDict(
+                {
+                    "ack_id": "%s" % i,
+                    "message": {
+                        "data": 'Message {}'.format(i).encode('utf8'),
 
 Review comment:
   ```suggestion
                           "data": f'Message {i}'.encode('utf8'),
   ```


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r389243554
 
 

 ##########
 File path: tests/providers/google/cloud/hooks/test_pubsub.py
 ##########
 @@ -58,6 +60,21 @@ def setUp(self):
                         new=mock_init):
             self.pubsub_hook = PubSubHook(gcp_conn_id='test')
 
+    def _generate_messages(self, count):
 
 Review comment:
   ```suggestion
       def _generate_messages(self, count) -> List[ReceivedMessage]:
   ```


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r389243581
 
 

 ##########
 File path: tests/providers/google/cloud/hooks/test_pubsub.py
 ##########
 @@ -58,6 +60,21 @@ def setUp(self):
                         new=mock_init):
             self.pubsub_hook = PubSubHook(gcp_conn_id='test')
 
+    def _generate_messages(self, count):
+        return [
+            ParseDict(
+                {
+                    "ack_id": "%s" % i,
 
 Review comment:
   ```suggestion
                       "ack_id": str(i),
   ```
   Does it have to be a string?


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#discussion_r389243511
 
 

 ##########
 File path: airflow/providers/google/cloud/sensors/pubsub.py
 ##########
 @@ -55,8 +61,15 @@ class PubSubPullSensor(BaseSensorOperator):
     :param max_messages: The maximum number of messages to retrieve per
         PubSub pull request
     :type max_messages: int
-    :param return_immediately: If True, instruct the PubSub API to return
-        immediately if no messages are available for delivery.
+    :param return_immediately:
+        (Deprecated) This is an underlying PubSub API implementation detail.
+        It has no real effect on Sensor behaviour other than some internal wait time before retrying
+        on empty queue.
+        The Sensor task will (by definition) always wait for a message, regardless of this argument value.
+
+        If you want a non-blocking task that does not wait for messages, please use
+        :class:`airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator`
+        instead.
 
 Review comment:
   Good!
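A toy model of why `return_immediately` does not change the sensor's outcome, assuming that with `return_immediately=False` a single pull may be held by the server for a bounded time until a message shows up. `pull`, `sensor_run` and `server_wait` are hypothetical names, and time is simulated, so nothing actually sleeps. Only the per-attempt wait and the number of pokes differ; in both cases the sensor keeps poking until it gets a message.

```python
from typing import List, Tuple


def pull(arrivals: List[float], now: float, return_immediately: bool,
         server_wait: float) -> Tuple[List[float], float]:
    """Toy model of one pull request; returns (messages, simulated time afterwards).

    ``arrivals`` are the simulated publish times of messages. Messages are not
    consumed here, which is fine because the sensor stops at the first success.
    """
    available = [t for t in arrivals if t <= now]
    if available or return_immediately:
        return available, now
    # return_immediately=False: the server may hold the request (long poll)
    # until a message arrives, but only for a bounded time.
    upcoming = [t for t in arrivals if now < t <= now + server_wait]
    if upcoming:
        first = min(upcoming)
        return [t for t in arrivals if t <= first], first
    return [], now + server_wait


def sensor_run(arrivals: List[float], return_immediately: bool,
               poke_interval: float = 5.0, server_wait: float = 10.0,
               max_pokes: int = 100) -> Tuple[List[float], float, int]:
    """Poke until a pull returns messages; returns (messages, finish time, pokes)."""
    now = 0.0
    for pokes in range(1, max_pokes + 1):
        messages, now = pull(arrivals, now, return_immediately, server_wait)
        if messages:
            return messages, now, pokes
        now += poke_interval  # the sensor sleeps between pokes
    raise TimeoutError("sensor gave up without receiving a message")


print(sensor_run([7.0], return_immediately=True))   # ([7.0], 10.0, 3)
print(sensor_run([7.0], return_immediately=False))  # ([7.0], 7.0, 1)
```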


[GitHub] [airflow] michalslowikowski00 commented on issue #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on issue #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#issuecomment-595327420
 
 
   Aw... this is WIP. :) I hadn't noticed.


[GitHub] [airflow] ANiteckiP commented on issue #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator

Posted by GitBox <gi...@apache.org>.
ANiteckiP commented on issue #7613: WIP: [AIRFLOW-6978] Add PubSubPullOperator
URL: https://github.com/apache/airflow/pull/7613#issuecomment-595326635
 
 
   WIP: it turned out I'd missed some docs that needed updating!
