You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:32:54 UTC
[airflow] 30/41: Support google-cloud-monitoring>=2.0.0 (#13769)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit acfe4ae7988476f79cd63f21d3fd8de4336daa42
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Feb 2 07:01:55 2021 +0100
Support google-cloud-monitoring>=2.0.0 (#13769)
(cherry picked from commit d2efb33239d36e58fb69066fd23779724cb11a90)
---
airflow/providers/google/ADDITIONAL_INFO.md | 1 +
.../cloud/example_dags/example_stackdriver.py | 82 +++++--
.../providers/google/cloud/hooks/stackdriver.py | 133 +++++------
.../google/cloud/operators/stackdriver.py | 12 +-
setup.py | 2 +-
.../google/cloud/hooks/test_stackdriver.py | 242 +++++++++++----------
.../google/cloud/operators/test_stackdriver.py | 49 ++++-
7 files changed, 302 insertions(+), 219 deletions(-)
diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
index 16a6683..9cf9853 100644
--- a/airflow/providers/google/ADDITIONAL_INFO.md
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
| [``google-cloud-tasks``](https://pypi.org/project/google-cloud-tasks/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-tasks/blob/master/UPGRADING.md) |
diff --git a/airflow/providers/google/cloud/example_dags/example_stackdriver.py b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
index 68ac978..9c418b7 100644
--- a/airflow/providers/google/cloud/example_dags/example_stackdriver.py
+++ b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -21,6 +21,7 @@ Example Airflow DAG for Google Cloud Stackdriver service.
"""
import json
+import os
from airflow import models
from airflow.providers.google.cloud.operators.stackdriver import (
@@ -37,56 +38,80 @@ from airflow.providers.google.cloud.operators.stackdriver import (
)
from airflow.utils.dates import days_ago
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
TEST_ALERT_POLICY_1 = {
"combiner": "OR",
- "name": "projects/sd-project/alertPolicies/12345",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": True,
- "displayName": "test alert 1",
+ "display_name": "test alert 1",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
+ "filter": (
+ 'metric.label.state="blocked" AND '
+ 'metric.type="agent.googleapis.com/processes/count_by_state" '
+ 'AND resource.type="gce_instance"'
+ ),
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "threshold_value": 100,
+ "duration": {'seconds': 900},
+ "trigger": {"percent": 0},
+ "aggregations": [
+ {
+ "alignment_period": {'seconds': 60},
+ "per_series_aligner": "ALIGN_MEAN",
+ "cross_series_reducer": "REDUCE_MEAN",
+ "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
+ }
+ ],
},
- "displayName": "Condition display",
- "name": "projects/sd-project/alertPolicies/123/conditions/456",
+ "display_name": "test_alert_policy_1",
}
],
}
TEST_ALERT_POLICY_2 = {
"combiner": "OR",
- "name": "projects/sd-project/alertPolicies/6789",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": False,
- "displayName": "test alert 2",
+ "display_name": "test alert 2",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
+ "filter": (
+ 'metric.label.state="blocked" AND '
+ 'metric.type="agent.googleapis.com/processes/count_by_state" AND '
+ 'resource.type="gce_instance"'
+ ),
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "threshold_value": 100,
+ "duration": {'seconds': 900},
+ "trigger": {"percent": 0},
+ "aggregations": [
+ {
+ "alignment_period": {'seconds': 60},
+ "per_series_aligner": "ALIGN_MEAN",
+ "cross_series_reducer": "REDUCE_MEAN",
+ "group_by_fields": ["project", "resource.label.instance_id", "resource.label.zone"],
+ }
+ ],
},
- "displayName": "Condition display",
- "name": "projects/sd-project/alertPolicies/456/conditions/789",
+ "display_name": "test_alert_policy_2",
}
],
}
TEST_NOTIFICATION_CHANNEL_1 = {
- "displayName": "channel1",
+ "display_name": "channel1",
"enabled": True,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
- "name": "projects/sd-project/notificationChannels/12345",
- "type": "slack",
+ "type_": "slack",
}
TEST_NOTIFICATION_CHANNEL_2 = {
- "displayName": "channel2",
+ "display_name": "channel2",
"enabled": False,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
- "name": "projects/sd-project/notificationChannels/6789",
- "type": "slack",
+ "type_": "slack",
}
with models.DAG(
@@ -150,18 +175,29 @@ with models.DAG(
# [START howto_operator_gcp_stackdriver_delete_notification_channel]
delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
task_id='delete-notification-channel',
- name='test-channel',
+ name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_notification_channel]
+ delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
+ task_id='delete-notification-channel-2',
+ name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
+ )
+
# [START howto_operator_gcp_stackdriver_delete_alert_policy]
delete_alert_policy = StackdriverDeleteAlertOperator(
task_id='delete-alert-policy',
- name='test-alert',
+ name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_alert_policy]
+ delete_alert_policy_2 = StackdriverDeleteAlertOperator(
+ task_id='delete-alert-policy-2',
+ name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
+ )
+
create_notification_channel >> enable_notification_channel >> disable_notification_channel
disable_notification_channel >> list_notification_channel >> create_alert_policy
create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
- list_alert_policies >> delete_notification_channel >> delete_alert_policy
+ list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
+ delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py b/airflow/providers/google/cloud/hooks/stackdriver.py
index 9da1afa..04dc329 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -24,7 +24,8 @@ from typing import Any, Optional, Sequence, Union
from google.api_core.exceptions import InvalidArgument
from google.api_core.gapic_v1.method import DEFAULT
from google.cloud import monitoring_v3
-from google.protobuf.json_format import MessageToDict, MessageToJson, Parse
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
+from google.protobuf.field_mask_pb2 import FieldMask
from googleapiclient.errors import HttpError
from airflow.exceptions import AirflowException
@@ -110,18 +111,20 @@ class StackdriverHook(GoogleBaseHook):
"""
client = self._get_policy_client()
policies_ = client.list_alert_policies(
- name=f'projects/{project_id}',
- filter_=filter_,
- order_by=order_by,
- page_size=page_size,
+ request={
+ 'name': f'projects/{project_id}',
+ 'filter': filter_,
+ 'order_by': order_by,
+ 'page_size': page_size,
+ },
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
if format_ == "dict":
- return [MessageToDict(policy) for policy in policies_]
+ return [AlertPolicy.to_dict(policy) for policy in policies_]
elif format_ == "json":
- return [MessageToJson(policy) for policy in policies_]
+ return [AlertPolicy.to_json(policy) for policy in policies_]
else:
return policies_
@@ -138,12 +141,14 @@ class StackdriverHook(GoogleBaseHook):
client = self._get_policy_client()
policies_ = self.list_alert_policies(project_id=project_id, filter_=filter_)
for policy in policies_:
- if policy.enabled.value != bool(new_state):
- policy.enabled.value = bool(new_state)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- mask.paths.append('enabled') # pylint: disable=no-member
+ if policy.enabled != bool(new_state):
+ policy.enabled = bool(new_state)
+ mask = FieldMask(paths=['enabled'])
client.update_alert_policy(
- alert_policy=policy, update_mask=mask, retry=retry, timeout=timeout, metadata=metadata
+ request={'alert_policy': policy, 'update_mask': mask},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -265,40 +270,39 @@ class StackdriverHook(GoogleBaseHook):
]
policies_ = []
channels = []
-
- for channel in record["channels"]:
- channel_json = json.dumps(channel)
- channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
- for policy in record["policies"]:
- policy_json = json.dumps(policy)
- policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
+ for channel in record.get("channels", []):
+ channels.append(NotificationChannel(**channel))
+ for policy in record.get("policies", []):
+ policies_.append(AlertPolicy(**policy))
channel_name_map = {}
for channel in channels:
channel.verification_status = (
- monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+ monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
)
if channel.name in existing_channels:
channel_client.update_notification_channel(
- notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+ request={'notification_channel': channel},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata or (),
)
else:
old_name = channel.name
- channel.ClearField('name')
+ channel.name = None
new_channel = channel_client.create_notification_channel(
- name=f'projects/{project_id}',
- notification_channel=channel,
+ request={'name': f'projects/{project_id}', 'notification_channel': channel},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
channel_name_map[old_name] = new_channel.name
for policy in policies_:
- policy.ClearField('creation_record')
- policy.ClearField('mutation_record')
+ policy.creation_record = None
+ policy.mutation_record = None
for i, channel in enumerate(policy.notification_channels):
new_channel = channel_name_map.get(channel)
@@ -308,20 +312,22 @@ class StackdriverHook(GoogleBaseHook):
if policy.name in existing_policies:
try:
policy_client.update_alert_policy(
- alert_policy=policy, retry=retry, timeout=timeout, metadata=metadata
+ request={'alert_policy': policy},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata or (),
)
except InvalidArgument:
pass
else:
- policy.ClearField('name')
+ policy.name = None
for condition in policy.conditions:
- condition.ClearField('name')
+ condition.name = None
policy_client.create_alert_policy(
- name=f'projects/{project_id}',
- alert_policy=policy,
+ request={'name': f'projects/{project_id}', 'alert_policy': policy},
retry=retry,
timeout=timeout,
- metadata=None,
+ metadata=metadata or (),
)
def delete_alert_policy(
@@ -349,7 +355,9 @@ class StackdriverHook(GoogleBaseHook):
"""
policy_client = self._get_policy_client()
try:
- policy_client.delete_alert_policy(name=name, retry=retry, timeout=timeout, metadata=metadata)
+ policy_client.delete_alert_policy(
+ request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
+ )
except HttpError as err:
raise AirflowException(f'Delete alerting policy failed. Error was {err.content}')
@@ -405,18 +413,20 @@ class StackdriverHook(GoogleBaseHook):
"""
client = self._get_channel_client()
channels = client.list_notification_channels(
- name=f'projects/{project_id}',
- filter_=filter_,
- order_by=order_by,
- page_size=page_size,
+ request={
+ 'name': f'projects/{project_id}',
+ 'filter': filter_,
+ 'order_by': order_by,
+ 'page_size': page_size,
+ },
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
if format_ == "dict":
- return [MessageToDict(channel) for channel in channels]
+ return [NotificationChannel.to_dict(channel) for channel in channels]
elif format_ == "json":
- return [MessageToJson(channel) for channel in channels]
+ return [NotificationChannel.to_json(channel) for channel in channels]
else:
return channels
@@ -431,18 +441,18 @@ class StackdriverHook(GoogleBaseHook):
metadata: Optional[str] = None,
) -> None:
client = self._get_channel_client()
- channels = client.list_notification_channels(name=f'projects/{project_id}', filter_=filter_)
+ channels = client.list_notification_channels(
+ request={'name': f'projects/{project_id}', 'filter': filter_}
+ )
for channel in channels:
- if channel.enabled.value != bool(new_state):
- channel.enabled.value = bool(new_state)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- mask.paths.append('enabled') # pylint: disable=no-member
+ if channel.enabled != bool(new_state):
+ channel.enabled = bool(new_state)
+ mask = FieldMask(paths=['enabled'])
client.update_notification_channel(
- notification_channel=channel,
- update_mask=mask,
+ request={'notification_channel': channel, 'update_mask': mask},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -518,7 +528,7 @@ class StackdriverHook(GoogleBaseHook):
new_state=False,
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
@GoogleBaseHook.fallback_to_default_project_id
@@ -562,29 +572,28 @@ class StackdriverHook(GoogleBaseHook):
channel_name_map = {}
for channel in record["channels"]:
- channel_json = json.dumps(channel)
- channels_list.append(
- Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel())
- )
+ channels_list.append(NotificationChannel(**channel))
for channel in channels_list:
channel.verification_status = (
- monitoring_v3.enums.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
+ monitoring_v3.NotificationChannel.VerificationStatus.VERIFICATION_STATUS_UNSPECIFIED
)
if channel.name in existing_channels:
channel_client.update_notification_channel(
- notification_channel=channel, retry=retry, timeout=timeout, metadata=metadata
+ request={'notification_channel': channel},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata or (),
)
else:
old_name = channel.name
- channel.ClearField('name')
+ channel.name = None
new_channel = channel_client.create_notification_channel(
- name=f'projects/{project_id}',
- notification_channel=channel,
+ request={'name': f'projects/{project_id}', 'notification_channel': channel},
retry=retry,
timeout=timeout,
- metadata=metadata,
+ metadata=metadata or (),
)
channel_name_map[old_name] = new_channel.name
@@ -616,7 +625,7 @@ class StackdriverHook(GoogleBaseHook):
channel_client = self._get_channel_client()
try:
channel_client.delete_notification_channel(
- name=name, retry=retry, timeout=timeout, metadata=metadata
+ request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
except HttpError as err:
raise AirflowException(f'Delete notification channel failed. Error was {err.content}')
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index dc86466..7289b12 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -19,6 +19,7 @@
from typing import Optional, Sequence, Union
from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -125,7 +126,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
def execute(self, context):
self.log.info(
- 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+ 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
@@ -139,7 +140,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
impersonation_chain=self.impersonation_chain,
)
- return self.hook.list_alert_policies(
+ result = self.hook.list_alert_policies(
project_id=self.project_id,
format_=self.format_,
filter_=self.filter_,
@@ -149,6 +150,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ return [AlertPolicy.to_dict(policy) for policy in result]
class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -614,7 +616,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
def execute(self, context):
self.log.info(
- 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+ 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
@@ -627,7 +629,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- return self.hook.list_notification_channels(
+ channels = self.hook.list_notification_channels(
format_=self.format_,
project_id=self.project_id,
filter_=self.filter_,
@@ -637,6 +639,8 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ result = [NotificationChannel.to_dict(channel) for channel in channels]
+ return result
class StackdriverEnableNotificationChannelsOperator(BaseOperator):
diff --git a/setup.py b/setup.py
index 0f40d88..fa1e73a 100644
--- a/setup.py
+++ b/setup.py
@@ -294,7 +294,7 @@ google = [
'google-cloud-language>=1.1.1,<2.0.0',
'google-cloud-logging>=1.14.0,<2.0.0',
'google-cloud-memcache>=0.2.0',
- 'google-cloud-monitoring>=0.34.0,<2.0.0',
+ 'google-cloud-monitoring>=2.0.0,<3.0.0',
'google-cloud-os-login>=2.0.0,<3.0.0',
'google-cloud-pubsub>=2.0.0,<3.0.0',
'google-cloud-redis>=2.0.0,<3.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_stackdriver.py b/tests/providers/google/cloud/hooks/test_stackdriver.py
index 6892d05..10a3097 100644
--- a/tests/providers/google/cloud/hooks/test_stackdriver.py
+++ b/tests/providers/google/cloud/hooks/test_stackdriver.py
@@ -21,8 +21,8 @@ import unittest
from unittest import mock
from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud import monitoring_v3
-from google.protobuf.json_format import ParseDict
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
+from google.protobuf.field_mask_pb2 import FieldMask
from airflow.providers.google.cloud.hooks import stackdriver
@@ -32,16 +32,15 @@ TEST_FILTER = "filter"
TEST_ALERT_POLICY_1 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/12345",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": True,
- "displayName": "test display",
+ "display_name": "test display",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
},
- "displayName": "Condition display",
+ "display_name": "Condition display",
"name": "projects/sd-project/alertPolicies/123/conditions/456",
}
],
@@ -50,35 +49,34 @@ TEST_ALERT_POLICY_1 = {
TEST_ALERT_POLICY_2 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/6789",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": False,
- "displayName": "test display",
+ "display_name": "test display",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
},
- "displayName": "Condition display",
+ "display_name": "Condition display",
"name": "projects/sd-project/alertPolicies/456/conditions/789",
}
],
}
TEST_NOTIFICATION_CHANNEL_1 = {
- "displayName": "sd",
+ "display_name": "sd",
"enabled": True,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
"name": "projects/sd-project/notificationChannels/12345",
- "type": "slack",
+ "type_": "slack",
}
TEST_NOTIFICATION_CHANNEL_2 = {
- "displayName": "sd",
+ "display_name": "sd",
"enabled": False,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
"name": "projects/sd-project/notificationChannels/6789",
- "type": "slack",
+ "type_": "slack",
}
@@ -96,13 +94,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
method.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=TEST_FILTER,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- order_by=None,
- page_size=None,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -113,8 +108,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
def test_stackdriver_enable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
hook = stackdriver.StackdriverHook()
- alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
- alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+ alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1)
+ alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2)
alert_policies = [alert_policy_enabled, alert_policy_disabled]
@@ -124,23 +119,18 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=TEST_FILTER,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- order_by=None,
- page_size=None,
- metadata=None,
+ metadata=(),
)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- alert_policy_disabled.enabled.value = True # pylint: disable=no-member
- mask.paths.append('enabled') # pylint: disable=no-member
+ mask = FieldMask(paths=["enabled"])
+ alert_policy_disabled.enabled = True # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
- alert_policy=alert_policy_disabled,
- update_mask=mask,
+ request=dict(alert_policy=alert_policy_disabled, update_mask=mask),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -150,8 +140,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
def test_stackdriver_disable_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
hook = stackdriver.StackdriverHook()
- alert_policy_enabled = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
- alert_policy_disabled = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+ alert_policy_enabled = AlertPolicy(**TEST_ALERT_POLICY_1)
+ alert_policy_disabled = AlertPolicy(**TEST_ALERT_POLICY_2)
mock_policy_client.return_value.list_alert_policies.return_value = [
alert_policy_enabled,
@@ -162,23 +152,18 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=TEST_FILTER,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- order_by=None,
- page_size=None,
- metadata=None,
+ metadata=(),
)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- alert_policy_enabled.enabled.value = False # pylint: disable=no-member
- mask.paths.append('enabled') # pylint: disable=no-member
+ mask = FieldMask(paths=["enabled"])
+ alert_policy_enabled.enabled = False # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
- alert_policy=alert_policy_enabled,
- update_mask=mask,
+ request=dict(alert_policy=alert_policy_enabled, update_mask=mask),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -191,8 +176,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
):
hook = stackdriver.StackdriverHook()
- existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
- alert_policy_to_create = ParseDict(TEST_ALERT_POLICY_2, monitoring_v3.types.alert_pb2.AlertPolicy())
+ existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1)
+ alert_policy_to_create = AlertPolicy(**TEST_ALERT_POLICY_2)
mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
mock_channel_client.return_value.list_notification_channels.return_value = []
@@ -202,38 +187,77 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=None,
+ request=dict(
+ name=f'projects/{PROJECT_ID}',
+ filter=None,
+ order_by=None,
+ page_size=None,
+ ),
retry=DEFAULT,
timeout=DEFAULT,
- order_by=None,
- page_size=None,
- metadata=None,
+ metadata=(),
)
mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=None,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- order_by=None,
- page_size=None,
- metadata=None,
+ metadata=(),
)
- alert_policy_to_create.ClearField('name')
- alert_policy_to_create.ClearField('creation_record')
- alert_policy_to_create.ClearField('mutation_record')
- alert_policy_to_create.conditions[0].ClearField('name') # pylint: disable=no-member
+ alert_policy_to_create.name = None
+ alert_policy_to_create.creation_record = None
+ alert_policy_to_create.mutation_record = None
+ alert_policy_to_create.conditions[0].name = None
mock_policy_client.return_value.create_alert_policy.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- alert_policy=alert_policy_to_create,
+ request=dict(
+ name=f'projects/{PROJECT_ID}',
+ alert_policy=alert_policy_to_create,
+ ),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
- existing_alert_policy.ClearField('creation_record')
- existing_alert_policy.ClearField('mutation_record')
+ existing_alert_policy.creation_record = None
+ existing_alert_policy.mutation_record = None
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
- alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
+ request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=()
+ )
+
+ @mock.patch(
+ 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
+ return_value=(CREDENTIALS, PROJECT_ID),
+ )
+ @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
+ @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
+ def test_stackdriver_upsert_alert_policy_without_channel(
+ self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
+ ):
+ hook = stackdriver.StackdriverHook()
+ existing_alert_policy = AlertPolicy(**TEST_ALERT_POLICY_1)
+
+ mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
+ mock_channel_client.return_value.list_notification_channels.return_value = []
+
+ hook.upsert_alert(
+ alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
+ project_id=PROJECT_ID,
+ )
+ mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
+ request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
+ metadata=(),
+ retry=DEFAULT,
+ timeout=DEFAULT,
+ )
+ mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
+ request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
+ retry=DEFAULT,
+ timeout=DEFAULT,
+ metadata=(),
+ )
+
+ existing_alert_policy.creation_record = None
+ existing_alert_policy.mutation_record = None
+ mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
+ request=dict(alert_policy=existing_alert_policy), retry=DEFAULT, timeout=DEFAULT, metadata=()
)
@mock.patch(
@@ -247,10 +271,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
name='test-alert',
)
mock_policy_client.return_value.delete_alert_policy.assert_called_once_with(
- name='test-alert',
+ request=dict(name='test-alert'),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -265,13 +289,10 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=TEST_FILTER,
- order_by=None,
- page_size=None,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=TEST_FILTER, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -283,12 +304,9 @@ class TestStackdriverHookMethods(unittest.TestCase):
self, mock_channel_client, mock_get_creds_and_project_id
):
hook = stackdriver.StackdriverHook()
- notification_channel_enabled = ParseDict(
- TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
- notification_channel_disabled = ParseDict(
- TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
+ notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+ notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
+
mock_channel_client.return_value.list_notification_channels.return_value = [
notification_channel_enabled,
notification_channel_disabled,
@@ -299,15 +317,13 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
- notification_channel_disabled.enabled.value = True # pylint: disable=no-member
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- mask.paths.append('enabled') # pylint: disable=no-member
+ notification_channel_disabled.enabled = True # pylint: disable=no-member
+ mask = FieldMask(paths=['enabled'])
mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
- notification_channel=notification_channel_disabled,
- update_mask=mask,
+ request=dict(notification_channel=notification_channel_disabled, update_mask=mask),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -319,12 +335,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
self, mock_channel_client, mock_get_creds_and_project_id
):
hook = stackdriver.StackdriverHook()
- notification_channel_enabled = ParseDict(
- TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
- notification_channel_disabled = ParseDict(
- TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
+ notification_channel_enabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+ notification_channel_disabled = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
mock_channel_client.return_value.list_notification_channels.return_value = [
notification_channel_enabled,
notification_channel_disabled,
@@ -335,15 +347,13 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
- notification_channel_enabled.enabled.value = False # pylint: disable=no-member
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
- mask.paths.append('enabled') # pylint: disable=no-member
+ notification_channel_enabled.enabled = False # pylint: disable=no-member
+ mask = FieldMask(paths=['enabled'])
mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
- notification_channel=notification_channel_enabled,
- update_mask=mask,
+ request=dict(notification_channel=notification_channel_enabled, update_mask=mask),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -353,12 +363,9 @@ class TestStackdriverHookMethods(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
def test_stackdriver_upsert_channel(self, mock_channel_client, mock_get_creds_and_project_id):
hook = stackdriver.StackdriverHook()
- existing_notification_channel = ParseDict(
- TEST_NOTIFICATION_CHANNEL_1, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
- notification_channel_to_be_created = ParseDict(
- TEST_NOTIFICATION_CHANNEL_2, monitoring_v3.types.notification_pb2.NotificationChannel()
- )
+ existing_notification_channel = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_1)
+ notification_channel_to_be_created = NotificationChannel(**TEST_NOTIFICATION_CHANNEL_2)
+
mock_channel_client.return_value.list_notification_channels.return_value = [
existing_notification_channel
]
@@ -367,24 +374,25 @@ class TestStackdriverHookMethods(unittest.TestCase):
project_id=PROJECT_ID,
)
mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- filter_=None,
- order_by=None,
- page_size=None,
+ request=dict(name=f'projects/{PROJECT_ID}', filter=None, order_by=None, page_size=None),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
mock_channel_client.return_value.update_notification_channel.assert_called_once_with(
- notification_channel=existing_notification_channel, retry=DEFAULT, timeout=DEFAULT, metadata=None
+ request=dict(notification_channel=existing_notification_channel),
+ retry=DEFAULT,
+ timeout=DEFAULT,
+ metadata=(),
)
- notification_channel_to_be_created.ClearField('name')
+ notification_channel_to_be_created.name = None
mock_channel_client.return_value.create_notification_channel.assert_called_once_with(
- name=f'projects/{PROJECT_ID}',
- notification_channel=notification_channel_to_be_created,
+ request=dict(
+ name=f'projects/{PROJECT_ID}', notification_channel=notification_channel_to_be_created
+ ),
retry=DEFAULT,
timeout=DEFAULT,
- metadata=None,
+ metadata=(),
)
@mock.patch(
@@ -400,5 +408,5 @@ class TestStackdriverHookMethods(unittest.TestCase):
name='test-channel',
)
mock_channel_client.return_value.delete_notification_channel.assert_called_once_with(
- name='test-channel', retry=DEFAULT, timeout=DEFAULT, metadata=None
+ request=dict(name='test-channel'), retry=DEFAULT, timeout=DEFAULT, metadata=()
)
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 28901b4..50dd997 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -21,6 +21,7 @@ import unittest
from unittest import mock
from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3 import AlertPolicy, NotificationChannel
from airflow.providers.google.cloud.operators.stackdriver import (
StackdriverDeleteAlertOperator,
@@ -40,16 +41,15 @@ TEST_FILTER = 'filter'
TEST_ALERT_POLICY_1 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/12345",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": True,
- "displayName": "test display",
+ "display_name": "test display",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
},
- "displayName": "Condition display",
+ "display_name": "Condition display",
"name": "projects/sd-project/alertPolicies/123/conditions/456",
}
],
@@ -58,16 +58,15 @@ TEST_ALERT_POLICY_1 = {
TEST_ALERT_POLICY_2 = {
"combiner": "OR",
"name": "projects/sd-project/alertPolicies/6789",
- "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": False,
- "displayName": "test display",
+ "display_name": "test display",
"conditions": [
{
- "conditionThreshold": {
+ "condition_threshold": {
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "aggregations": [{"alignment_period": {'seconds': 60}, "per_series_aligner": "ALIGN_RATE"}],
},
- "displayName": "Condition display",
+ "display_name": "Condition display",
"name": "projects/sd-project/alertPolicies/456/conditions/789",
}
],
@@ -94,7 +93,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
- operator.execute(None)
+ mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
+ result = operator.execute(None)
mock_hook.return_value.list_alert_policies.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
@@ -105,6 +105,16 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
timeout=DEFAULT,
metadata=None,
)
+ assert [
+ {
+ 'combiner': 0,
+ 'conditions': [],
+ 'display_name': '',
+ 'name': 'test-name',
+ 'notification_channels': [],
+ 'user_labels': {},
+ }
+ ] == result
class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
@@ -160,7 +170,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
- operator.execute(None)
+ mock_hook.return_value.list_notification_channels.return_value = [
+ NotificationChannel(name="test-123")
+ ]
+
+ result = operator.execute(None)
mock_hook.return_value.list_notification_channels.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
@@ -171,6 +185,17 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
timeout=DEFAULT,
metadata=None,
)
+ assert [
+ {
+ 'description': '',
+ 'display_name': '',
+ 'labels': {},
+ 'name': 'test-123',
+ 'type_': '',
+ 'user_labels': {},
+ 'verification_status': 0,
+ }
+ ] == result
class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):