You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:32:54 UTC

[airflow] 30/41: Support google-cloud-monitoring>=2.0.0 (#13769)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit acfe4ae7988476f79cd63f21d3fd8de4336daa42
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Feb 2 07:01:55 2021 +0100

    Support google-cloud-monitoring>=2.0.0 (#13769)
    
    (cherry picked from commit d2efb33239d36e58fb69066fd23779724cb11a90)
---
 airflow/providers/google/ADDITIONAL_INFO.md        |   1 +
 .../cloud/example_dags/example_stackdriver.py      |  82 +++++--
 .../providers/google/cloud/hooks/stackdriver.py    | 133 +++++------
 .../google/cloud/operators/stackdriver.py          |  12 +-
 setup.py                                           |   2 +-
 .../google/cloud/hooks/test_stackdriver.py         | 242 +++++++++++----------
 .../google/cloud/operators/test_stackdriver.py     |  49 ++++-
 7 files changed, 302 insertions(+), 219 deletions(-)

diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
index 16a6683..9cf9853 100644
--- a/airflow/providers/google/ADDITIONAL_INFO.md
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
 | [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
 | [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
 | [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
 | [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
 | [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
 | [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) |
diff --git a/airflow/providers/google/cloud/example_dags/example_stackdriver.py b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
index 68ac978..9c418b7 100644
--- a/airflow/providers/google/cloud/example_dags/example_stackdriver.py
+++ b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -21,6 +21,7 @@ Example Airflow DAG for Google Cloud Stackdriver service.
 """
 
 import json
+import os
 
 from airflow import models
 from airflow.providers.google.cloud.operators.stackdriver import (
@@ -37,56 +38,80 @@ from airflow.providers.google.cloud.operators.stackdriver import (
 )
 from airflow.utils.dates import days_ago
 
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
 TEST_ALERT_POLICY_1 = {
     "combiner": "OR",
-    "name": "projects/sd-project/alertPolicies/12345",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": True,
-    "displayName": "test alert 1",
+    "display_name": "test alert 1",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
+                "filter": (
+                    'metric.label.state="blocked" AND '
+                    'metric.type="agent.googleapis.com/processes/count_by_state" '
+                    'AND resource.type="gce_instance"'
+                ),
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "threshold_value": 100,
+                "duration": {'seconds': 900},
+                "trigger": {"percent": 0},
+                "aggregations": [
+                    {
+                        "alignment_period": {'seconds': 60},
+                        "per_series_aligner": "ALIGN_MEAN",
+                        "cross_series_reducer": "REDUCE_MEAN",
+                        "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                    }
+                ],
             },
-            "displayName": "Condition display",
-            "name": "projects/sd-project/alertPolicies/123/conditions/456",
+            "display_name": "test_alert_policy_1",
         }
     ],
 }
 
 TEST_ALERT_POLICY_2 = {
     "combiner": "OR",
-    "name": "projects/sd-project/alertPolicies/6789",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": False,
-    "displayName": "test alert 2",
+    "display_name": "test alert 2",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
+                "filter": (
+                    'metric.label.state="blocked" AND '
+                    'metric.type="agent.googleapis.com/processes/count_by_state" AND '
+                    'resource.type="gce_instance"'
+                ),
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "threshold_value": 100,
+                "duration": {'seconds': 900},
+                "trigger": {"percent": 0},
+                "aggregations": [
+                    {
+                        "alignment_period": {'seconds': 60},
+                        "per_series_aligner": "ALIGN_MEAN",
+                        "cross_series_reducer": "REDUCE_MEAN",
+                        "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                    }
+                ],
             },
-            "displayName": "Condition display",
-            "name": "projects/sd-project/alertPolicies/456/conditions/789",
+            "display_name": "test_alert_policy_2",
         }
     ],
 }
 
 TEST_NOTIFICATION_CHANNEL_1 = {
-    "displayName": "channel1",
+    "display_name": "channel1",
     "enabled": True,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "name": "projects/sd-project/notificationChannels/12345",
-    "type": "slack",
+    "type_": "slack",
 }
 
 TEST_NOTIFICATION_CHANNEL_2 = {
-    "displayName": "channel2",
+    "display_name": "channel2",
     "enabled": False,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "name": "projects/sd-project/notificationChannels/6789",
-    "type": "slack",
+    "type_": "slack",
 }
 
 with models.DAG(
@@ -150,18 +175,29 @@ with models.DAG(
     # [START howto_operator_gcp_stackdriver_delete_notification_channel]
     delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
         task_id='delete-notification-channel',
-        name='test-channel',
+        name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
     )
     # [END howto_operator_gcp_stackdriver_delete_notification_channel]
 
+    delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
+        task_id='delete-notification-channel-2',
+        name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
+    )
+
     # [START howto_operator_gcp_stackdriver_delete_alert_policy]
     delete_alert_policy = StackdriverDeleteAlertOperator(
         task_id='delete-alert-policy',
-        name='test-alert',
+        name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
     )
     # [END howto_operator_gcp_stackdriver_delete_alert_policy]
 
+    delete_alert_policy_2 = StackdriverDeleteAlertOperator(
+        task_id='delete-alert-policy-2',
+        name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
+    )
+
     create_notification_channel >> enable_notification_channel >> disable_notification_channel
     disable_notification_channel >> list_notification_channel >> create_alert_policy
     create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
-    list_alert_policies >> delete_notification_channel >> delete_alert_policy
+    list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
+    delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py b/airflow/providers/google/cloud/hooks/stackdriver.py
index 9da1afa..04dc329 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -24,7 +24,8 @@ from typing import Any, Optional, Sequence, Union
 from google.api_core.exceptions import InvalidArgument
 from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud import monitoring_v3
-from google.protobuf.json_format import MessageToDict, MessageToJson, Parse
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
+from google.protobuf.field_mask_pb2 import FieldMask
 from googleapiclient.errors import HttpError
 
 from airflow.exceptions import AirflowException
@@ -110,18 +111,20 @@ class StackdriverHook(GoogleBaseHook):
         """
         client = self._get_policy_client()
         policies_ = client.list_alert_policies(
-            name=f'projects/{project_id}',
-            filter_=filter_,
-            order_by=order_by,
-            page_size=page_size,
+            request={
+                'name': f'projects/{project_id}',
+                'filter': filter_,
+                'order_by': order_by,
+                'page_size': page_size,
+            },
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         if format_ == "dict":
-            return [MessageToDict(policy) for policy in policies_]
+            return [AlertPolicy.to_dict(policy) for policy in policies_]
         elif format_ == "json":
-            return [MessageToJson(policy) for policy in policies_]
+            return [AlertPolicy.to_json(policy) for policy in policies_]
         else:
             return policies_
 
@@ -138,12 +141,14 @@ class StackdriverHook(GoogleBaseHook):
         client = self._get_policy_client()
         policies_ = self.list_alert_policies(project_id=project_id, filter_=filter_)
         for policy in policies_:
-            if policy.enabled.value != bool(new_state):
-                policy.enabled.value = bool(new_state)
-                mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-                mask.paths.append('enabled')  # pylint: disable=no-member
+            if policy.enabled != bool(new_state):
+                policy.enabled = bool(new_state)
+                mask = FieldMask(paths=['enabled'])
                 client.update_alert_policy(
-                    alert_policy=policy, update_mask=mask, retry=retry, timeout=timeout, metadata=metadata
+                    request={'alert_policy': policy, 'update_mask': mask},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -265,40 +270,39 @@ class StackdriverHook(GoogleBaseHook):
         ]
         policies_ = []
         channels = []
-
-        for channel in record["channels"]:
-            channel_json = json.dumps(channel)
-            channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
-        for policy in record["policies"]:
-            policy_json = json.dumps(policy)
-            policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
+        for channel in record.get("channels", []):
+            channels.append(NotificationChannel(**channel))
+        for policy in record.get("policies", []):
+            policies_.append(AlertPolicy(**policy))
 
         channel_name_map = {}
 
         for channel in channels:
             channel.verification_status = (
-                monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+                monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
             )
 
             if channel.name in existing_channels:
                 channel_client.update_notification_channel(
-                    notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+                    request={'notification_channel': channel},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
             else:
                 old_name = channel.name
-                channel.ClearField('name')
+                channel.name = None
                 new_channel = channel_client.create_notification_channel(
-                    name=f'projects/{project_id}',
-                    notification_channel=channel,
+                    request={'name': f'projects/{project_id}', 'notification_channel': channel},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )
                 channel_name_map[old_name] = new_channel.name
 
         for policy in policies_:
-            policy.ClearField('creation_record')
-            policy.ClearField('mutation_record')
+            policy.creation_record = None
+            policy.mutation_record = None
 
             for i, channel in enumerate(policy.notification_channels):
                 new_channel = channel_name_map.get(channel)
@@ -308,20 +312,22 @@ class StackdriverHook(GoogleBaseHook):
             if policy.name in existing_policies:
                 try:
                     policy_client.update_alert_policy(
-                        alert_policy=policy, retry=retry, timeout=timeout, metadata=metadata
+                        request={'alert_policy': policy},
+                        retry=retry,
+                        timeout=timeout,
+                        metadata=metadata or (),
                     )
                 except InvalidArgument:
                     pass
             else:
-                policy.ClearField('name')
+                policy.name = None
                 for condition in policy.conditions:
-                    condition.ClearField('name')
+                    condition.name = None
                 policy_client.create_alert_policy(
-                    name=f'projects/{project_id}',
-                    alert_policy=policy,
+                    request={'name': f'projects/{project_id}', 'alert_policy': policy},
                     retry=retry,
                     timeout=timeout,
-                    metadata=None,
+                    metadata=metadata or (),
                 )
 
     def delete_alert_policy(
@@ -349,7 +355,9 @@ class StackdriverHook(GoogleBaseHook):
         """
         policy_client = self._get_policy_client()
         try:
-            policy_client.delete_alert_policy(name=name, retry=retry, timeout=timeout, metadata=metadata)
+            policy_client.delete_alert_policy(
+                request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
+            )
         except HttpError as err:
             raise AirflowException(f'Delete alerting policy failed. Error was {err.content}')
 
@@ -405,18 +413,20 @@ class StackdriverHook(GoogleBaseHook):
         """
         client = self._get_channel_client()
         channels = client.list_notification_channels(
-            name=f'projects/{project_id}',
-            filter_=filter_,
-            order_by=order_by,
-            page_size=page_size,
+            request={
+                'name': f'projects/{project_id}',
+                'filter': filter_,
+                'order_by': order_by,
+                'page_size': page_size,
+            },
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         if format_ == "dict":
-            return [MessageToDict(channel) for channel in channels]
+            return [NotificationChannel.to_dict(channel) for channel in channels]
         elif format_ == "json":
-            return [MessageToJson(channel) for channel in channels]
+            return [NotificationChannel.to_json(channel) for channel in channels]
         else:
             return channels
 
@@ -431,18 +441,18 @@ class StackdriverHook(GoogleBaseHook):
         metadata: Optional[str] = None,
     ) -> None:
         client = self._get_channel_client()
-        channels = client.list_notification_channels(name=f'projects/{project_id}', filter_=filter_)
+        channels = client.list_notification_channels(
+            request={'name': f'projects/{project_id}', 'filter': filter_}
+        )
         for channel in channels:
-            if channel.enabled.value != bool(new_state):
-                channel.enabled.value = bool(new_state)
-                mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-                mask.paths.append('enabled')  # pylint: disable=no-member
+            if channel.enabled != bool(new_state):
+                channel.enabled = bool(new_state)
+                mask = FieldMask(paths=['enabled'])
                 client.update_notification_channel(
-                    notification_channel=channel,
-                    update_mask=mask,
+                    request={'notification_channel': channel, 'update_mask': mask},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -518,7 +528,7 @@ class StackdriverHook(GoogleBaseHook):
             new_state=False,
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -562,29 +572,28 @@ class StackdriverHook(GoogleBaseHook):
         channel_name_map = {}
 
         for channel in record["channels"]:
-            channel_json = json.dumps(channel)
-            channels_list.append(
-                Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel())
-            )
+            channels_list.append(NotificationChannel(**channel))
 
         for channel in channels_list:
             channel.verification_status = (
-                monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+                monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
             )
 
             if channel.name in existing_channels:
                 channel_client.update_notification_channel(
-                    notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+                    request={'notification_channel': channel},
+                    retry=retry,
+                    timeout=timeout,
+                    metadata=metadata or (),
                 )
             else:
                 old_name = channel.name
-                channel.ClearField('name')
+                channel.name = None
                 new_channel = channel_client.create_notification_channel(
-                    name=f'projects/{project_id}',
-                    notification_channel=channel,
+                    request={'name': f'projects/{project_id}', 'notification_channel': channel},
                     retry=retry,
                     timeout=timeout,
-                    metadata=metadata,
+                    metadata=metadata or (),
                 )
                 channel_name_map[old_name] = new_channel.name
 
@@ -616,7 +625,7 @@ class StackdriverHook(GoogleBaseHook):
         channel_client = self._get_channel_client()
         try:
             channel_client.delete_notification_channel(
-                name=name, retry=retry, timeout=timeout, metadata=metadata
+                request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
             )
         except HttpError as err:
             raise AirflowException(f'Delete notification channel failed. Error was {err.content}')
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index dc86466..7289b12 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -19,6 +19,7 @@
 from typing import Optional, Sequence, Union
 
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -125,7 +126,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
 
     def execute(self, context):
         self.log.info(
-            'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+            'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
             self.project_id,
             self.format_,
             self.filter_,
@@ -139,7 +140,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
                 impersonation_chain=self.impersonation_chain,
             )
 
-        return self.hook.list_alert_policies(
+        result = self.hook.list_alert_policies(
             project_id=self.project_id,
             format_=self.format_,
             filter_=self.filter_,
@@ -149,6 +150,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        return [AlertPolicy.to_dict(policy) for policy in result]
 
 
 class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -614,7 +616,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
 
     def execute(self, context):
         self.log.info(
-            'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+            'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
             self.project_id,
             self.format_,
             self.filter_,
@@ -627,7 +629,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
                 delegate_to=self.delegate_to,
                 impersonation_chain=self.impersonation_chain,
             )
-        return self.hook.list_notification_channels(
+        channels = self.hook.list_notification_channels(
             format_=self.format_,
             project_id=self.project_id,
             filter_=self.filter_,
@@ -637,6 +639,8 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        result = [NotificationChannel.to_dict(channel) for channel in channels]
+        return result
 
 
 class StackdriverEnableNotificationChannelsOperator(BaseOperator):
diff --git a/setup.py b/setup.py
index 0f40d88..fa1e73a 100644
--- a/setup.py
+++ b/setup.py
@@ -294,7 +294,7 @@ google = [
     'google-cloud-language>=1.1.1,<2.0.0',
     'google-cloud-logging>=1.14.0,<2.0.0',
     'google-cloud-memcache>=0.2.0',
-    'google-cloud-monitoring>=0.34.0,<2.0.0',
+    'google-cloud-monitoring>=2.0.0,<3.0.0',
     'google-cloud-os-login>=2.0.0,<3.0.0',
     'google-cloud-pubsub>=2.0.0,<3.0.0',
     'google-cloud-redis>=2.0.0,<3.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_stackdriver.py b/tests/providers/google/cloud/hooks/test_stackdriver.py
index 6892d05..10a3097 100644
--- a/tests/providers/google/cloud/hooks/test_stackdriver.py
+++ b/tests/providers/google/cloud/hooks/test_stackdriver.py
@@ -21,8 +21,8 @@ import unittest
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud import monitoring_v3
-from google.protobuf.json_format import ParseDict
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.providers.google.cloud.hooks import stackdriver
 
@@ -32,16 +32,15 @@ TEST_FILTER = "filter"
 TEST_ALERT_POLICY_1 = {
     "combiner": "OR",
     "name": "projects/sd-project/alertPolicies/12345",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": True,
-    "displayName": "test display",
+    "display_name": "test display",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
             },
-            "displayName": "Condition display",
+            "display_name": "Condition display",
             "name": "projects/sd-project/alertPolicies/123/conditions/456",
         }
     ],
@@ -50,35 +49,34 @@ TEST_ALERT_POLICY_1 = {
 TEST_ALERT_POLICY_2 = {
     "combiner": "OR",
     "name": "projects/sd-project/alertPolicies/6789",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": False,
-    "displayName": "test display",
+    "display_name": "test display",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
             },
-            "displayName": "Condition display",
+            "display_name": "Condition display",
             "name": "projects/sd-project/alertPolicies/456/conditions/789",
         }
     ],
 }
 
 TEST_NOTIFICATION_CHANNEL_1 = {
-    "displayName": "sd",
+    "display_name": "sd",
     "enabled": True,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
     "name": "projects/sd-project/notificationChannels/12345",
-    "type": "slack",
+    "type_": "slack",
 }
 
 TEST_NOTIFICATION_CHANNEL_2 = {
-    "displayName": "sd",
+    "display_name": "sd",
     "enabled": False,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
     "name": "projects/sd-project/notificationChannels/6789",
-    "type": "slack",
+    "type_": "slack",
 }
 
 
@@ -96,13 +94,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         method.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=TEST_FILTER,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            order_by=None,
-            page_size=None,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -113,8 +108,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
     def test_stackdriver_enable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
         hook = stackdriver.StackdriverHook()
 
-        alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
-        alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+        alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1)
+        alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2)
 
         alert_policies = [alert_policy_enabled, alert_policy_disabled]
 
@@ -124,23 +119,18 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=TEST_FILTER,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            order_by=None,
-            page_size=None,
-            metadata=None,
+            metadata=(),
         )
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-        alert_policy_disabled.enabled.value = True  # pylint: disable=no-member
-        mask.paths.append('enabled')  # pylint: disable=no-member
+        mask = FieldMask(paths=["enabled"])
+        alert_policy_disabled.enabled = True  # pylint: disable=no-member
         mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
-            alert_policy=alert_policy_disabled,
-            update_mask=mask,
+            request=dict(alert_policy=alert_policy_disabled, update_mask=mask),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -150,8 +140,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
     def test_stackdriver_disable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
         hook = stackdriver.StackdriverHook()
-        alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
-        alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+        alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1)
+        alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2)
 
         mock_policy_client.return_value.list_alert_policies.return_value = [
             alert_policy_enabled,
@@ -162,23 +152,18 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=TEST_FILTER,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            order_by=None,
-            page_size=None,
-            metadata=None,
+            metadata=(),
         )
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-        alert_policy_enabled.enabled.value = False  # pylint: disable=no-member
-        mask.paths.append('enabled')  # pylint: disable=no-member
+        mask = FieldMask(paths=["enabled"])
+        alert_policy_enabled.enabled = False  # pylint: disable=no-member
         mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
-            alert_policy=alert_policy_enabled,
-            update_mask=mask,
+            request=dict(alert_policy=alert_policy_enabled, update_mask=mask),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -191,8 +176,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
         self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
     ):
         hook = stackdriver.StackdriverHook()
-        existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
-        alert_policy_to_create = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+        existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1)
+        alert_policy_to_create = AlertPolicy(**TEST_ALERT_POLICY_2)
 
         mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
         mock_channel_client.return_value.list_notification_channels.return_value = []
@@ -202,38 +187,77 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=None,
+            request=dict(
+                name=f'projects/{PROJECT_ID}',
+                filter=None,
+                order_by=None,
+                page_size=None,
+            ),
             retry=DEFAULT,
             timeout=DEFAULT,
-            order_by=None,
-            page_size=None,
-            metadata=None,
+            metadata=(),
         )
         mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=None,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            order_by=None,
-            page_size=None,
-            metadata=None,
+            metadata=(),
         )
-        alert_policy_to_create.ClearField('name')
-        alert_policy_to_create.ClearField('creation_record')
-        alert_policy_to_create.ClearField('mutation_record')
-        alert_policy_to_create.conditions[0].ClearField('name')  # pylint: disable=no-member
+        alert_policy_to_create.name = None
+        alert_policy_to_create.creation_record = None
+        alert_policy_to_create.mutation_record = None
+        alert_policy_to_create.conditions[0].name = None
         mock_policy_client.return_value.create_alert_policy.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            alert_policy=alert_policy_to_create,
+            request=dict(
+                name=f'projects/{PROJECT_ID}',
+                alert_policy=alert_policy_to_create,
+            ),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
-        existing_alert_policy.ClearField('creation_record')
-        existing_alert_policy.ClearField('mutation_record')
+        existing_alert_policy.creation_record = None
+        existing_alert_policy.mutation_record = None
         mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
-            alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
+            request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=()
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
+        return_value=(CREDENTIALS, PROJECT_ID),
+    )
+    @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
+    @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
+    def test_stackdriver_upsert_alert_policy_without_channel(
+        self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
+    ):
+        hook = stackdriver.StackdriverHook()
+        existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1)
+
+        mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
+        mock_channel_client.return_value.list_notification_channels.return_value = []
+
+        hook.upsert_alert(
+            alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
+            project_id=PROJECT_ID,
+        )
+        mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
+            request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
+            metadata=(),
+            retry=DEFAULT,
+            timeout=DEFAULT,
+        )
+        mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
+            request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
+            retry=DEFAULT,
+            timeout=DEFAULT,
+            metadata=(),
+        )
+
+        existing_alert_policy.creation_record = None
+        existing_alert_policy.mutation_record = None
+        mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
+            request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=()
         )
 
     @mock.patch(
@@ -247,10 +271,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
             name='test-alert',
         )
         mock_policy_client.return_value.delete_alert_policy.assert_called_once_with(
-            name='test-alert',
+            request=dict(name='test-alert'),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -265,13 +289,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=TEST_FILTER,
-            order_by=None,
-            page_size=None,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -283,12 +304,9 @@ class TestStackdriverHookMethods(unittest.TestCase):
         self, mock_channel_client, mock_get_creds_and_project_id
     ):
         hook = stackdriver.StackdriverHook()
-        notification_channel_enabled = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
-        notification_channel_disabled = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
+        notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+        notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
+
         mock_channel_client.return_value.list_notification_channels.return_value = [
             notification_channel_enabled,
             notification_channel_disabled,
@@ -299,15 +317,13 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
 
-        notification_channel_disabled.enabled.value = True  # pylint: disable=no-member
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-        mask.paths.append('enabled')  # pylint: disable=no-member
+        notification_channel_disabled.enabled = True  # pylint: disable=no-member
+        mask = FieldMask(paths=['enabled'])
         mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
-            notification_channel=notification_channel_disabled,
-            update_mask=mask,
+            request=dict(notification_channel=notification_channel_disabled, update_mask=mask),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -319,12 +335,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
         self, mock_channel_client, mock_get_creds_and_project_id
     ):
         hook = stackdriver.StackdriverHook()
-        notification_channel_enabled = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
-        notification_channel_disabled = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
+        notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+        notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
         mock_channel_client.return_value.list_notification_channels.return_value = [
             notification_channel_enabled,
             notification_channel_disabled,
@@ -335,15 +347,13 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
 
-        notification_channel_enabled.enabled.value = False  # pylint: disable=no-member
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
-        mask.paths.append('enabled')  # pylint: disable=no-member
+        notification_channel_enabled.enabled = False  # pylint: disable=no-member
+        mask = FieldMask(paths=['enabled'])
         mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
-            notification_channel=notification_channel_enabled,
-            update_mask=mask,
+            request=dict(notification_channel=notification_channel_enabled, update_mask=mask),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -353,12 +363,9 @@ class TestStackdriverHookMethods(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
     def test_stackdriver_upsert_channel(self, mock_channel_client, mock_get_creds_and_project_id):
         hook = stackdriver.StackdriverHook()
-        existing_notification_channel = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
-        notification_channel_to_be_created = ParseDict(
-            TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
-        )
+        existing_notification_channel = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+        notification_channel_to_be_created = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
+
         mock_channel_client.return_value.list_notification_channels.return_value = [
             existing_notification_channel
         ]
@@ -367,24 +374,25 @@ class TestStackdriverHookMethods(unittest.TestCase):
             project_id=PROJECT_ID,
         )
         mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            filter_=None,
-            order_by=None,
-            page_size=None,
+            request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
         mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
-            notification_channel=existing_notification_channel, retry=DEFAULT, timeout=DEFAULT, metadata=None
+            request=dict(notification_channel=existing_notification_channel),
+            retry=DEFAULT,
+            timeout=DEFAULT,
+            metadata=(),
         )
-        notification_channel_to_be_created.ClearField('name')
+        notification_channel_to_be_created.name = None
         mock_channel_client.return_value.create_notification_channel.assert_called_once_with(
-            name=f'projects/{PROJECT_ID}',
-            notification_channel=notification_channel_to_be_created,
+            request=dict(
+                name=f'projects/{PROJECT_ID}', notification_channel=notification_channel_to_be_created
+            ),
             retry=DEFAULT,
             timeout=DEFAULT,
-            metadata=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -400,5 +408,5 @@ class TestStackdriverHookMethods(unittest.TestCase):
             name='test-channel',
         )
         mock_channel_client.return_value.delete_notification_channel.assert_called_once_with(
-            name='test-channel', retry=DEFAULT, timeout=DEFAULT, metadata=None
+            request=dict(name='test-channel'), retry=DEFAULT, timeout=DEFAULT, metadata=()
         )
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 28901b4..50dd997 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -21,6 +21,7 @@ import unittest
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
 
 from airflow.providers.google.cloud.operators.stackdriver import (
     StackdriverDeleteAlertOperator,
@@ -40,16 +41,15 @@ TEST_FILTER = 'filter'
 TEST_ALERT_POLICY_1 = {
     "combiner": "OR",
     "name": "projects/sd-project/alertPolicies/12345",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": True,
-    "displayName": "test display",
+    "display_name": "test display",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
             },
-            "displayName": "Condition display",
+            "display_name": "Condition display",
             "name": "projects/sd-project/alertPolicies/123/conditions/456",
         }
     ],
@@ -58,16 +58,15 @@ TEST_ALERT_POLICY_1 = {
 TEST_ALERT_POLICY_2 = {
     "combiner": "OR",
     "name": "projects/sd-project/alertPolicies/6789",
-    "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": False,
-    "displayName": "test display",
+    "display_name": "test display",
     "conditions": [
         {
-            "conditionThreshold": {
+            "condition_threshold": {
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
             },
-            "displayName": "Condition display",
+            "display_name": "Condition display",
             "name": "projects/sd-project/alertPolicies/456/conditions/789",
         }
     ],
@@ -94,7 +93,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
     def test_execute(self, mock_hook):
         operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
-        operator.execute(None)
+        mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
+        result = operator.execute(None)
         mock_hook.return_value.list_alert_policies.assert_called_once_with(
             project_id=None,
             filter_=TEST_FILTER,
@@ -105,6 +105,16 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
+        assert [
+            {
+                'combiner': 0,
+                'conditions': [],
+                'display_name': '',
+                'name': 'test-name',
+                'notification_channels': [],
+                'user_labels': {},
+            }
+        ] == result
 
 
 class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
@@ -160,7 +170,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
     def test_execute(self, mock_hook):
         operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
-        operator.execute(None)
+        mock_hook.return_value.list_notification_channels.return_value = [
+            NotificationChannel(name="test-123")
+        ]
+
+        result = operator.execute(None)
         mock_hook.return_value.list_notification_channels.assert_called_once_with(
             project_id=None,
             filter_=TEST_FILTER,
@@ -171,6 +185,17 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
+        assert [
+            {
+                'description': '',
+                'display_name': '',
+                'labels': {},
+                'name': 'test-123',
+                'type_': '',
+                'user_labels': {},
+                'verification_status': 0,
+            }
+        ] == result
 
 
 class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):