You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/07/05 23:20:50 UTC
incubator-airflow git commit: [AIRFLOW-300] Add Google Pubsub hook
and operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 644f5d43a -> d231dce37
[AIRFLOW-300] Add Google Pubsub hook and operator
Only publishing and topic creation are included.
Topic consumption was explicitly not included in
this feature request.
Closes #2036 from wwlian/airflow-300
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d231dce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d231dce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d231dce3
Branch: refs/heads/master
Commit: d231dce37d753ed196a26d9b244ddf376385de38
Parents: 644f5d4
Author: Wilson Lian <ww...@google.com>
Authored: Wed Jul 5 16:20:17 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jul 5 16:20:26 2017 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/__init__.py | 3 +-
airflow/contrib/hooks/gcp_pubsub_hook.py | 93 ++++++++++++
airflow/contrib/operators/pubsub_operator.py | 170 ++++++++++++++++++++++
tests/contrib/hooks/gcp_pubsub_hook.py | 102 +++++++++++++
tests/contrib/operators/pubsub_operator.py | 77 ++++++++++
5 files changed, 444 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 4941314..977c2ce 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -46,7 +46,8 @@ _hooks = {
'spark_submit_operator': ['SparkSubmitOperator'],
'cloudant_hook': ['CloudantHook'],
'fs_hook': ['FSHook'],
- 'wasb_hook': ['WasbHook']
+ 'wasb_hook': ['WasbHook'],
+ 'gcp_pubsub_hook': ['PubSubHook']
}
import os as _os
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_pubsub_hook.py b/airflow/contrib/hooks/gcp_pubsub_hook.py
new file mode 100644
index 0000000..529d121
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_pubsub_hook.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from apiclient.discovery import build
+from apiclient import errors
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+def _format_topic(project, topic):
+    """Return the fully-qualified Pub/Sub path of ``topic`` in ``project``."""
+    return 'projects/{}/topics/{}'.format(project, topic)
+
+
+class PubSubHook(GoogleCloudBaseHook):
+    """Hook wrapping the Google Cloud Pub/Sub v1 REST API.
+
+    Only publishing and topic creation are provided. Both operations take
+    the target GCP project as an explicit argument; authorization is
+    delegated to ``GoogleCloudBaseHook`` using ``gcp_conn_id``.
+    """
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
+
+    def get_conn(self):
+        """Build and return an authorized Pub/Sub service object.
+
+        :rtype: apiclient.discovery.Resource
+        """
+        return build('pubsub', 'v1', http=self._authorize())
+
+    def publish(self, project, topic, messages):
+        """Publish ``messages`` to a single Pub/Sub topic.
+
+        :param project: the GCP project name or ID in which to publish
+        :type project: string
+        :param topic: the Pub/Sub topic to which to publish; do not
+            include the 'projects/{project}/topics/' prefix.
+        :type topic: string
+        :param messages: messages to publish; if the data field in a
+            message is set, it should already be base64 encoded.
+        :type messages: list of PubSub messages; see
+            http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        :raises Exception: if the API call fails with an ``HttpError``
+        """
+        payload = {'messages': messages}
+        topic_path = _format_topic(project, topic)
+        publish_call = self.get_conn().projects().topics().publish(
+            topic=topic_path, body=payload)
+        try:
+            publish_call.execute()
+        except errors.HttpError as err:
+            raise Exception('Error publishing to topic %s' % topic_path, err)
+
+    def create_topic(self, project, topic, fail_if_exists=False):
+        """Create a Pub/Sub topic unless it is already there.
+
+        :param project: the GCP project name or ID in which to create
+            the topic
+        :type project: string
+        :param topic: the Pub/Sub topic name to create; do not
+            include the 'projects/{project}/topics/' prefix.
+        :type topic: string
+        :param fail_if_exists: if set, raise an exception if the topic
+            already exists
+        :type fail_if_exists: bool
+        :raises Exception: on any ``HttpError`` other than 409, or on 409
+            when ``fail_if_exists`` is set
+        """
+        pubsub = self.get_conn()
+        topic_path = _format_topic(project, topic)
+        try:
+            pubsub.projects().topics().create(
+                name=topic_path, body={}).execute()
+        except errors.HttpError as err:
+            # Status code 409 is how the API reports an existing topic.
+            already_exists = str(err.resp['status']) == '409'
+            if not already_exists:
+                raise Exception('Error creating topic %s' % topic_path, err)
+            if fail_if_exists:
+                raise Exception(
+                    'Error creating topic. Topic already exists: %s'
+                    % topic_path)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
new file mode 100644
index 0000000..f68c6a9
--- /dev/null
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class PubSubTopicCreateOperator(BaseOperator):
+    """Create a PubSub topic.
+
+    By default, if the topic already exists, this operator will
+    not cause the DAG to fail.
+    ```
+    with DAG('successful DAG') as dag:
+        (
+            dag
+            >> PubSubTopicCreateOperator(task_id='create_topic',
+                                         project='my-project',
+                                         topic='my_new_topic')
+            >> PubSubTopicCreateOperator(task_id='create_topic_again',
+                                         project='my-project',
+                                         topic='my_new_topic')
+        )
+    ```
+
+    The operator can be configured to fail if the topic already exists.
+    ```
+    with DAG('failing DAG') as dag:
+        (
+            dag
+            >> PubSubTopicCreateOperator(task_id='create_topic',
+                                         project='my-project',
+                                         topic='my_new_topic')
+            >> PubSubTopicCreateOperator(task_id='create_topic_again',
+                                         project='my-project',
+                                         topic='my_new_topic',
+                                         fail_if_exists=True)
+        )
+    ```
+
+    Both ``project`` and ``topic`` are templated so you can use
+    variables in them.
+    """
+    template_fields = ['project', 'topic']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            topic,
+            fail_if_exists=False,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project name or ID in which to work
+            (templated)
+        :type project: string
+        :param topic: the topic to create. Do not include the
+            full topic path. In other words, instead of
+            ``projects/{project}/topics/{topic}``, provide only
+            ``{topic}``. (templated)
+        :type topic: string
+        :param fail_if_exists: if set, the task fails when the topic
+            already exists; otherwise an existing topic is accepted.
+        :type fail_if_exists: bool
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubTopicCreateOperator, self).__init__(*args, **kwargs)
+
+        self.project = project
+        self.topic = topic
+        self.fail_if_exists = fail_if_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """Create the topic through a ``PubSubHook`` built at run time."""
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+
+        hook.create_topic(self.project, self.topic,
+                          fail_if_exists=self.fail_if_exists)
+
+
+class PubSubPublishOperator(BaseOperator):
+    """Publish messages to a PubSub topic.
+
+    Each Task publishes all provided messages to the same topic
+    in a single GCP project. If the topic does not exist, this
+    task will fail.
+
+    ```
+    from base64 import b64encode as b64e
+
+    m1 = {'data': b64e(b'Hello, World!').decode('ascii'),
+          'attributes': {'type': 'greeting'}
+         }
+    m2 = {'data': b64e(b'Knock, knock').decode('ascii')}
+    m3 = {'attributes': {'foo': ''}}
+
+    t1 = PubSubPublishOperator(
+        task_id='publish_greetings',
+        project='my-project',
+        topic='my_topic',
+        messages=[m1, m2, m3],
+        dag=dag)
+    ```
+    ``project``, ``topic`` and ``messages`` are templated so you can
+    use variables in them.
+    """
+    template_fields = ['project', 'topic', 'messages']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            topic,
+            messages,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project name or ID in which to work
+            (templated)
+        :type project: string
+        :param topic: the topic to which to publish. Do not include the
+            full topic path. In other words, instead of
+            ``projects/{project}/topics/{topic}``, provide only
+            ``{topic}``. (templated)
+        :type topic: string
+        :param messages: a list of messages to be published to the
+            topic. Each message is a dict with one or more of the
+            following key-value mappings:
+            * 'data': a base64-encoded string
+            * 'attributes': {'key1': 'value1', ...}
+            Each message must contain at least a non-empty 'data' value
+            or an attribute dict with at least one key. See
+            https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+            (templated)
+        :type messages: list
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubPublishOperator, self).__init__(*args, **kwargs)
+
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.project = project
+        self.topic = topic
+        self.messages = messages
+
+    def execute(self, context):
+        """Publish ``self.messages`` through a ``PubSubHook`` built at run time."""
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+        hook.publish(self.project, self.topic, self.messages)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_pubsub_hook.py b/tests/contrib/hooks/gcp_pubsub_hook.py
new file mode 100644
index 0000000..9572c33
--- /dev/null
+++ b/tests/contrib/hooks/gcp_pubsub_hook.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from base64 import b64encode as b64e
+import unittest
+
+from apiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+# Dotted-path templates for the mock.patch targets used below.
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
+
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+# b64encode() requires bytes on Python 3 and returns bytes there, so encode
+# byte literals and decode the result back to text for the message payload.
+TEST_MESSAGES = [
+    {
+        'data': b64e(b'Hello, World!').decode('ascii'),
+        'attributes': {'type': 'greeting'}
+    },
+    {'data': b64e(b'Knock, knock').decode('ascii')},
+    {'attributes': {'foo': ''}}]
+
+EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    """No-op stand-in patched over ``GoogleCloudBaseHook.__init__``."""
+
+
+class PubSubHookTest(unittest.TestCase):
+    """Unit tests for ``PubSubHook`` with the Pub/Sub service mocked out."""
+
+    def setUp(self):
+        # Replace the base hook constructor so no Airflow connection is read.
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.pubsub_hook = PubSubHook(gcp_conn_id='test')
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_nonexistent_topic(self, mock_service):
+        self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+
+        create_method = (mock_service.return_value.projects.return_value.topics
+                         .return_value.create)
+        create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
+        create_method.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_failifexists(self, mock_service):
+        # HttpError requires bytes content on Python 3.
+        (mock_service.return_value.projects.return_value.topics.return_value
+         .create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content=b'')
+
+        with self.assertRaises(Exception):
+            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
+                                          fail_if_exists=True)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_nofailifexists(self, mock_service):
+        # The 409 must come from create(); patching any other method leaves
+        # the "topic already exists" branch of the hook untested.
+        create_method = (mock_service.return_value.projects.return_value.topics
+                         .return_value.create)
+        create_method.return_value.execute.side_effect = HttpError(
+            resp={'status': '409'}, content=b'')
+
+        try:
+            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
+                                          fail_if_exists=False)
+        except Exception:
+            self.fail('Topic creation should not fail for existing topic when '
+                      'fail_if_exists=False')
+
+        create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
+        create_method.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_publish(self, mock_service):
+        self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
+
+        publish_method = (mock_service.return_value.projects.return_value
+                          .topics.return_value.publish)
+        publish_method.assert_called_with(
+            topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})
+        # Building the request is not enough; it must also be executed.
+        publish_method.return_value.execute.assert_called_with()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d231dce3/tests/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/pubsub_operator.py b/tests/contrib/operators/pubsub_operator.py
new file mode 100644
index 0000000..a52bbc6
--- /dev/null
+++ b/tests/contrib/operators/pubsub_operator.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from base64 import b64encode as b64e
+import unittest
+
+from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator
+from airflow.contrib.operators.pubsub_operator import PubSubTopicCreateOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+TASK_ID = 'test-task-id'
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+# b64encode() requires bytes on Python 3 and returns bytes there, so encode
+# byte literals and decode the result back to text for the message payload.
+TEST_MESSAGES = [
+    {
+        'data': b64e(b'Hello, World!').decode('ascii'),
+        'attributes': {'type': 'greeting'}
+    },
+    {'data': b64e(b'Knock, knock').decode('ascii')},
+    {'attributes': {'foo': ''}}]
+
+
+class PubSubTopicCreateOperatorTest(unittest.TestCase):
+    """Checks that the operator forwards its arguments to the hook."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_failifexists(self, mock_hook):
+        task = PubSubTopicCreateOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            fail_if_exists=True)
+        task.execute(None)
+
+        create_topic = mock_hook.return_value.create_topic
+        create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_succeedifexists(self, mock_hook):
+        task = PubSubTopicCreateOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            fail_if_exists=False)
+        task.execute(None)
+
+        create_topic = mock_hook.return_value.create_topic
+        create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
+
+
+class PubSubPublishOperatorTest(unittest.TestCase):
+    """Checks that the operator hands its messages to ``PubSubHook.publish``."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_publish(self, mock_hook):
+        task = PubSubPublishOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            messages=TEST_MESSAGES)
+        task.execute(None)
+
+        publish = mock_hook.return_value.publish
+        publish.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)