Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/09/26 13:53:04 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python

URL: https://github.com/apache/airflow/pull/6096#discussion_r328615185
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -52,254 +51,483 @@ class PubSubHook(GoogleCloudBaseHook):
 
     def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str] = None) -> None:
         super().__init__(gcp_conn_id, delegate_to=delegate_to)
+        self._client = None
+
+    def get_conn(self) -> PublisherClient:
+        """
+        Retrieves connection to Google Cloud Pub/Sub.
 
-    def get_conn(self) -> Any:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.PublisherClient
         """
-        Returns a Pub/Sub service object.
+        if not self._client:
+            self._client = PublisherClient(
+                credentials=self._get_credentials(),
+                client_info=self.client_info
+            )
+        return self._client
 
-        :rtype: googleapiclient.discovery.Resource
+    @cached_property
+    def subscriber_client(self) -> SubscriberClient:
         """
-        http_authorized = self._authorize()
-        return build(
-            'pubsub', 'v1', http=http_authorized, cache_discovery=False)
+        Creates SubscriberClient.
 
-    def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.SubscriberClient
+        """
+        return SubscriberClient(
+            credentials=self._get_credentials(),
+            client_info=self.client_info
+        )
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def publish(
+        self,
+        topic: str,
+        messages: List[Dict],
+        project_id: Optional[str] = None,
+    ) -> None:
         """
         Publishes messages to a Pub/Sub topic.
 
-        :param project: the GCP project ID in which to publish
-        :type project: str
         :param topic: the Pub/Sub topic to which to publish; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
         :param messages: messages to publish; if the data field in a
             message is set, it should already be base64 encoded.
         :type messages: list of PubSub messages; see
             http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        :param project_id: Optional, the GCP project ID in which to publish.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
         """
-        body = {'messages': messages}
-        full_topic = _format_topic(project, topic)
-        request = self.get_conn().projects().topics().publish(  # pylint: disable=no-member
-            topic=full_topic, body=body)
+        assert project_id is not None
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project_id, topic)  # pylint: disable=no-member
+
+        # TODO validation of messages
 
 Review comment:
   Should we leave that TODO here? We could validate right here, simply by checking whether the "data" field can be base64-decoded. What kind of validation do we think is missing? I think we already handle the case where "data" is missing, and I believe the publish method itself must already check whether the data is correctly encoded. If so, we should remove this TODO. Otherwise, we should check the type of the "data" field ourselves in the loop. I believe
   ```
   isinstance(data, bytes)
   ```
   should work fine for Python 3+.
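
   If we do end up validating ourselves, the check could look roughly like the sketch below. The helper name `validate_messages` and the exact rules are assumptions for illustration, not code from this PR: it assumes each message is a dict, that "data" (when present) must be `bytes`, and that a message needs either "data" or at least one attribute.

```python
from typing import Dict, List


def validate_messages(messages: List[Dict]) -> None:
    """Hypothetical helper (not part of the PR): reject malformed messages early."""
    for index, message in enumerate(messages):
        if not isinstance(message, dict):
            raise ValueError(
                "Message {} must be a dict, got {}".format(index, type(message).__name__)
            )

        data = message.get("data")
        attributes = message.get("attributes")

        # Assumption: a message with neither payload nor attributes is invalid.
        if data is None and not attributes:
            raise ValueError(
                "Message {} needs 'data' or non-empty 'attributes'".format(index)
            )

        # The type check suggested in the comment above.
        if data is not None and not isinstance(data, bytes):
            raise ValueError(
                "Message {}: 'data' must be bytes, got {}".format(index, type(data).__name__)
            )
```

   Calling something like this at the top of `publish` would fail fast with a clear error before any message is sent, instead of failing part-way through the loop.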
