You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/07/05 23:20:50 UTC

incubator-airflow git commit: [AIRFLOW-300] Add Google Pubsub hook and operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 644f5d43a -> d231dce37


[AIRFLOW-300] Add Google Pubsub hook and operator

Only publishing and topic creation are included.
Topic consumption was explicitly not included in
this feature request.

Closes #2036 from wwlian/airflow-300


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d231dce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d231dce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d231dce3

Branch: refs/heads/master
Commit: d231dce37d753ed196a26d9b244ddf376385de38
Parents: 644f5d4
Author: Wilson Lian <ww...@google.com>
Authored: Wed Jul 5 16:20:17 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jul 5 16:20:26 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py            |   3 +-
 airflow/contrib/hooks/gcp_pubsub_hook.py     |  93 ++++++++++++
 airflow/contrib/operators/pubsub_operator.py | 170 ++++++++++++++++++++++
 tests/contrib/hooks/gcp_pubsub_hook.py       | 102 +++++++++++++
 tests/contrib/operators/pubsub_operator.py   |  77 ++++++++++
 5 files changed, 444 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 4941314..977c2ce 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -46,7 +46,8 @@ _hooks = {
     'spark_submit_operator': ['SparkSubmitOperator'],
     'cloudant_hook': ['CloudantHook'],
     'fs_hook': ['FSHook'],
-    'wasb_hook': ['WasbHook']
+    'wasb_hook': ['WasbHook'],
+    'gcp_pubsub_hook': ['PubSubHook']
 }
 
 import os as _os

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_pubsub_hook.py b/airflow/contrib/hooks/gcp_pubsub_hook.py
new file mode 100644
index 0000000..529d121
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_pubsub_hook.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from apiclient.discovery import build
+from apiclient import errors
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+def _format_topic(project, topic):  # -> 'projects/<project>/topics/<topic>'
+    return 'projects/%s/topics/%s' % (project, topic)
+
+
+class PubSubHook(GoogleCloudBaseHook):
+    """Hook for accessing Google Pub/Sub.
+
+    The GCP project against which an action is applied is the ``project``
+    argument passed to that method (see ``publish`` and ``create_topic``).
+    """
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
+
+    def get_conn(self):
+        """Returns a Pub/Sub service object.
+
+        :rtype: apiclient.discovery.Resource
+        """
+        http_authorized = self._authorize()
+        return build('pubsub', 'v1', http=http_authorized)
+
+    def publish(self, project, topic, messages):
+        """Publishes messages to a Pub/Sub topic.
+
+        :param project: the GCP project name or ID in which to publish
+        :type project: string
+        :param topic: the Pub/Sub topic to which to publish; do not
+            include the 'projects/{project}/topics/' prefix.
+        :type topic: string
+        :param messages: messages to publish; if the data field in a
+            message is set, it should already be base64 encoded.
+        :type messages: list of PubSub messages; see
+            http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        """
+        body = {'messages': messages}
+        full_topic = _format_topic(project, topic)
+        request = self.get_conn().projects().topics().publish(
+            topic=full_topic, body=body)
+        try:
+            request.execute()
+        except errors.HttpError as e:
+            raise Exception('Error publishing to topic %s' % full_topic, e)
+
+    def create_topic(self, project, topic, fail_if_exists=False):
+        """Creates a Pub/Sub topic, if it does not already exist.
+
+        :param project: the GCP project name or ID in which to create
+            the topic
+        :type project: string
+        :param topic: the Pub/Sub topic name to create; do not
+            include the 'projects/{project}/topics/' prefix.
+        :type topic: string
+        :param fail_if_exists: if set, raise an Exception if the topic
+            already exists; otherwise an existing topic is not an error
+        :type fail_if_exists: bool
+        """
+        service = self.get_conn()
+        full_topic = _format_topic(project, topic)
+        try:
+            service.projects().topics().create(
+                name=full_topic, body={}).execute()
+        except errors.HttpError as e:
+            # Status code 409 indicates that the topic already exists.
+            if str(e.resp['status']) == '409':
+                if fail_if_exists:
+                    raise Exception(
+                        'Error creating topic. Topic already exists: %s'
+                        % full_topic)
+            else:
+                raise Exception('Error creating topic %s' % full_topic, e)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
new file mode 100644
index 0000000..f68c6a9
--- /dev/null
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class PubSubTopicCreateOperator(BaseOperator):
+    """Create a PubSub topic.
+
+    By default, if the topic already exists, this operator will
+    not cause the DAG to fail.
+    ```
+    with DAG('successful DAG') as dag:
+        (
+            dag
+            >> PubSubTopicCreateOperator(project='my-project', topic='my_new_topic')
+            >> PubSubTopicCreateOperator(project='my-project', topic='my_new_topic')
+        )
+    ```
+
+    The operator can be configured to fail if the topic already exists.
+    ```
+    with DAG('failing DAG') as dag:
+        (
+            dag
+            >> PubSubTopicCreateOperator(project='my-project', topic='my_new_topic')
+            >> PubSubTopicCreateOperator(project='my-project', topic='my_new_topic',
+                                         fail_if_exists=True)
+        )
+    ```
+
+    Both ``project`` and ``topic`` are templated so you can use
+    variables in them.
+    """
+    template_fields = ['project', 'topic']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            topic,
+            fail_if_exists=False,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project name or ID in which to work
+            (templated)
+        :type project: string
+        :param topic: the bare topic name to create, i.e. ``{topic}`` rather
+            than ``projects/{project}/topics/{topic}``. (templated)
+        :type topic: string
+        :param fail_if_exists: if True, fail when the topic already exists
+        :type fail_if_exists: bool
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubTopicCreateOperator, self).__init__(*args, **kwargs)
+
+        self.project = project
+        self.topic = topic
+        self.fail_if_exists = fail_if_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+
+        hook.create_topic(self.project, self.topic,
+                          fail_if_exists=self.fail_if_exists)
+
+
+class PubSubPublishOperator(BaseOperator):
+    """Publish messages to a PubSub topic.
+
+    Each Task publishes all provided messages to the same topic
+    in a single GCP project. If the topic does not exist, this
+    task will fail.
+
+    ```
+    from base64 import b64encode as b64e
+
+    m1 = {'data': b64e('Hello, World!'),
+          'attributes': {'type': 'greeting'}
+         }
+    m2 = {'data': b64e('Knock, knock')}
+    m3 = {'attributes': {'foo': ''}}
+
+    t1 = PubSubPublishOperator(
+        project='my-project',
+        topic='my_topic',
+        messages=[m1, m2, m3],
+        dag=dag)
+    ```
+    ``project``, ``topic`` and ``messages`` are templated so you can
+    use variables in them.
+    """
+    template_fields = ['project', 'topic', 'messages']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            topic,
+            messages,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project name or ID in which to work
+            (templated)
+        :type project: string
+        :param topic: the topic to which to publish. Do not include the
+            full topic path. In other words, instead of
+            ``projects/{project}/topics/{topic}``, provide only
+            ``{topic}``. (templated)
+        :type topic: string
+        :param messages: a list of messages to be published to the
+            topic. Each message is a dict with one or more of the
+            following key-value mappings:
+            * 'data': a base64-encoded string
+            * 'attributes': {'key1': 'value1', ...}
+            Each message must contain at least a non-empty 'data' value
+            or an attribute dict with at least one key. See
+            https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+            (templated)
+        :type messages: list
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubPublishOperator, self).__init__(*args, **kwargs)
+
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.project = project
+        self.topic = topic
+        self.messages = messages
+
+    def execute(self, context):
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+        hook.publish(self.project, self.topic, self.messages)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_pubsub_hook.py b/tests/contrib/hooks/gcp_pubsub_hook.py
new file mode 100644
index 0000000..9572c33
--- /dev/null
+++ b/tests/contrib/hooks/gcp_pubsub_hook.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from base64 import b64encode as b64e
+import unittest
+
+from apiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
+
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_MESSAGES = [
+    {
+        'data': b64e('Hello, World!'),
+        'attributes': {'type': 'greeting'}
+    },
+    {'data': b64e('Knock, knock')},
+    {'attributes': {'foo': ''}}]
+# Fully-qualified topic path the tests expect the hook to pass to the API.
+EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    pass  # No-op replacement for GoogleCloudBaseHook.__init__ (see setUp).
+
+
+class PubSubHookTest(unittest.TestCase):
+    def setUp(self):
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.pubsub_hook = PubSubHook(gcp_conn_id='test')
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_nonexistent_topic(self, mock_service):
+        self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+
+        create_method = (mock_service.return_value.projects.return_value.topics
+                         .return_value.create)
+        create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
+        create_method.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_failifexists(self, mock_service):
+        (mock_service.return_value.projects.return_value.topics.return_value
+         .create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content='')
+
+        try:
+            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
+                                          fail_if_exists=True)
+        except Exception:
+            pass  # Expected: a 409 with fail_if_exists=True must raise.
+        else:
+            self.fail('Topic creation should fail for existing topic when '
+                      'fail_if_exists=True')
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_nofailifexists(self, mock_service):
+        (mock_service.return_value.projects.return_value.topics.return_value
+         .create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content='')
+
+        try:  # The 409 raised by create().execute() must be swallowed.
+            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
+                                          fail_if_exists=False)
+        except Exception:
+            self.fail('Topic creation should not fail for existing topic when '
+                      'fail_if_exists=False')
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_publish(self, mock_service):
+        self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
+
+        publish_method = (mock_service.return_value.projects.return_value
+                          .topics.return_value.publish)
+        publish_method.assert_called_with(
+            topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/pubsub_operator.py b/tests/contrib/operators/pubsub_operator.py
new file mode 100644
index 0000000..a52bbc6
--- /dev/null
+++ b/tests/contrib/operators/pubsub_operator.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from base64 import b64encode as b64e
+import unittest
+
+from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator
+from airflow.contrib.operators.pubsub_operator import PubSubTopicCreateOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-task-id'  # task_id given to every operator under test
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_MESSAGES = [
+    {
+        'data': b64e('Hello, World!'),
+        'attributes': {'type': 'greeting'}
+    },
+    {'data': b64e('Knock, knock')},
+    {'attributes': {'foo': ''}}]
+
+
+class PubSubTopicCreateOperatorTest(unittest.TestCase):
+    """Checks that execute() forwards its arguments to PubSubHook.create_topic."""
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_failifexists(self, mock_hook):
+        operator = PubSubTopicCreateOperator(task_id=TASK_ID,
+                                             project=TEST_PROJECT,
+                                             topic=TEST_TOPIC,
+                                             fail_if_exists=True)
+
+        operator.execute(None)
+        mock_hook.return_value.create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_succeedifexists(self, mock_hook):
+        operator = PubSubTopicCreateOperator(task_id=TASK_ID,
+                                             project=TEST_PROJECT,
+                                             topic=TEST_TOPIC,
+                                             fail_if_exists=False)
+
+        operator.execute(None)
+        mock_hook.return_value.create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
+
+
+class PubSubPublishOperatorTest(unittest.TestCase):
+    """Checks that execute() forwards its arguments to PubSubHook.publish."""
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_publish(self, mock_hook):
+        operator = PubSubPublishOperator(task_id=TASK_ID,
+                                         project=TEST_PROJECT,
+                                         topic=TEST_TOPIC,
+                                         messages=TEST_MESSAGES)
+
+        operator.execute(None)
+        mock_hook.return_value.publish.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)