You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/18 22:54:09 UTC

incubator-airflow git commit: [AIRFLOW-1913] Add new GCP PubSub operators

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 16b5f9a19 -> 8942d2e84


[AIRFLOW-1913] Add new GCP PubSub operators

Closes #2872 from prodonjs/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8942d2e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8942d2e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8942d2e8

Branch: refs/heads/master
Commit: 8942d2e848b0df1b4c7868451f6815af5ae0fa8b
Parents: 16b5f9a
Author: Jason Prodonovich <pr...@google.com>
Authored: Mon Dec 18 14:53:30 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Dec 18 14:53:40 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_pubsub_hook.py        | 143 +++++++-
 airflow/contrib/operators/pubsub_operator.py    | 344 ++++++++++++++++---
 docs/code.rst                                   |   7 +-
 tests/contrib/hooks/gcp_pubsub_hook.py          | 102 ------
 tests/contrib/hooks/test_gcp_pubsub_hook.py     | 242 +++++++++++++
 tests/contrib/operators/pubsub_operator.py      |  77 -----
 tests/contrib/operators/test_pubsub_operator.py | 140 ++++++++
 7 files changed, 821 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/airflow/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_pubsub_hook.py b/airflow/contrib/hooks/gcp_pubsub_hook.py
index 529d121..dc95d89 100644
--- a/airflow/contrib/hooks/gcp_pubsub_hook.py
+++ b/airflow/contrib/hooks/gcp_pubsub_hook.py
@@ -12,16 +12,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from uuid import uuid4
+
 from apiclient.discovery import build
 from apiclient import errors
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
+def _format_subscription(project, subscription):
+    return 'projects/%s/subscriptions/%s' % (project, subscription)
+
+
 def _format_topic(project, topic):
     return 'projects/%s/topics/%s' % (project, topic)
 
 
+class PubSubException(Exception):
+    pass
+
+
 class PubSubHook(GoogleCloudBaseHook):
     """Hook for accessing Google Pub/Sub.
 
@@ -29,9 +39,7 @@ class PubSubHook(GoogleCloudBaseHook):
     the project embedded in the Connection referenced by gcp_conn_id.
     """
 
-    def __init__(self,
-                 gcp_conn_id='google_cloud_default',
-                 delegate_to=None):
+    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
         super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
 
     def get_conn(self):
@@ -45,10 +53,10 @@ class PubSubHook(GoogleCloudBaseHook):
     def publish(self, project, topic, messages):
         """Publishes messages to a Pub/Sub topic.
 
-        :param project: the GCP project name or ID in which to publish
+        :param project: the GCP project ID in which to publish
         :type project: string
         :param topic: the Pub/Sub topic to which to publish; do not
-            include the 'projects/{project}/topics/' prefix.
+            include the ``projects/{project}/topics/`` prefix.
         :type topic: string
         :param messages: messages to publish; if the data field in a
             message is set, it should already be base64 encoded.
@@ -62,16 +70,17 @@ class PubSubHook(GoogleCloudBaseHook):
         try:
             request.execute()
         except errors.HttpError as e:
-            raise Exception('Error publishing to topic %s' % full_topic, e)
+            raise PubSubException(
+                'Error publishing to topic %s' % full_topic, e)
 
     def create_topic(self, project, topic, fail_if_exists=False):
         """Creates a Pub/Sub topic, if it does not already exist.
 
-        :param project: the GCP project name or ID in which to create
+        :param project: the GCP project ID in which to create
             the topic
         :type project: string
         :param topic: the Pub/Sub topic name to create; do not
-            include the 'projects/{project}/topics/' prefix.
+            include the ``projects/{project}/topics/`` prefix.
         :type topic: string
         :param fail_if_exists: if set, raise an exception if the topic
             already exists
@@ -85,9 +94,119 @@ class PubSubHook(GoogleCloudBaseHook):
         except errors.HttpError as e:
             # Status code 409 indicates that the topic already exists.
             if str(e.resp['status']) == '409':
+                message = 'Topic already exists: %s' % full_topic
+                self.log.warning(message)
+                if fail_if_exists:
+                    raise PubSubException(message)
+            else:
+                raise PubSubException('Error creating topic %s' % full_topic, e)
+
+    def delete_topic(self, project, topic, fail_if_not_exists=False):
+        """Deletes a Pub/Sub topic if it exists.
+
+        :param project: the GCP project ID in which to delete the topic
+        :type project: string
+        :param topic: the Pub/Sub topic name to delete; do not
+            include the ``projects/{project}/topics/`` prefix.
+        :type topic: string
+        :param fail_if_not_exists: if set, raise an exception if the topic
+            does not exist
+        :type fail_if_not_exists: bool
+        """
+        service = self.get_conn()
+        full_topic = _format_topic(project, topic)
+        try:
+            service.projects().topics().delete(topic=full_topic).execute()
+        except errors.HttpError as e:
+            # Status code 404 indicates that the topic was not found
+            if str(e.resp['status']) == '404':
+                message = 'Topic does not exist: %s' % full_topic
+                self.log.warning(message)
+                if fail_if_not_exists:
+                    raise PubSubException(message)
+            else:
+                raise PubSubException('Error deleting topic %s' % full_topic, e)
+
+    def create_subscription(self, topic_project, topic, subscription=None,
+                            subscription_project=None, ack_deadline_secs=10,
+                            fail_if_exists=False):
+        """Creates a Pub/Sub subscription, if it does not already exist.
+
+        :param topic_project: the GCP project ID of the topic that the
+            subscription will be bound to.
+        :type topic_project: string
+        :param topic: the Pub/Sub topic name that the subscription will be bound
+            to; do not include the ``projects/{project}/topics/``
+            prefix.
+        :type topic: string
+        :param subscription: the Pub/Sub subscription name. If empty, a random
+            name will be generated using the uuid module
+        :type subscription: string
+        :param subscription_project: the GCP project ID where the subscription
+            will be created. If unspecified, ``topic_project`` will be used.
+        :type subscription_project: string
+        :param ack_deadline_secs: Number of seconds that a subscriber has to
+            acknowledge each message pulled from the subscription
+        :type ack_deadline_secs: int
+        :param fail_if_exists: if set, raise an exception if the subscription
+            already exists
+        :type fail_if_exists: bool
+        :return: subscription name which will be the system-generated value if
+            the ``subscription`` parameter is not supplied
+        :rtype: string
+        """
+        service = self.get_conn()
+        full_topic = _format_topic(topic_project, topic)
+        if not subscription:
+            subscription = 'sub-%s' % uuid4()
+        if not subscription_project:
+            subscription_project = topic_project
+        full_subscription = _format_subscription(subscription_project,
+                                                 subscription)
+        body = {
+            'topic': full_topic,
+            'ackDeadlineSeconds': ack_deadline_secs
+        }
+        try:
+            service.projects().subscriptions().create(
+                name=full_subscription, body=body).execute()
+        except errors.HttpError as e:
+            # Status code 409 indicates that the subscription already exists.
+            if str(e.resp['status']) == '409':
+                message = 'Subscription already exists: %s' % full_subscription
+                self.log.warning(message)
                 if fail_if_exists:
-                    raise Exception(
-                        'Error creating topic. Topic already exists: %s'
-                        % full_topic)
+                    raise PubSubException(message)
+            else:
+                raise PubSubException(
+                    'Error creating subscription %s' % full_subscription, e)
+        return subscription
+
+    def delete_subscription(self, project, subscription,
+                            fail_if_not_exists=False):
+        """Deletes a Pub/Sub subscription, if it exists.
+
+        :param project: the GCP project ID where the subscription exists
+        :type project: string
+        :param subscription: the Pub/Sub subscription name to delete; do not
+            include the ``projects/{project}/subscriptions/`` prefix.
+        :type subscription: string
+        :param fail_if_not_exists: if set, raise an exception if the
+            subscription does not exist
+        :type fail_if_not_exists: bool
+        """
+        service = self.get_conn()
+        full_subscription = _format_subscription(project, subscription)
+        try:
+            service.projects().subscriptions().delete(
+                subscription=full_subscription).execute()
+        except errors.HttpError as e:
+            # Status code 404 indicates that the subscription was not found
+            if str(e.resp['status']) == '404':
+                message = 'Subscription does not exist: %s' % full_subscription
+                self.log.warning(message)
+                if fail_if_not_exists:
+                    raise PubSubException(message)
             else:
-                raise Exception('Error creating topic %s' % full_topic, e)
+                raise PubSubException('Error deleting subscription %s' %
+                                      full_subscription, e)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
index f68c6a9..7793787 100644
--- a/airflow/contrib/operators/pubsub_operator.py
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -21,26 +21,28 @@ class PubSubTopicCreateOperator(BaseOperator):
     """Create a PubSub topic.
 
     By default, if the topic already exists, this operator will
-    not cause the DAG to fail.
-    ```
-    with DAG('successful DAG') as dag:
-        (
-            dag
-            >> PubSubTopicCreateOperator(topic='my_new_topic')
-            >> PubSubTopicCreateOperator(topic='my_new_topic')
-        )
-    ```
-
-    The operator can be configured to fail if the topic already exists.
-    ```
-    with DAG('failing DAG') as dag:
-        (
-            dag
-            >> PubSubTopicCreateOperator(topic='my_new_topic')
-            >> PubSubTopicCreateOperator(topic='my_new_topic',
-                                         fail_if_exists=True)
-        )
-    ```
+    not cause the DAG to fail. ::
+
+        with DAG('successful DAG') as dag:
+            (
+                dag
+                >> PubSubTopicCreateOperator(project='my-project',
+                                             topic='my_new_topic')
+                >> PubSubTopicCreateOperator(project='my-project',
+                                             topic='my_new_topic')
+            )
+
+    The operator can be configured to fail if the topic already exists. ::
+
+        with DAG('failing DAG') as dag:
+            (
+                dag
+                >> PubSubTopicCreateOperator(project='my-project',
+                                             topic='my_new_topic')
+                >> PubSubTopicCreateOperator(project='my-project',
+                                             topic='my_new_topic',
+                                             fail_if_exists=True)
+            )
 
     Both ``project`` and ``topic`` are templated so you can use
     variables in them.
@@ -59,8 +61,7 @@ class PubSubTopicCreateOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        :param project: the GCP project name or ID in which to work
-            (templated)
+        :param project: the GCP project ID where the topic will be created
         :type project: string
         :param topic: the topic to create. Do not include the
             full topic path. In other words, instead of
@@ -91,29 +92,289 @@ class PubSubTopicCreateOperator(BaseOperator):
                           fail_if_exists=self.fail_if_exists)
 
 
+class PubSubSubscriptionCreateOperator(BaseOperator):
+    """Create a PubSub subscription.
+
+    By default, the subscription will be created in ``topic_project``. If
+    ``subscription_project`` is specified and the GCP credentials allow, the
+    Subscription can be created in a different project from its topic.
+
+    By default, if the subscription already exists, this operator will
+    not cause the DAG to fail. However, the topic must exist in the project. ::
+
+        with DAG('successful DAG') as dag:
+            (
+                dag
+                >> PubSubSubscriptionCreateOperator(
+                    topic_project='my-project', topic='my-topic',
+                    subscription='my-subscription')
+                >> PubSubSubscriptionCreateOperator(
+                    topic_project='my-project', topic='my-topic',
+                    subscription='my-subscription')
+            )
+
+    The operator can be configured to fail if the subscription already exists.
+    ::
+
+        with DAG('failing DAG') as dag:
+            (
+                dag
+                >> PubSubSubscriptionCreateOperator(
+                    topic_project='my-project', topic='my-topic',
+                    subscription='my-subscription')
+                >> PubSubSubscriptionCreateOperator(
+                    topic_project='my-project', topic='my-topic',
+                    subscription='my-subscription', fail_if_exists=True)
+            )
+
+    Finally, subscription is not required. If not passed, the operator will
+    generate a universally unique identifier for the subscription's name. ::
+
+        with DAG('DAG') as dag:
+            (
+                dag >> PubSubSubscriptionCreateOperator(
+                    topic_project='my-project', topic='my-topic')
+            )
+
+    ``topic_project``, ``topic``, ``subscription``, and
+    ``subscription_project`` are templated so you can use variables in them.
+    """
+    template_fields = ['topic_project', 'topic', 'subscription',
+                       'subscription_project']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            topic_project,
+            topic,
+            subscription=None,
+            subscription_project=None,
+            ack_deadline_secs=10,
+            fail_if_exists=False,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param topic_project: the GCP project ID where the topic exists
+        :type topic_project: string
+        :param topic: the topic to subscribe to. Do not include the
+            full topic path. In other words, instead of
+            ``projects/{project}/topics/{topic}``, provide only
+            ``{topic}``. (templated)
+        :type topic: string
+        :param subscription: the Pub/Sub subscription name. If empty, a random
+            name will be generated using the uuid module
+        :type subscription: string
+        :param subscription_project: the GCP project ID where the subscription
+            will be created. If empty, ``topic_project`` will be used.
+        :type subscription_project: string
+        :param ack_deadline_secs: Number of seconds that a subscriber has to
+            acknowledge each message pulled from the subscription
+        :type ack_deadline_secs: int
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubSubscriptionCreateOperator, self).__init__(*args, **kwargs)
+
+        self.topic_project = topic_project
+        self.topic = topic
+        self.subscription = subscription
+        self.subscription_project = subscription_project
+        self.ack_deadline_secs = ack_deadline_secs
+        self.fail_if_exists = fail_if_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+
+        hook.create_subscription(
+            self.topic_project, self.topic, self.subscription,
+            self.subscription_project, self.ack_deadline_secs,
+            self.fail_if_exists)
+
+
+class PubSubTopicDeleteOperator(BaseOperator):
+    """Delete a PubSub topic.
+
+    By default, if the topic does not exist, this operator will
+    not cause the DAG to fail. ::
+
+        with DAG('successful DAG') as dag:
+            (
+                dag
+                >> PubSubTopicDeleteOperator(project='my-project',
+                                             topic='non_existing_topic')
+            )
+
+    The operator can be configured to fail if the topic does not exist. ::
+
+        with DAG('failing DAG') as dag:
+            (
+                dag
+                >> PubSubTopicDeleteOperator(project='my-project',
+                                             topic='non_existing_topic',
+                                             fail_if_not_exists=True)
+            )
+
+    Both ``project`` and ``topic`` are templated so you can use
+    variables in them.
+    """
+    template_fields = ['project', 'topic']
+    ui_color = '#cb4335'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            topic,
+            fail_if_not_exists=False,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project ID in which to work (templated)
+        :type project: string
+        :param topic: the topic to delete. Do not include the
+            full topic path. In other words, instead of
+            ``projects/{project}/topics/{topic}``, provide only
+            ``{topic}``. (templated)
+        :type topic: string
+        :param fail_if_not_exists: If True and the topic does not exist, fail
+            the task
+        :type fail_if_not_exists: bool
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubTopicDeleteOperator, self).__init__(*args, **kwargs)
+
+        self.project = project
+        self.topic = topic
+        self.fail_if_not_exists = fail_if_not_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+
+        hook.delete_topic(self.project, self.topic,
+                          fail_if_not_exists=self.fail_if_not_exists)
+
+
+class PubSubSubscriptionDeleteOperator(BaseOperator):
+    """Delete a PubSub subscription.
+
+    By default, if the subscription does not exist, this operator will
+    not cause the DAG to fail. ::
+
+        with DAG('successful DAG') as dag:
+            (
+                dag
+                >> PubSubSubscriptionDeleteOperator(project='my-project',
+                                                    subscription='non-existing')
+            )
+
+    The operator can be configured to fail if the subscription does not exist.
+
+    ::
+
+        with DAG('failing DAG') as dag:
+            (
+                dag
+                >> PubSubSubscriptionDeleteOperator(
+                     project='my-project', subscription='non-existing',
+                     fail_if_not_exists=True)
+            )
+
+    ``project`` and ``subscription`` are templated so you can use
+    variables in them.
+    """
+    template_fields = ['project', 'subscription']
+    ui_color = '#cb4335'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project,
+            subscription,
+            fail_if_not_exists=False,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        :param project: the GCP project ID in which to work (templated)
+        :type project: string
+        :param subscription: the subscription to delete. Do not include the
+            full subscription path. In other words, instead of
+            ``projects/{project}/subscriptions/{subscription}``, provide only
+            ``{subscription}``. (templated)
+        :type subscription: string
+        :param fail_if_not_exists: If True and the subscription does not exist,
+            fail the task
+        :type fail_if_not_exists: bool
+        :param gcp_conn_id: The connection ID to use connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request
+            must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(PubSubSubscriptionDeleteOperator, self).__init__(*args, **kwargs)
+
+        self.project = project
+        self.subscription = subscription
+        self.fail_if_not_exists = fail_if_not_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = PubSubHook(gcp_conn_id=self.gcp_conn_id,
+                          delegate_to=self.delegate_to)
+
+        hook.delete_subscription(self.project, self.subscription,
+                                 fail_if_not_exists=self.fail_if_not_exists)
+
+
 class PubSubPublishOperator(BaseOperator):
     """Publish messages to a PubSub topic.
 
     Each Task publishes all provided messages to the same topic
     in a single GCP project. If the topic does not exist, this
-    task will fail.
-
-    ```
-    from base64 import b64encode as b64e
-
-    m1 = {'data': b64e('Hello, World!'),
-          'attributes': {'type': 'greeting'}
-         }
-    m2 = {'data': b64e('Knock, knock')}
-    m3 = {'attributes': {'foo': ''}}
-
-    t1 = PubSubPublishOperator(
-        topic='my_topic',
-        messages=[m1, m2, m3],
-        create_topic=True,
-        dag=dag)
-    ```
-    Both ``project`` and ``topic`` are templated so you can use
+    task will fail. ::
+
+        from base64 import b64encode as b64e
+
+        m1 = {'data': b64e('Hello, World!'),
+              'attributes': {'type': 'greeting'}
+             }
+        m2 = {'data': b64e('Knock, knock')}
+        m3 = {'attributes': {'foo': ''}}
+
+        t1 = PubSubPublishOperator(
+            project='my-project',
+            topic='my_topic',
+            messages=[m1, m2, m3],
+            create_topic=True,
+            dag=dag)
+
+    ``project``, ``topic``, and ``messages`` are templated so you can use
     variables in them.
     """
     template_fields = ['project', 'topic', 'messages']
@@ -130,8 +391,7 @@ class PubSubPublishOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        :param project: the GCP project name or ID in which to work
-            (templated)
+        :param project: the GCP project ID in which to work (templated)
         :type project: string
         :param topic: the topic to which to publish. Do not include the
             full topic path. In other words, instead of

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 1369b32..021a05e 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -102,6 +102,10 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
 .. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
 .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
+.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
 .. autoclass:: airflow.contrib.operators.QuboleOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
@@ -241,7 +245,8 @@ Community contributed hooks
         VerticaHook,
         FTPHook,
         SSHHook,
-        CloudantHook
+        CloudantHook,
+        PubSubHook
 
 .. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/hooks/gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_pubsub_hook.py b/tests/contrib/hooks/gcp_pubsub_hook.py
deleted file mode 100644
index 9572c33..0000000
--- a/tests/contrib/hooks/gcp_pubsub_hook.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from base64 import b64encode as b64e
-import unittest
-
-from apiclient.errors import HttpError
-
-from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-
-BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
-PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
-
-TEST_PROJECT = 'test-project'
-TEST_TOPIC = 'test-topic'
-TEST_MESSAGES = [
-    {
-        'data': b64e('Hello, World!'),
-        'attributes': {'type': 'greeting'}
-    },
-    {'data': b64e('Knock, knock')},
-    {'attributes': {'foo': ''}}]
-
-EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
-
-
-def mock_init(self, gcp_conn_id, delegate_to=None):
-    pass
-
-
-class PubSubHookTest(unittest.TestCase):
-    def setUp(self):
-        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
-                        new=mock_init):
-            self.pubsub_hook = PubSubHook(gcp_conn_id='test')
-
-    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
-    def test_create_nonexistent_topic(self, mock_service):
-        self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
-
-        create_method = (mock_service.return_value.projects.return_value.topics
-                         .return_value.create)
-        create_method.assert_called_with(body={}, name=EXPANDED_TOPIC)
-        create_method.return_value.execute.assert_called_with()
-
-    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
-    def test_create_preexisting_topic_failifexists(self, mock_service):
-        (mock_service.return_value.projects.return_value.topics.return_value
-         .create.return_value.execute.side_effect) = HttpError(
-            resp={'status': '409'}, content='')
-
-        try:
-            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
-                                          fail_if_exists=True)
-        except Exception:
-            pass  # Expected.
-        else:
-            self.fail('Topic creation should fail for existing topic when '
-                      'fail_if_exists=True')
-
-    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
-    def test_create_preexisting_topic_nofailifexists(self, mock_service):
-        (mock_service.return_value.projects.return_value.topics.return_value
-         .get.return_value.execute.side_effect) = HttpError(
-            resp={'status': '409'}, content='')
-
-        try:
-            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC,
-                                          fail_if_exists=False)
-        except Exception:
-            self.fail('Topic creation should not fail for existing topic when '
-                      'fail_if_exists=False')
-
-    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
-    def test_publish(self, mock_service):
-        self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
-
-        publish_method = (mock_service.return_value.projects.return_value
-                          .topics.return_value.publish)
-        publish_method.assert_called_with(
-            topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/hooks/test_gcp_pubsub_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcp_pubsub_hook.py b/tests/contrib/hooks/test_gcp_pubsub_hook.py
new file mode 100644
index 0000000..8cc6c8f
--- /dev/null
+++ b/tests/contrib/hooks/test_gcp_pubsub_hook.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from base64 import b64encode as b64e
+import unittest
+
+from apiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_pubsub_hook import PubSubException, PubSubHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+PUBSUB_STRING = 'airflow.contrib.hooks.gcp_pubsub_hook.{}'
+
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_SUBSCRIPTION = 'test-subscription'
+TEST_UUID = 'abc123-xzy789'
+# b64encode() only accepts bytes on Python 3, so encode bytes literals.
+TEST_MESSAGES = [
+    {'data': b64e(b'Hello, World!'),
+     'attributes': {'type': 'greeting'}},
+    {'data': b64e(b'Knock, knock')},
+    {'attributes': {'foo': ''}},
+]
+
+EXPANDED_TOPIC = 'projects/%s/topics/%s' % (TEST_PROJECT, TEST_TOPIC)
+EXPANDED_SUBSCRIPTION = 'projects/%s/subscriptions/%s' % (TEST_PROJECT,
+                                                          TEST_SUBSCRIPTION)
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    """Stand-in for GoogleCloudBaseHook.__init__ that does nothing."""
+
+
+class PubSubHookTest(unittest.TestCase):
+    def setUp(self):
+        init_path = BASE_STRING.format('GoogleCloudBaseHook.__init__')
+        with mock.patch(init_path, new=mock_init):  # skip real base __init__
+            self.pubsub_hook = PubSubHook(gcp_conn_id='test')
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_nonexistent_topic(self, mock_service):
+        # The hook should build a topics().create() request and execute it.
+        topics_api = mock_service.return_value.projects.return_value.topics
+        create_call = topics_api.return_value.create
+        self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+        create_call.assert_called_with(name=EXPANDED_TOPIC, body={})
+        create_call.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_delete_topic(self, mock_service):
+        # The hook should build a topics().delete() request and execute it.
+        topics_api = mock_service.return_value.projects.return_value.topics
+        delete_call = topics_api.return_value.delete
+        self.pubsub_hook.delete_topic(TEST_PROJECT, TEST_TOPIC)
+        delete_call.assert_called_with(topic=EXPANDED_TOPIC)
+        delete_call.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_delete_nonexisting_topic_failifnotexists(self, mock_service):
+        """A 404 on delete raises PubSubException when asked to fail."""
+        (mock_service.return_value.projects.return_value.topics
+         .return_value.delete.return_value.execute.side_effect) = HttpError(
+            resp={'status': '404'}, content=b'')  # HttpError expects bytes
+        with self.assertRaises(PubSubException) as e:
+            self.pubsub_hook.delete_topic(TEST_PROJECT, TEST_TOPIC, True)
+        # str() is portable; exception.message only exists on Python 2.
+        self.assertEqual(str(e.exception),
+                         'Topic does not exist: %s' % EXPANDED_TOPIC)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_failifexists(self, mock_service):
+        """A 409 on create raises PubSubException when asked to fail."""
+        (mock_service.return_value.projects.return_value.topics.return_value
+         .create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content=b'')  # HttpError expects bytes
+        with self.assertRaises(PubSubException) as e:
+            self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC, True)
+        self.assertEqual(str(e.exception),  # .message is Python 2 only
+                         'Topic already exists: %s' % EXPANDED_TOPIC)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_preexisting_topic_nofailifexists(self, mock_service):
+        # Fail create() (the call the hook makes), not get(), with a 409.
+        (mock_service.return_value.projects.return_value.topics.return_value
+         .create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content=b'')
+        self.pubsub_hook.create_topic(TEST_PROJECT, TEST_TOPIC)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_nonexistent_subscription(self, mock_service):
+        """Creating a named subscription sends the expected request."""
+        response = self.pubsub_hook.create_subscription(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION)
+
+        create_method = (
+            mock_service.return_value.projects.return_value.subscriptions.
+            return_value.create)
+        # No ack deadline is passed, so the request is expected to carry 10.
+        expected_body = {'topic': EXPANDED_TOPIC, 'ackDeadlineSeconds': 10}
+        create_method.assert_called_with(name=EXPANDED_SUBSCRIPTION,
+                                         body=expected_body)
+        create_method.return_value.execute.assert_called_with()
+        # assertEquals is a deprecated alias of assertEqual.
+        self.assertEqual(TEST_SUBSCRIPTION, response)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_subscription_different_project_topic(self, mock_service):
+        """Subscription and topic may belong to different projects."""
+        other_project = 'a-different-project'
+        response = self.pubsub_hook.create_subscription(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, other_project)
+
+        create_method = (
+            mock_service.return_value.projects.return_value.subscriptions.
+            return_value.create)
+        # Only the subscription path should use the other project.
+        expected_subscription = 'projects/%s/subscriptions/%s' % (
+            other_project, TEST_SUBSCRIPTION)
+        expected_body = {'topic': EXPANDED_TOPIC, 'ackDeadlineSeconds': 10}
+        create_method.assert_called_with(name=expected_subscription,
+                                         body=expected_body)
+        create_method.return_value.execute.assert_called_with()
+        # assertEquals is a deprecated alias of assertEqual.
+        self.assertEqual(TEST_SUBSCRIPTION, response)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_delete_subscription(self, mock_service):
+        # The hook should build a subscriptions().delete() request and run it.
+        projects_api = mock_service.return_value.projects.return_value
+        delete_call = projects_api.subscriptions.return_value.delete
+        self.pubsub_hook.delete_subscription(TEST_PROJECT, TEST_SUBSCRIPTION)
+        delete_call.assert_called_with(subscription=EXPANDED_SUBSCRIPTION)
+        delete_call.return_value.execute.assert_called_with()
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_delete_nonexisting_subscription_failifnotexists(self,
+                                                             mock_service):
+        """A 404 on delete raises PubSubException when asked to fail."""
+        (mock_service.return_value.projects.return_value.subscriptions.
+         return_value.delete.return_value.execute.side_effect) = HttpError(
+            resp={'status': '404'}, content=b'')  # HttpError expects bytes
+
+        with self.assertRaises(PubSubException) as e:
+            self.pubsub_hook.delete_subscription(
+                TEST_PROJECT, TEST_SUBSCRIPTION, fail_if_not_exists=True)
+        expected = 'Subscription does not exist: %s' % EXPANDED_SUBSCRIPTION
+        # str() is portable; exception.message only exists on Python 2.
+        self.assertEqual(str(e.exception), expected)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    @mock.patch(PUBSUB_STRING.format('uuid4'),
+                new_callable=mock.Mock(return_value=lambda: TEST_UUID))
+    def test_create_subscription_without_name(self, mock_uuid, mock_service):
+        """Without a name, 'sub-<uuid4>' is expected as the subscription."""
+        response = self.pubsub_hook.create_subscription(TEST_PROJECT,
+                                                        TEST_TOPIC)
+        create_method = (
+            mock_service.return_value.projects.return_value.subscriptions.
+            return_value.create)
+        generated_name = 'sub-%s' % TEST_UUID
+        expected_body = {'topic': EXPANDED_TOPIC, 'ackDeadlineSeconds': 10}
+        expected_name = EXPANDED_SUBSCRIPTION.replace(
+            TEST_SUBSCRIPTION, generated_name)
+        create_method.assert_called_with(name=expected_name,
+                                         body=expected_body)
+        create_method.return_value.execute.assert_called_with()
+        # assertEquals is a deprecated alias of assertEqual.
+        self.assertEqual(generated_name, response)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_subscription_with_ack_deadline(self, mock_service):
+        """An explicit ack_deadline_secs is forwarded in the request body."""
+        response = self.pubsub_hook.create_subscription(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, ack_deadline_secs=30)
+
+        create_method = (
+            mock_service.return_value.projects.return_value.subscriptions.
+            return_value.create)
+        expected_body = {'topic': EXPANDED_TOPIC, 'ackDeadlineSeconds': 30}
+        create_method.assert_called_with(name=EXPANDED_SUBSCRIPTION,
+                                         body=expected_body)
+        create_method.return_value.execute.assert_called_with()
+
+        # assertEquals is a deprecated alias of assertEqual.
+        self.assertEqual(TEST_SUBSCRIPTION, response)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_subscription_failifexists(self, mock_service):
+        """A 409 on create raises PubSubException when fail_if_exists=True."""
+        (mock_service.return_value.projects.return_value.
+         subscriptions.return_value.create.return_value
+         .execute.side_effect) = HttpError(resp={'status': '409'},
+                                           content=b'')  # must be bytes
+
+        with self.assertRaises(PubSubException) as e:
+            self.pubsub_hook.create_subscription(
+                TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION,
+                fail_if_exists=True)
+        expected = 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+        # str() is portable; exception.message only exists on Python 2.
+        self.assertEqual(str(e.exception), expected)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_create_subscription_nofailifexists(self, mock_service):
+        # Fail subscriptions().create(), the call the hook makes, with a 409.
+        (mock_service.return_value.projects.return_value.subscriptions
+         .return_value.create.return_value.execute.side_effect) = HttpError(
+            resp={'status': '409'}, content=b'')
+
+        response = self.pubsub_hook.create_subscription(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION)
+        self.assertEqual(TEST_SUBSCRIPTION, response)
+
+    @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
+    def test_publish(self, mock_service):
+        # The messages should be wrapped in a {'messages': ...} request body.
+        topics_api = mock_service.return_value.projects.return_value.topics
+        publish_call = topics_api.return_value.publish
+        self.pubsub_hook.publish(TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)
+        publish_call.assert_called_with(
+            topic=EXPANDED_TOPIC, body={'messages': TEST_MESSAGES})

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/pubsub_operator.py b/tests/contrib/operators/pubsub_operator.py
deleted file mode 100644
index a52bbc6..0000000
--- a/tests/contrib/operators/pubsub_operator.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from base64 import b64encode as b64e
-import unittest
-
-from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator
-from airflow.contrib.operators.pubsub_operator import PubSubTopicCreateOperator
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-TASK_ID = 'test-task-id'
-TEST_PROJECT = 'test-project'
-TEST_TOPIC = 'test-topic'
-TEST_MESSAGES = [
-    {
-        'data': b64e('Hello, World!'),
-        'attributes': {'type': 'greeting'}
-    },
-    {'data': b64e('Knock, knock')},
-    {'attributes': {'foo': ''}}]
-
-
-class PubSubTopicCreateOperatorTest(unittest.TestCase):
-
-    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
-    def test_failifexists(self, mock_hook):
-        operator = PubSubTopicCreateOperator(task_id=TASK_ID,
-                                             project=TEST_PROJECT,
-                                             topic=TEST_TOPIC,
-                                             fail_if_exists=True)
-
-        operator.execute(None)
-        mock_hook.return_value.create_topic.assert_called_once_with(
-            TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
-
-    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
-    def test_succeedifexists(self, mock_hook):
-        operator = PubSubTopicCreateOperator(task_id=TASK_ID,
-                                             project=TEST_PROJECT,
-                                             topic=TEST_TOPIC,
-                                             fail_if_exists=False)
-
-        operator.execute(None)
-        mock_hook.return_value.create_topic.assert_called_once_with(
-            TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
-
-
-class PubSubPublishOperatorTest(unittest.TestCase):
-
-    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
-    def test_publish(self, mock_hook):
-        operator = PubSubPublishOperator(task_id=TASK_ID,
-                                         project=TEST_PROJECT,
-                                         topic=TEST_TOPIC,
-                                         messages=TEST_MESSAGES)
-
-        operator.execute(None)
-        mock_hook.return_value.publish.assert_called_once_with(
-            TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8942d2e8/tests/contrib/operators/test_pubsub_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_pubsub_operator.py b/tests/contrib/operators/test_pubsub_operator.py
new file mode 100644
index 0000000..d288378
--- /dev/null
+++ b/tests/contrib/operators/test_pubsub_operator.py
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from base64 import b64encode as b64e
+import unittest
+
+from airflow.contrib.operators.pubsub_operator import (
+    PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
+    PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
+    PubSubPublishOperator)
+
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-task-id'
+TEST_PROJECT = 'test-project'
+TEST_TOPIC = 'test-topic'
+TEST_SUBSCRIPTION = 'test-subscription'
+# b64encode() only accepts bytes on Python 3, so encode bytes literals.
+TEST_MESSAGES = [
+    {'data': b64e(b'Hello, World!'),
+     'attributes': {'type': 'greeting'}},
+    {'data': b64e(b'Knock, knock')},
+    {'attributes': {'foo': ''}},
+]
+
+
+class PubSubTopicCreateOperatorTest(unittest.TestCase):
+    """Checks that fail_if_exists is forwarded to PubSubHook.create_topic."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_failifexists(self, mock_hook):
+        task = PubSubTopicCreateOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            fail_if_exists=True)
+        task.execute(None)
+        # mock_hook stands in for the class; return_value is the instance.
+        hook = mock_hook.return_value
+        hook.create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=True)
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_succeedifexists(self, mock_hook):
+        task = PubSubTopicCreateOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            fail_if_exists=False)
+        task.execute(None)
+        hook = mock_hook.return_value
+        hook.create_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_exists=False)
+
+
+class PubSubTopicDeleteOperatorTest(unittest.TestCase):
+    """Checks the delete_topic call made by PubSubTopicDeleteOperator."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_execute(self, mock_hook):
+        task = PubSubTopicDeleteOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC)
+        task.execute(None)
+        # fail_if_not_exists was not set, so False is expected here.
+        mock_hook.return_value.delete_topic.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, fail_if_not_exists=False)
+
+
+class PubSubSubscriptionCreateOperatorTest(unittest.TestCase):
+    """Checks the arguments passed to PubSubHook.create_subscription."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_execute(self, mock_hook):
+        task = PubSubSubscriptionCreateOperator(
+            task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC,
+            subscription=TEST_SUBSCRIPTION)
+        task.execute(None)
+        # Unset options are expected as None, 10 and False in the hook call.
+        mock_hook.return_value.create_subscription.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, None, 10, False)
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_execute_different_project_ids(self, mock_hook):
+        other = 'another-project'
+        task = PubSubSubscriptionCreateOperator(
+            task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC,
+            subscription=TEST_SUBSCRIPTION, subscription_project=other)
+        task.execute(None)
+        mock_hook.return_value.create_subscription.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, TEST_SUBSCRIPTION, other, 10, False)
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_execute_no_subscription(self, mock_hook):
+        task = PubSubSubscriptionCreateOperator(
+            task_id=TASK_ID, topic_project=TEST_PROJECT, topic=TEST_TOPIC)
+        task.execute(None)
+        # With no name given, None is expected for the subscription argument.
+        mock_hook.return_value.create_subscription.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, None, None, 10, False)
+
+
+class PubSubSubscriptionDeleteOperatorTest(unittest.TestCase):
+    """Checks the delete_subscription call made by the delete operator."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_execute(self, mock_hook):
+        PubSubSubscriptionDeleteOperator(
+            task_id=TASK_ID, project=TEST_PROJECT,
+            subscription=TEST_SUBSCRIPTION).execute(None)
+        delete_call = mock_hook.return_value.delete_subscription
+        delete_call.assert_called_once_with(
+            TEST_PROJECT, TEST_SUBSCRIPTION, fail_if_not_exists=False)
+
+
+class PubSubPublishOperatorTest(unittest.TestCase):
+    """Checks that the publish operator hands its messages to the hook."""
+
+    @mock.patch('airflow.contrib.operators.pubsub_operator.PubSubHook')
+    def test_publish(self, mock_hook):
+        task = PubSubPublishOperator(
+            task_id=TASK_ID, project=TEST_PROJECT, topic=TEST_TOPIC,
+            messages=TEST_MESSAGES)
+        task.execute(None)
+        # The message list is expected to reach the hook as-is.
+        mock_hook.return_value.publish.assert_called_once_with(
+            TEST_PROJECT, TEST_TOPIC, TEST_MESSAGES)