You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/18 22:54:09 UTC
incubator-airflow git commit: [AIRFLOW-1913] Add new GCP PubSub
operators
Repository: incubator-airflow
Updated Branches:
refs/heads/master 16b5f9a19 -> 8942d2e84
[AIRFLOW-1913] Add new GCP PubSub operators
Closes #2872 from prodonjs/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8942d2e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8942d2e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8942d2e8
Branch: refs/heads/master
Commit: 8942d2e848b0df1b4c7868451f6815af5ae0fa8b
Parents: 16b5f9a
Author: Jason Prodonovich <pr...@google.com>
Authored: Mon Dec 18 14:53:30 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Dec 18 14:53:40 2017 -0800
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_pubsub_hook.py | 143 +++++++-
airflow/contrib/operators/pubsub_operator.py | 344 ++++++++++++++++---
docs/code.rst | 7 +-
tests/contrib/hooks/gcp_pubsub_hook.py | 102 ------
tests/contrib/hooks/test_gcp_pubsub_hook.py | 242 +++++++++++++
tests/contrib/operators/pubsub_operator.py | 77 -----
tests/contrib/operators/test_pubsub_operator.py | 140 ++++++++
7 files changed, 821 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/airflow/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_pubsub_hook.py b/airflow/contrib/hooks/gcp_pubsub_hook.py
index 529d121..dc95d89 100644
--- a/airflow/contrib/hooks/gcp_pubsub_hook.py
+++ b/airflow/contrib/hooks/gcp_pubsub_hook.py
@@ -12,16 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from uuid import uuid4
+
from apiclient.discovery import build
from apiclient import errors
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+def _format_subscription(project, subscription):
+ return 'projects/%s/subscriptions/%s' % (project, subscription)
+
+
def _format_topic(project, topic):
return 'projects/%s/topics/%s' % (project, topic)
+class PubSubException(Exception):
+ pass
+
+
class PubSubHook(GoogleCloudBaseHook):
"""Hook for accessing Google Pub/Sub.
@@ -29,9 +39,7 @@ class PubSubHook(GoogleCloudBaseHook):
the project embedded in the Connection referenced by gcp_conn_id.
"""
- def __init__(self,
- gcp_conn_id='google_cloud_default',
- delegate_to=None):
+ def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
def get_conn(self):
@@ -45,10 +53,10 @@ class PubSubHook(GoogleCloudBaseHook):
def publish(self, project, topic, messages):
"""Publishes messages to a Pub/Sub topic.
- :param project: the GCP project name or ID in which to publish
+ :param project: the GCP project ID in which to publish
:type project: string
:param topic: the Pub/Sub topic to which to publish; do not
- include the 'projects/{project}/topics/' prefix.
+ include the ``projects/{project}/topics/`` prefix.
:type topic: string
:param messages: messages to publish; if the data field in a
message is set, it should already be base64 encoded.
@@ -62,16 +70,17 @@ class PubSubHook(GoogleCloudBaseHook):
try:
request.execute()
except errors.HttpError as e:
- raise Exception('Error publishing to topic %s' % full_topic, e)
+ raise PubSubException(
+ 'Error publishing to topic %s' % full_topic, e)
def create_topic(self, project, topic, fail_if_exists=False):
"""Creates a Pub/Sub topic, if it does not already exist.
- :param project: the GCP project name or ID in which to create
+ :param project: the GCP project ID in which to create
the topic
:type project: string
:param topic: the Pub/Sub topic name to create; do not
- include the 'projects/{project}/topics/' prefix.
+ include the ``projects/{project}/topics/`` prefix.
:type topic: string
:param fail_if_exists: if set, raise an exception if the topic
already exists
@@ -85,9 +94,119 @@ class PubSubHook(GoogleCloudBaseHook):
except errors.HttpError as e:
# Status code 409 indicates that the topic already exists.
if str(e.resp['status']) == '409':
+ message = 'Topic already exists: %s' % full_topic
+ self.log.warning(message)
+ if fail_if_exists:
+ raise PubSubException(message)
+ else:
+ raise PubSubException('Error creating topic %s' % full_topic, e)
+
+ def delete_topic(self, project, topic, fail_if_not_exists=False):
+ """Deletes a Pub/Sub topic if it exists.
+
+ :param project: the GCP project ID in which to delete the topic
+ :type project: string
+ :param topic: the Pub/Sub topic name to delete; do not
+ include the ``projects/{project}/topics/`` prefix.
+ :type topic: string
+ :param fail_if_not_exists: if set, raise an exception if the topic
+ does not exist
+ :type fail_if_not_exists: bool
+ """
+ service = self.get_conn()
+ full_topic = _format_topic(project, topic)
+ try:
+ service.projects().topics().delete(topic=full_topic).execute()
+ except errors.HttpError as e:
+ # Status code 409 indicates that the topic was not found
+ if str(e.resp['status']) == '404':
+ message = 'Topic does not exist: %s' % full_topic
+ self.log.warning(message)
+ if fail_if_not_exists:
+ raise PubSubException(message)
+ else:
+ raise PubSubException('Error deleting topic %s' % full_topic, e)
+
+ def create_subscription(self, topic_project, topic, subscription=None,
+ subscription_project=None, ack_deadline_secs=10,
+ fail_if_exists=False):
+ """Creates a Pub/Sub subscription, if it does not already exist.
+
+ :param topic_project: the GCP project ID of the topic that the
+ subscription will be bound to.
+ :type topic_project: string
+ :param topic: the Pub/Sub topic name that the subscription will be bound
+ to create; do not include the ``projects/{project}/subscriptions/``
+ prefix.
+ :type topic: string
+ :param subscription: the Pub/Sub subscription name. If empty, a random
+ name will be generated using the uuid module
+ :type subscription: string
+ :param subscription_project: the GCP project ID where the subscription
+ will be created. If unspecified, ``topic_project`` will be used.
+ :type subscription_project: string
+ :param ack_deadline_secs: Number of seconds that a subscriber has to
+ acknowledge each message pulled from the subscription
+ :type ack_deadline_secs: int
+ :param fail_if_exists: if set, raise an exception if the topic
+ already exists
+ :type fail_if_exists: bool
+ :return: subscription name which will be the system-generated value if
+ the ``subscription`` parameter is not supplied
+ :rtype: string
+ """
+ service = self.get_conn()
+ full_topic = _format_topic(topic_project, topic)
+ if not subscription:
+ subscription = 'sub-%s' % uuid4()
+ if not subscription_project:
+ subscription_project = topic_project
+ full_subscription = _format_subscription(subscription_project,
+ subscription)
+ body = {
+ 'topic': full_topic,
+ 'ackDeadlineSeconds': ack_deadline_secs
+ }
+ try:
+ service.projects().subscriptions().create(
+ name=full_subscription, body=body).execute()
+ except errors.HttpError as e:
+ # Status code 409 indicates that the subscription already exists.
+ if str(e.resp['status']) == '409':
+ message = 'Subscription already exists: %s' % full_subscription
+ self.log.warning(message)
if fail_if_exists:
- raise Exception(
- 'Error creating topic. Topic already exists: %s'
- % full_topic)
+ raise PubSubException(message)
+ else:
+ raise PubSubException(
+ 'Error creating subscription %s' % full_subscription, e)
+ return subscription
+
+ def delete_subscription(self, project, subscription,
+ fail_if_not_exists=False):
+ """Deletes a Pub/Sub subscription, if it exists.
+
+ :param project: the GCP project ID where the subscription exists
+ :type project: string
+ :param subscription: the Pub/Sub subscription name to delete; do not
+ include the ``projects/{project}/subscriptions/`` prefix.
+ :type subscription: string
+ :param fail_if_not_exists: if set, raise an exception if the topic
+ does not exist
+ :type fail_if_not_exists: bool
+ """
+ service = self.get_conn()
+ full_subscription = _format_subscription(project, subscription)
+ try:
+ service.projects().subscriptions().delete(
+ subscription=full_subscription).execute()
+ except errors.HttpError as e:
+ # Status code 404 indicates that the subscription was not found
+ if str(e.resp['status']) == '404':
+ message = 'Subscription does not exist: %s' % full_subscription
+ self.log.warning(message)
+ if fail_if_not_exists:
+ raise PubSubException(message)
else:
- raise Exception('Error creating topic %s' % full_topic, e)
+ raise PubSubException('Error deleting subscription %s' %
+ full_subscription, e)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
index f68c6a9..7793787 100644
--- a/airflow/contrib/operators/pubsub_operator.py
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -21,26 +21,28 @@ class PubSubTopicCreateOperator(BaseOperator):
"""Create a PubSub topic.
By default, if the topic already exists, this operator will
- not cause the DAG to fail.
- ```
- with DAG('successful DAG') as dag:
- (
- dag
- >> PubSubTopicCreateOperator(topic='my_new_topic')
- >> PubSubTopicCreateOperator(topic='my_new_topic')
- )
- ```
-
- The operator can be configured to fail if the topic already exists.
- ```
- with DAG('failing DAG') as dag:
- (
- dag
- >> PubSubTopicCreateOperator(topic='my_new_topic')
- >> PubSubTopicCreateOperator(topic='my_new_topic',
- fail_if_exists=True)
- )
- ```
+ not cause the DAG to fail. ::
+
+ with DAG('successful DAG') as dag:
+ (
+ dag
+ >> PubSubTopicCreateOperator(project='my-project',
+ topic='my_new_topic')
+ >> PubSubTopicCreateOperator(project='my-project',
+ topic='my_new_topic')
+ )
+
+ The operator can be configured to fail if the topic already exists. ::
+
+ with DAG('failing DAG') as dag:
+ (
+ dag
+ >> PubSubTopicCreateOperator(project='my-project',
+ topic='my_new_topic')
+ >> PubSubTopicCreateOperator(project='my-project',
+ topic='my_new_topic',
+ fail_if_exists=True)
+ )
Both ``project`` and ``topic`` are templated so you can use
variables in them.
@@ -59,8 +61,7 @@ class PubSubTopicCreateOperator(BaseOperator):
*args,
**kwargs):
"""
- :param project: the GCP project name or ID in which to work
- (templated)
+ :param project: the GCP project ID where the topic will be created
:type project: string
:param topic: the topic to create. Do not include the
full topic path. In other words, instead of
@@ -91,29 +92,289 @@ class PubSubTopicCreateOperator(BaseOperator):
fail_if_exists=self.fail_if_exists)
+class PubSubSubscriptionCreateOperator(BaseOperator):
+ """Create a PubSub subscription.
+
+ By default, the subscription will be created in ``topic_project``. If
+ ``subscription_project`` is specified and the GCP credentials allow, the
+ Subscription can be created in a different project from its topic.
+
+ By default, if the subscription already exists, this operator will
+ not cause the DAG to fail. However, the topic must exist in the project. ::
+
+ with DAG('successful DAG') as dag:
+ (
+ dag
+ >> PubSubSubscriptionCreateOperator(
+ topic_project='my-project', topic='my-topic',
+ subscription='my-subscription')
+ >> PubSubSubscriptionCreateOperator(
+ topic_project='my-project', topic='my-topic',
+ subscription='my-subscription')
+ )
+
+ The operator can be configured to fail if the subscription already exists.
+ ::
+
+ with DAG('failing DAG') as dag:
+ (
+ dag
+ >> PubSubSubscriptionCreateOperator(
+ topic_project='my-project', topic='my-topic',
+ subscription='my-subscription')
+ >> PubSubSubscriptionCreateOperator(
+ topic_project='my-project', topic='my-topic',
+ subscription='my-subscription', fail_if_exists=True)
+ )
+
+ Finally, subscription is not required. If not passed, the operator will
+ generated a universally unique identifier for the subscription's name. ::
+
+ with DAG('DAG') as dag:
+ (
+ dag >> PubSubSubscriptionCreateOperator(
+ topic_project='my-project', topic='my-topic')
+ )
+
+ ``topic_project``, ``topic``, ``subscription``, and
+ ``subscription`` are templated so you can use variables in them.
+ """
+ template_fields = ['topic_project', 'topic', 'subscription',
+ 'subscription_project']
+ ui_color = '#0273d4'
+
+ @apply_defaults
+ def __init__(
+ self,
+ topic_project,
+ topic,
+ subscription=None,
+ subscription_project=None,
+ ack_deadline_secs=10,
+ fail_if_exists=False,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ :param topic_project: the GCP project ID where the topic exists
+ :type topic_project: string
+ :param topic: the topic to create. Do not include the
+ full topic path. In other words, instead of
+ ``projects/{project}/topics/{topic}``, provide only
+ ``{topic}``. (templated)
+ :type topic: string
+ :param subscription: the Pub/Sub subscription name. If empty, a random
+ name will be generated using the uuid module
+ :type subscription: string
+ :param subscription_project: the GCP project ID where the subscription
+ will be created. If empty, ``topic_project`` will be used.
+ :type subscription_project: string
+ :param ack_deadline_secs: Number of seconds that a subscriber has to
+ acknowledge each message pulled from the subscription
+ :type ack_deadline_secs: int
+ :param gcp_conn_id: The connection ID to use connecting to
+ Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request
+ must have domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(PubSubSubscriptionCreateOperator, self).__init__(*args, **kwargs)
+
+ self.topic_project = topic_project
+ self.topic = topic
+ self.subscription = subscription
+ self.subscription_project = subscription_project
+ self.ack_deadline_secs = ack_deadline_secs
+ self.fail_if_exists = fail_if_exists
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+ hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to)
+
+ hook.create_subscription(
+ self.topic_project, self.topic, self.subscription,
+ self.subscription_project, self.ack_deadline_secs,
+ self.fail_if_exists)
+
+
+class PubSubTopicDeleteOperator(BaseOperator):
+ """Delete a PubSub topic.
+
+ By default, if the topic does not exist, this operator will
+ not cause the DAG to fail. ::
+
+ with DAG('successful DAG') as dag:
+ (
+ dag
+ >> PubSubTopicDeleteOperator(project='my-project',
+ topic='non_existing_topic')
+ )
+
+ The operator can be configured to fail if the topic does not exist. ::
+
+ with DAG('failing DAG') as dag:
+ (
+ dag
+ >> PubSubTopicCreateOperator(project='my-project',
+ topic='non_existing_topic',
+ fail_if_not_exists=True)
+ )
+
+ Both ``project`` and ``topic`` are templated so you can use
+ variables in them.
+ """
+ template_fields = ['project', 'topic']
+ ui_color = '#cb4335'
+
+ @apply_defaults
+ def __init__(
+ self,
+ project,
+ topic,
+ fail_if_not_exists=False,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ :param project: the GCP project ID in which to work (templated)
+ :type project: string
+ :param topic: the topic to delete. Do not include the
+ full topic path. In other words, instead of
+ ``projects/{project}/topics/{topic}``, provide only
+ ``{topic}``. (templated)
+ :type topic: string
+ :param fail_if_not_exists: If True and the topic does not exist, fail
+ the task
+ :type fail_if_not_exists: bool
+ :param gcp_conn_id: The connection ID to use connecting to
+ Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request
+ must have domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(PubSubTopicDeleteOperator, self).__init__(*args, **kwargs)
+
+ self.project = project
+ self.topic = topic
+ self.fail_if_not_exists = fail_if_not_exists
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+ hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to)
+
+ hook.delete_topic(self.project, self.topic,
+ fail_if_not_exists=self.fail_if_not_exists)
+
+
+class PubSubSubscriptionDeleteOperator(BaseOperator):
+ """Delete a PubSub subscription.
+
+ By default, if the subscription does not exist, this operator will
+ not cause the DAG to fail. ::
+
+ with DAG('successful DAG') as dag:
+ (
+ dag
+ >> PubSubSubscriptionDeleteOperator(project='my-project',
+ subscription='non-existing')
+ )
+
+ The operator can be configured to fail if the subscription already exists.
+
+ ::
+
+ with DAG('failing DAG') as dag:
+ (
+ dag
+ >> PubSubSubscriptionDeleteOperator(
+ project='my-project', subscription='non-existing',
+ fail_if_not_exists=True)
+ )
+
+ ``project``, and ``subscription`` are templated so you can use
+ variables in them.
+ """
+ template_fields = ['project', 'subscription']
+ ui_color = '#cb4335'
+
+ @apply_defaults
+ def __init__(
+ self,
+ project,
+ subscription,
+ fail_if_not_exists=False,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ :param project: the GCP project ID in which to work (templated)
+ :type project: string
+ :param subscription: the subscription to delete. Do not include the
+ full subscription path. In other words, instead of
+ ``projects/{project}/subscription/{subscription}``, provide only
+ ``{subscription}``. (templated)
+ :type subscription: string
+ :param fail_if_not_exists: If True and the subscription does not exist,
+ fail the task
+ :type fail_if_not_exists: bool
+ :param gcp_conn_id: The connection ID to use connecting to
+ Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request
+ must have domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(PubSubSubscriptionDeleteOperator, self).__init__(*args, **kwargs)
+
+ self.project = project
+ self.subscription = subscription
+ self.fail_if_not_exists = fail_if_not_exists
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+ hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to)
+
+ hook.delete_subscription(self.project, self.subscription,
+ fail_if_not_exists=self.fail_if_not_exists)
+
+
class PubSubPublishOperator(BaseOperator):
"""Publish messages to a PubSub topic.
Each Task publishes all provided messages to the same topic
in a single GCP project. If the topic does not exist, this
- task will fail.
-
- ```
- from base64 import b64encode as b64e
-
- m1 = {'data': b64e('Hello, World!'),
- 'attributes': {'type': 'greeting'}
- }
- m2 = {'data': b64e('Knock, knock')}
- m3 = {'attributes': {'foo': ''}}
-
- t1 = PubSubPublishOperator(
- topic='my_topic',
- messages=[m1, m2, m3],
- create_topic=True,
- dag=dag)
- ```
- Both ``project`` and ``topic`` are templated so you can use
+ task will fail. ::
+
+ from base64 import b64encode as b64e
+
+ m1 = {'data': b64e('Hello, World!'),
+ 'attributes': {'type': 'greeting'}
+ }
+ m2 = {'data': b64e('Knock, knock')}
+ m3 = {'attributes': {'foo': ''}}
+
+ t1 = PubSubPublishOperator(
+ project='my-project',
+ topic='my_topic',
+ messages=[m1, m2, m3],
+ create_topic=True,
+ dag=dag)
+
+ ``project``, ``topic``, and ``messages`` are templated so you can use
variables in them.
"""
template_fields = ['project', 'topic', 'messages']
@@ -130,8 +391,7 @@ class PubSubPublishOperator(BaseOperator):
*args,
**kwargs):
"""
- :param project: the GCP project name or ID in which to work
- (templated)
+ :param project: the GCP project ID in which to work (templated)
:type project: string
:param topic: the topic to which to publish. Do not include the
full topic path. In other words, instead of
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 1369b32..021a05e 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -102,6 +102,10 @@ Community-contributed Operators
.. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
.. autoclass:: airflow.contrib.operators.QuboleOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
@@ -241,7 +245,8 @@ Community contributed hooks
VerticaHook,
FTPHook,
SSHHook,
- CloudantHook
+ CloudantHook,
+ PubSubHook
.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_pubsub_hook.py b/tests/contrib/hooks/gcp_pubsub_hook.py
deleted file mode 100644
index 9572c33..0000000
--- a/tests/contrib/hooks/gcp_pubsub_hook.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from base64 import b64encode as b64e
-import unittest
-
-from apiclient.errors import HttpError
-
-from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
-
-try:
- from unittest import mock
-except ImportError:
- try:
- import mock
- except ImportError:
- mock = None
-
-
-BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
-PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
-
-TEST_PROJECT = 'test-project'
-TEST_TOPIC = 'test-topic'
-TEST_MESSAGES = [
- {
- 'data': b64e('Hello, World!'),
- 'attributes': {'type': 'greeting'}
- },
- {'data': b64e('Knock, knock')},
- {'attributes': {'foo': ''}}]
-
-EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
-
-
-def mock_init(self, gcp_conn_id, delegate_to=None):
- pass
-
-
-class PubSubHookTest(unittest.TestCase):
- def setUp(self):
- with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
- new=mock_init):
- self.pubsub_hook = PubSubHook(gcp_conn_id='test')
-
- @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
- def test_create_nonexistent_topic(self, mock_service):
- self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
-
- create_method = (mock_service.return_value.projects.return_value.topics
- .return_value.create)
- create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
- create_method.return_value.execute.assert_called_with()
-
- @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
- def test_create_preexisting_topic_failifexists(self, mock_service):
- (mock_service.return_value.projects.return_value.topics.return_value
- .create.return_value.execute.side_effect) = HttpError(
- resp={'status': '409'}, content='')
-
- try:
- self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
- fail_if_exists=True)
- except Exception:
- pass # Expected.
- else:
- self.fail('Topic creation should fail for existing topic when '
- 'fail_if_exists=True')
-
- @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
- def test_create_preexisting_topic_nofailifexists(self, mock_service):
- (mock_service.return_value.projects.return_value.topics.return_value
- .get.return_value.execute.side_effect) = HttpError(
- resp={'status': '409'}, content='')
-
- try:
- self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
- fail_if_exists=False)
- except Exception:
- self.fail('Topic creation should not fail for existing topic when '
- 'fail_if_exists=False')
-
- @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
- def test_publish(self, mock_service):
- self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
-
- publish_method = (mock_service.return_value.projects.return_value
- .topics.return_value.publish)
- publish_method.assert_called_with(
- topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/hooks/test_gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcp_pubsub_hook.py b/tests/contrib/hooks/test_gcp_pubsub_hook.py
new file mode 100644
index 0000000..8cc6c8f
--- /dev/null
+++ b/tests/contrib/hooks/test_gcp_pubsub_hook.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from base64 import b64encode as b64e
+import unittest
+
+from apiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubException, PubSubHook
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
+
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_SUBSCRIPTION = 'test-subscription'
+TEST_UUID = 'abc123-xzy789'
+TEST_MESSAGES = [
+ {
+ 'data': b64e('Hello, World!'),
+ 'attributes': {'type': 'greeting'}
+ },
+ {'data': b64e('Knock, knock')},
+ {'attributes': {'foo': ''}}]
+
+EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
+EXPANDED_SUBSCRIPTION = 'projects/%s/subscriptions/%s' % (TEST_PROJECT,
+ TEST_SUBSCRIPTION)
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+ pass
+
+
+class PubSubHookTest(unittest.TestCase):
+ def setUp(self):
+ with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+ new=mock_init):
+ self.pubsub_hook = PubSubHook(gcp_conn_id='test')
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_nonexistent_topic(self, mock_service):
+ self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+
+ create_method = (mock_service.return_value.projects.return_value.topics
+ .return_value.create)
+ create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
+ create_method.return_value.execute.assert_called_with()
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_delete_topic(self, mock_service):
+ self.pubsub_hook.delete_topic(TEST_PROJECT, TEST_TOPIC)
+
+ delete_method = (mock_service.return_value.projects.return_value.topics
+ .return_value.delete)
+ delete_method.assert_called_with(topic=EXPANDED_TOPIC)
+ delete_method.return_value.execute.assert_called_with()
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_delete_nonexisting_topic_failifnotexists(self, mock_service):
+ (mock_service.return_value.projects.return_value.topics
+ .return_value.delete.return_value.execute.side_effect) = HttpError(
+ resp={'status': '404'}, content='')
+
+ with self.assertRaises(PubSubException) as e:
+ self.pubsub_hook.delete_topic(TEST_PROJECT, TEST_TOPIC, True)
+
+ self.assertEquals(e.exception.message,
+ 'Topic does not exist: %s' % EXPANDED_TOPIC)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_preexisting_topic_failifexists(self, mock_service):
+ (mock_service.return_value.projects.return_value.topics.return_value
+ .create.return_value.execute.side_effect) = HttpError(
+ resp={'status': '409'}, content='')
+
+ with self.assertRaises(PubSubException) as e:
+ self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC, True)
+ self.assertEquals(e.exception.message,
+ 'Topic already exists: %s' % EXPANDED_TOPIC)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_preexisting_topic_nofailifexists(self, mock_service):
+ (mock_service.return_value.projects.return_value.topics.return_value
+ .get.return_value.execute.side_effect) = HttpError(
+ resp={'status': '409'}, content='')
+
+ self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_nonexistent_subscription(self, mock_service):
+ response = self.pubsub_hook.create_subscription(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION)
+
+ create_method = (
+ mock_service.return_value.projects.return_value.subscriptions.
+ return_value.create)
+ expected_body = {
+ 'topic': EXPANDED_TOPIC,
+ 'ackDeadlineSeconds': 10
+ }
+ create_method.assert_called_with(name=EXPANDED_SUBSCRIPTION,
+ body=expected_body)
+ create_method.return_value.execute.assert_called_with()
+ self.assertEquals(TEST_SUBSCRIPTION, response)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_subscription_different_project_topic(self, mock_service):
+ response = self.pubsub_hook.create_subscription(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, 'a-different-project')
+
+ create_method = (
+ mock_service.return_value.projects.return_value.subscriptions.
+ return_value.create)
+
+ expected_subscription = 'projects/%s/subscriptions/%s' % (
+ 'a-different-project', TEST_SUBSCRIPTION)
+ expected_body = {
+ 'topic': EXPANDED_TOPIC,
+ 'ackDeadlineSeconds': 10
+ }
+ create_method.assert_called_with(name=expected_subscription,
+ body=expected_body)
+ create_method.return_value.execute.assert_called_with()
+ self.assertEquals(TEST_SUBSCRIPTION, response)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_delete_subscription(self, mock_service):
+ self.pubsub_hook.delete_subscription(TEST_PROJECT, TEST_SUBSCRIPTION)
+
+ delete_method = (mock_service.return_value.projects
+ .return_value.subscriptions.return_value.delete)
+ delete_method.assert_called_with(subscription=EXPANDED_SUBSCRIPTION)
+ delete_method.return_value.execute.assert_called_with()
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_delete_nonexisting_subscription_failifnotexists(self,
+ mock_service):
+ (mock_service.return_value.projects.return_value.subscriptions.
+ return_value.delete.return_value.execute.side_effect) = HttpError(
+ resp={'status': '404'}, content='')
+
+ with self.assertRaises(PubSubException) as e:
+ self.pubsub_hook.delete_subscription(
+ TEST_PROJECT, TEST_SUBSCRIPTION, fail_if_not_exists=True)
+
+ self.assertEquals(e.exception.message,
+ 'Subscription does not exist: %s' %
+ EXPANDED_SUBSCRIPTION)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ @mock.patch(PUBSUB_STRING.format('uuid4'),
+ new_callable=mock.Mock(return_value=lambda: TEST_UUID))
+ def test_create_subscription_without_name(self, mock_uuid, mock_service):
+ response = self.pubsub_hook.create_subscription(TEST_PROJECT,
+ TEST_TOPIC)
+ create_method = (
+ mock_service.return_value.projects.return_value.subscriptions.
+ return_value.create)
+ expected_body = {
+ 'topic': EXPANDED_TOPIC,
+ 'ackDeadlineSeconds': 10
+ }
+ expected_name = EXPANDED_SUBSCRIPTION.replace(
+ TEST_SUBSCRIPTION, 'sub-%s' % TEST_UUID)
+ create_method.assert_called_with(name=expected_name,
+ body=expected_body)
+ create_method.return_value.execute.assert_called_with()
+ self.assertEquals('sub-%s' % TEST_UUID, response)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_subscription_with_ack_deadline(self, mock_service):
+ response = self.pubsub_hook.create_subscription(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, ack_deadline_secs=30)
+
+ create_method = (
+ mock_service.return_value.projects.return_value.subscriptions.
+ return_value.create)
+ expected_body = {
+ 'topic': EXPANDED_TOPIC,
+ 'ackDeadlineSeconds': 30
+ }
+ create_method.assert_called_with(name=EXPANDED_SUBSCRIPTION,
+ body=expected_body)
+ create_method.return_value.execute.assert_called_with()
+ self.assertEquals(TEST_SUBSCRIPTION, response)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_subscription_failifexists(self, mock_service):
+ (mock_service.return_value.projects.return_value.
+ subscriptions.return_value.create.return_value
+ .execute.side_effect) = HttpError(resp={'status': '409'},
+ content='')
+
+ with self.assertRaises(PubSubException) as e:
+ self.pubsub_hook.create_subscription(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION,
+ fail_if_exists=True)
+
+ self.assertEquals(e.exception.message,
+ 'Subscription already exists: %s' %
+ EXPANDED_SUBSCRIPTION)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_create_subscription_nofailifexists(self, mock_service):
+ (mock_service.return_value.projects.return_value.topics.return_value
+ .get.return_value.execute.side_effect) = HttpError(
+ resp={'status': '409'}, content='')
+
+ response = self.pubsub_hook.create_subscription(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION
+ )
+ self.assertEquals(TEST_SUBSCRIPTION, response)
+
+ @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+ def test_publish(self, mock_service):
+ self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
+
+ publish_method = (mock_service.return_value.projects.return_value
+ .topics.return_value.publish)
+ publish_method.assert_called_with(
+ topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/pubsub_operator.py b/tests/contrib/operators/pubsub_operator.py
deleted file mode 100644
index a52bbc6..0000000
--- a/tests/contrib/operators/pubsub_operator.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from base64 import b64encode as b64e
-import unittest
-
-from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator
-from airflow.contrib.operators.pubsub_operator import PubSubTopicCreateOperator
-
-try:
- from unittest import mock
-except ImportError:
- try:
- import mock
- except ImportError:
- mock = None
-
-TASK_ID = 'test-task-id'
-TEST_PROJECT = 'test-project'
-TEST_TOPIC = 'test-topic'
-TEST_MESSAGES = [
- {
- 'data': b64e('Hello, World!'),
- 'attributes': {'type': 'greeting'}
- },
- {'data': b64e('Knock, knock')},
- {'attributes': {'foo': ''}}]
-
-
-class PubSubTopicCreateOperatorTest(unittest.TestCase):
-
- @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
- def test_failifexists(self, mock_hook):
- operator = PubSubTopicCreateOperator(task_id=TASK_ID,
- project=TEST_PROJECT,
- topic=TEST_TOPIC,
- fail_if_exists=True)
-
- operator.execute(None)
- mock_hook.return_value.create_topic.assert_called_once_with(
- TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
-
- @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
- def test_succeedifexists(self, mock_hook):
- operator = PubSubTopicCreateOperator(task_id=TASK_ID,
- project=TEST_PROJECT,
- topic=TEST_TOPIC,
- fail_if_exists=False)
-
- operator.execute(None)
- mock_hook.return_value.create_topic.assert_called_once_with(
- TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
-
-
-class PubSubPublishOperatorTest(unittest.TestCase):
-
- @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
- def test_publish(self, mock_hook):
- operator = PubSubPublishOperator(task_id=TASK_ID,
- project=TEST_PROJECT,
- topic=TEST_TOPIC,
- messages=TEST_MESSAGES)
-
- operator.execute(None)
- mock_hook.return_value.publish.assert_called_once_with(
- TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/operators/test_pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_pubsub_operator.py b/tests/contrib/operators/test_pubsub_operator.py
new file mode 100644
index 0000000..d288378
--- /dev/null
+++ b/tests/contrib/operators/test_pubsub_operator.py
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from base64 import b64encode as b64e
+import unittest
+
+from airflow.contrib.operators.pubsub_operator import (
+ PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
+ PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
+ PubSubPublishOperator)
+
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+TASK_ID = 'test-task-id'
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_SUBSCRIPTION = 'test-subscription'
+TEST_MESSAGES = [
+ {
+ 'data': b64e('Hello, World!'),
+ 'attributes': {'type': 'greeting'}
+ },
+ {'data': b64e('Knock, knock')},
+ {'attributes': {'foo': ''}}]
+
+
+class PubSubTopicCreateOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_failifexists(self, mock_hook):
+ operator = PubSubTopicCreateOperator(task_id=TASK_ID,
+ project=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ fail_if_exists=True)
+
+ operator.execute(None)
+ mock_hook.return_value.create_topic.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_succeedifexists(self, mock_hook):
+ operator = PubSubTopicCreateOperator(task_id=TASK_ID,
+ project=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ fail_if_exists=False)
+
+ operator.execute(None)
+ mock_hook.return_value.create_topic.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
+
+
+class PubSubTopicDeleteOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_execute(self, mock_hook):
+ operator = PubSubTopicDeleteOperator(task_id=TASK_ID,
+ project=TEST_PROJECT,
+ topic=TEST_TOPIC)
+
+ operator.execute(None)
+ mock_hook.return_value.delete_topic.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, fail_if_not_exists=False)
+
+
+class PubSubSubscriptionCreateOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_execute(self, mock_hook):
+ operator = PubSubSubscriptionCreateOperator(
+ task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC,
+ subscription=TEST_SUBSCRIPTION)
+ operator.execute(None)
+ mock_hook.return_value.create_subscription.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, None,
+ 10, False)
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_execute_different_project_ids(self, mock_hook):
+ another_project = 'another-project'
+ operator = PubSubSubscriptionCreateOperator(
+ task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC,
+ subscription=TEST_SUBSCRIPTION,
+ subscription_project=another_project)
+ operator.execute(None)
+ mock_hook.return_value.create_subscription.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, another_project,
+ 10, False)
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_execute_no_subscription(self, mock_hook):
+ operator = PubSubSubscriptionCreateOperator(
+ task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC)
+ operator.execute(None)
+ mock_hook.return_value.create_subscription.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, None, None, 10, False)
+
+
+class PubSubSubscriptionDeleteOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_execute(self, mock_hook):
+ operator = PubSubSubscriptionDeleteOperator(
+ task_id=TASK_ID, project=TEST_PROJECT,
+ subscription=TEST_SUBSCRIPTION)
+
+ operator.execute(None)
+ mock_hook.return_value.delete_subscription.assert_called_once_with(
+ TEST_PROJECT, TEST_SUBSCRIPTION, fail_if_not_exists=False)
+
+
+class PubSubPublishOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+ def test_publish(self, mock_hook):
+ operator = PubSubPublishOperator(task_id=TASK_ID,
+ project=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ messages=TEST_MESSAGES)
+
+ operator.execute(None)
+ mock_hook.return_value.publish.assert_called_once_with(
+ TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)