You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/09/14 11:45:26 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python

mik-laj commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r324422074
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -77,47 +106,110 @@ def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
         :type messages: list of PubSub messages; see
             http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
         """
-        body = {'messages': messages}
-        full_topic = _format_topic(project, topic)
-        request = self.get_conn().projects().topics().publish(  # pylint: disable=no-member
-            topic=full_topic, body=body)
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project, topic)  # pylint: disable=no-member
+
+        # TODO validation of messages
+        self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path)
         try:
-            request.execute(num_retries=self.num_retries)
-        except HttpError as e:
+            for message in messages:
+                publisher.publish(
+                    topic=topic_path,
+                    data=message.get("data", b''),
+                    **message.get('attributes', {})
+                )
+        except GoogleAPICallError as e:
             raise PubSubException(
-                'Error publishing to topic {}'.format(full_topic), e)
+                'Error publishing to topic {}'.format(topic_path), e)
+
+        self.log.info("Published %d messages to topic (path) %s", len(messages), topic_path)
 
-    def create_topic(self, project: str, topic: str, fail_if_exists: bool = False) -> None:
+    def create_topic(
+        self,
+        project: str,
+        topic: str,
+        fail_if_exists: bool = False,
+        labels: Optional[Dict[str, str]] = None,
+        message_storage_policy: Union[Dict, MessageStoragePolicy] = None,
+        kms_key_name: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> None:
         """
         Creates a Pub/Sub topic, if it does not already exist.
 
-        :param project: the GCP project ID in which to create
-            the topic
+        :param project: the GCP project ID in which to create the topic
         :type project: str
         :param topic: the Pub/Sub topic name to create; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
         :param fail_if_exists: if set, raise an exception if the topic
             already exists
         :type fail_if_exists: bool
+        :param labels: Client-assigned labels; see
+            https://cloud.google.com/pubsub/docs/labels
+        :type labels: Dict[str, str]
+        :param message_storage_policy: Policy constraining the set
+            of Google Cloud Platform regions where messages published to
+            the topic may be stored. If not present, then no constraints
+            are in effect.
+        :type message_storage_policy:
+            Union[Dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
+        :param kms_key_name: The resource name of the Cloud KMS CryptoKey
+            to be used to protect access to messages published on this topic.
+            The expected format is
+            ``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
+        :type kms_key_name: str
+        :param retry: (Optional) A retry object used to retry requests.
+            If None is specified, requests will not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: (Optional) The amount of time, in seconds, to wait for the request
+            to complete. Note that if retry is specified, the timeout applies to each
+            individual attempt.
+        :type timeout: float
+        :param metadata: (Optional) Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]]
         """
-        service = self.get_conn()
-        full_topic = _format_topic(project, topic)
+        assert project is not None
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project, topic)  # pylint: disable=no-member
+
+        # Add airflow-version label to the topic
+        labels = labels or {}
+        labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
+
+        self.log.info("Creating topic (path) %s", topic_path)
         try:
-            service.projects().topics().create(  # pylint: disable=no-member
-                name=full_topic, body={}).execute(num_retries=self.num_retries)
-        except HttpError as e:
-            # Status code 409 indicates that the topic already exists.
-            if str(e.resp['status']) == '409':
-                message = 'Topic already exists: {}'.format(full_topic)
-                self.log.warning(message)
-                if fail_if_exists:
-                    raise PubSubException(message)
-            else:
-                raise PubSubException(
-                    'Error creating topic {}'.format(full_topic), e)
-
-    def delete_topic(self, project: str, topic: str, fail_if_not_exists: bool = False) -> None:
+            # pylint: disable=no-member
+            publisher.create_topic(
+                name=topic_path,
+                labels=labels,
+                message_storage_policy=message_storage_policy,
+                kms_key_name=kms_key_name,
+                retry=retry,
+                timeout=timeout,
+                metadata=metadata,
+            )
+        except AlreadyExists:
+            message = 'Topic already exists: {}'.format(topic)
+            self.log.warning(message)
 
 Review comment:
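  We should avoid formatting strings before passing them to the logger; pass the arguments to the logger instead, e.g. `self.log.warning("Topic already exists: %s", topic)`, so that formatting is deferred until the record is actually emitted.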
   Reference
   https://github.com/apache/airflow/pull/4804/files
   https://github.com/apache/airflow/pulls?utf8=%E2%9C%93&q=is%3Apr+author%3Anuclearpinguin+colour

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services