You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/13 11:45:44 UTC
[airflow] branch master updated: Add system tests for Stackdriver operators (#13644)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 189af54 Add system tests for Stackdriver operators (#13644)
189af54 is described below
commit 189af54043a6aa6e7557bda6cf7cfca229d0efd2
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Wed Jan 13 12:45:22 2021 +0100
Add system tests for Stackdriver operators (#13644)
---
.../cloud/example_dags/example_stackdriver.py | 64 +++++++++++++++++-----
.../providers/google/cloud/hooks/stackdriver.py | 5 +-
.../google/cloud/operators/stackdriver.py | 12 ++--
.../google/cloud/hooks/test_stackdriver.py | 50 +++++++++++++++--
.../log/test_stackdriver_task_handler_system.py | 10 ++--
.../google/cloud/operators/test_stackdriver.py | 13 ++++-
.../cloud/operators/test_stackdriver_system.py | 30 ++++++++++
.../google/cloud/utils/gcp_authenticator.py | 2 +-
8 files changed, 154 insertions(+), 32 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_stackdriver.py b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
index 68ac978..a7e797b 100644
--- a/airflow/providers/google/cloud/example_dags/example_stackdriver.py
+++ b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -21,6 +21,7 @@ Example Airflow DAG for Google Cloud Stackdriver service.
"""
import json
+import os
from airflow import models
from airflow.providers.google.cloud.operators.stackdriver import (
@@ -37,38 +38,66 @@ from airflow.providers.google.cloud.operators.stackdriver import (
)
from airflow.utils.dates import days_ago
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
TEST_ALERT_POLICY_1 = {
"combiner": "OR",
- "name": "projects/sd-project/alertPolicies/12345",
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": True,
"displayName": "test alert 1",
"conditions": [
{
"conditionThreshold": {
+ "filter": (
+ 'metric.label.state="blocked" AND '
+ 'metric.type="agent.googleapis.com/processes/count_by_state" '
+ 'AND resource.type="gce_instance"'
+ ),
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "thresholdValue": 100,
+ "duration": "900s",
+ "trigger": {"percent": 0},
+ "aggregations": [
+ {
+ "alignmentPeriod": "60s",
+ "perSeriesAligner": "ALIGN_MEAN",
+ "crossSeriesReducer": "REDUCE_MEAN",
+ "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+ }
+ ],
},
- "displayName": "Condition display",
- "name": "projects/sd-project/alertPolicies/123/conditions/456",
+ "displayName": "test_alert_policy_1",
}
],
}
TEST_ALERT_POLICY_2 = {
"combiner": "OR",
- "name": "projects/sd-project/alertPolicies/6789",
"creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
"enabled": False,
"displayName": "test alert 2",
"conditions": [
{
"conditionThreshold": {
+ "filter": (
+ 'metric.label.state="blocked" AND '
+ 'metric.type="agent.googleapis.com/processes/count_by_state" AND '
+ 'resource.type="gce_instance"'
+ ),
"comparison": "COMPARISON_GT",
- "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+ "thresholdValue": 100,
+ "duration": "900s",
+ "trigger": {"percent": 0},
+ "aggregations": [
+ {
+ "alignmentPeriod": "60s",
+ "perSeriesAligner": "ALIGN_MEAN",
+ "crossSeriesReducer": "REDUCE_MEAN",
+ "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+ }
+ ],
},
- "displayName": "Condition display",
- "name": "projects/sd-project/alertPolicies/456/conditions/789",
+ "displayName": "test_alert_policy_2",
}
],
}
@@ -77,7 +106,6 @@ TEST_NOTIFICATION_CHANNEL_1 = {
"displayName": "channel1",
"enabled": True,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
- "name": "projects/sd-project/notificationChannels/12345",
"type": "slack",
}
@@ -85,7 +113,6 @@ TEST_NOTIFICATION_CHANNEL_2 = {
"displayName": "channel2",
"enabled": False,
"labels": {"auth_token": "top-secret", "channel_name": "#channel"},
- "name": "projects/sd-project/notificationChannels/6789",
"type": "slack",
}
@@ -150,18 +177,29 @@ with models.DAG(
# [START howto_operator_gcp_stackdriver_delete_notification_channel]
delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
task_id='delete-notification-channel',
- name='test-channel',
+ name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_notification_channel]
+ delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
+ task_id='delete-notification-channel-2',
+ name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
+ )
+
# [START howto_operator_gcp_stackdriver_delete_alert_policy]
delete_alert_policy = StackdriverDeleteAlertOperator(
task_id='delete-alert-policy',
- name='test-alert',
+ name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)
# [END howto_operator_gcp_stackdriver_delete_alert_policy]
+ delete_alert_policy_2 = StackdriverDeleteAlertOperator(
+ task_id='delete-alert-policy-2',
+ name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
+ )
+
create_notification_channel >> enable_notification_channel >> disable_notification_channel
disable_notification_channel >> list_notification_channel >> create_alert_policy
create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
- list_alert_policies >> delete_notification_channel >> delete_alert_policy
+ list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
+ delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py b/airflow/providers/google/cloud/hooks/stackdriver.py
index 9da1afa..e556cd9 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -265,11 +265,10 @@ class StackdriverHook(GoogleBaseHook):
]
policies_ = []
channels = []
-
- for channel in record["channels"]:
+ for channel in record.get("channels", []):
channel_json = json.dumps(channel)
channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
- for policy in record["policies"]:
+ for policy in record.get("policies", []):
policy_json = json.dumps(policy)
policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index dc86466..b159ad7 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -19,6 +19,7 @@
from typing import Optional, Sequence, Union
from google.api_core.gapic_v1.method import DEFAULT
+from google.protobuf.json_format import MessageToDict
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -125,7 +126,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
def execute(self, context):
self.log.info(
- 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+ 'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
@@ -139,7 +140,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
impersonation_chain=self.impersonation_chain,
)
- return self.hook.list_alert_policies(
+ result = self.hook.list_alert_policies(
project_id=self.project_id,
format_=self.format_,
filter_=self.filter_,
@@ -149,6 +150,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ return [MessageToDict(policy) for policy in result]
class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -614,7 +616,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
def execute(self, context):
self.log.info(
- 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+ 'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
self.project_id,
self.format_,
self.filter_,
@@ -627,7 +629,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- return self.hook.list_notification_channels(
+ channels = self.hook.list_notification_channels(
format_=self.format_,
project_id=self.project_id,
filter_=self.filter_,
@@ -637,6 +639,8 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ result = [MessageToDict(channel) for channel in channels]
+ return result
class StackdriverEnableNotificationChannelsOperator(BaseOperator):
diff --git a/tests/providers/google/cloud/hooks/test_stackdriver.py b/tests/providers/google/cloud/hooks/test_stackdriver.py
index 6892d05..bbef80f 100644
--- a/tests/providers/google/cloud/hooks/test_stackdriver.py
+++ b/tests/providers/google/cloud/hooks/test_stackdriver.py
@@ -132,9 +132,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
page_size=None,
metadata=None,
)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
+ mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
alert_policy_disabled.enabled.value = True # pylint: disable=no-member
- mask.paths.append('enabled') # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
alert_policy=alert_policy_disabled,
update_mask=mask,
@@ -170,9 +169,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
page_size=None,
metadata=None,
)
- mask = monitoring_v3.types.field_mask_pb2.FieldMask()
+ mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
alert_policy_enabled.enabled.value = False # pylint: disable=no-member
- mask.paths.append('enabled') # pylint: disable=no-member
mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
alert_policy=alert_policy_enabled,
update_mask=mask,
@@ -241,6 +239,50 @@ class TestStackdriverHookMethods(unittest.TestCase):
return_value=(CREDENTIALS, PROJECT_ID),
)
@mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
+ @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
+ def test_stackdriver_upsert_alert_policy_without_channel(
+ self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
+ ):
+ hook = stackdriver.StackdriverHook()
+ existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
+
+ mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
+ mock_channel_client.return_value.list_notification_channels.return_value = []
+
+ hook.upsert_alert(
+ alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
+ project_id=PROJECT_ID,
+ )
+ mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
+ name=f'projects/{PROJECT_ID}',
+ filter_=None,
+ retry=DEFAULT,
+ timeout=DEFAULT,
+ order_by=None,
+ page_size=None,
+ metadata=None,
+ )
+ mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
+ name=f'projects/{PROJECT_ID}',
+ filter_=None,
+ retry=DEFAULT,
+ timeout=DEFAULT,
+ order_by=None,
+ page_size=None,
+ metadata=None,
+ )
+
+ existing_alert_policy.ClearField('creation_record')
+ existing_alert_policy.ClearField('mutation_record')
+ mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
+ alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
+ )
+
+ @mock.patch(
+ 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
+ return_value=(CREDENTIALS, PROJECT_ID),
+ )
+ @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
def test_stackdriver_delete_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
hook = stackdriver.StackdriverHook()
hook.delete_alert_policy(
diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
index 3b643ac..6ed607b 100644
--- a/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
@@ -28,14 +28,14 @@ from airflow.example_dags import example_complex
from airflow.models import TaskInstance
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import provide_session
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDDRIVER
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
from tests.test_utils.gcp_system_helpers import provide_gcp_context, resolve_full_gcp_key_path
@pytest.mark.system("google")
-@pytest.mark.credential_file(GCP_STACKDDRIVER)
+@pytest.mark.credential_file(GCP_STACKDRIVER)
class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
def setUp(self) -> None:
clear_db_runs()
@@ -54,7 +54,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
'os.environ',
AIRFLOW__LOGGING__REMOTE_LOGGING="true",
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
- AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
+ AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDRIVER),
AIRFLOW__CORE__LOAD_EXAMPLES="false",
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
):
@@ -72,7 +72,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
AIRFLOW__CORE__LOAD_EXAMPLES="false",
AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
- GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
+ GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDRIVER),
):
self.assertEqual(0, subprocess.Popen(["airflow", "dags", "trigger", "example_complex"]).wait())
self.assertEqual(0, subprocess.Popen(["airflow", "scheduler", "--num-runs", "1"]).wait())
@@ -81,7 +81,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
self.assert_remote_logs("INFO - Task exited with return code 0", ti)
def assert_remote_logs(self, expected_message, ti):
- with provide_gcp_context(GCP_STACKDDRIVER), conf_vars(
+ with provide_gcp_context(GCP_STACKDRIVER), conf_vars(
{
('logging', 'remote_logging'): 'True',
('logging', 'remote_base_log_folder'): f"stackdriver://{self.log_name}",
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 28901b4..fdf28dc 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -21,6 +21,8 @@ import unittest
from unittest import mock
from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3.proto.alert_pb2 import AlertPolicy
+from google.cloud.monitoring_v3.proto.notification_pb2 import NotificationChannel
from airflow.providers.google.cloud.operators.stackdriver import (
StackdriverDeleteAlertOperator,
@@ -94,7 +96,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
- operator.execute(None)
+ mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
+ result = operator.execute(None)
mock_hook.return_value.list_alert_policies.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
@@ -105,6 +108,7 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
timeout=DEFAULT,
metadata=None,
)
+ self.assertEqual([{'name': 'test-name'}], result)
class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
@@ -160,7 +164,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
def test_execute(self, mock_hook):
operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
- operator.execute(None)
+ mock_hook.return_value.list_notification_channels.return_value = [
+ NotificationChannel(name="test-123")
+ ]
+
+ result = operator.execute(None)
mock_hook.return_value.list_notification_channels.assert_called_once_with(
project_id=None,
filter_=TEST_FILTER,
@@ -171,6 +179,7 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
timeout=DEFAULT,
metadata=None,
)
+ self.assertEqual([{'name': 'test-123'}], result)
class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
diff --git a/tests/providers/google/cloud/operators/test_stackdriver_system.py b/tests/providers/google/cloud/operators/test_stackdriver_system.py
new file mode 100644
index 0000000..1d76603
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_stackdriver_system.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_STACKDRIVER)
+class GCPTextToSpeechExampleDagSystemTest(GoogleSystemTest):
+ @provide_gcp_context(GCP_STACKDRIVER)
+ def test_run_example_dag(self):
+ self.run_dag("example_stackdriver", CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index bf36ead..8aa55b8 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -52,7 +52,7 @@ GCP_MEMORYSTORE = 'gcp_memorystore.json'
GCP_PUBSUB_KEY = "gcp_pubsub.json"
GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
GCP_SPANNER_KEY = 'gcp_spanner.json'
-GCP_STACKDDRIVER = 'gcp_stackdriver.json'
+GCP_STACKDRIVER = 'gcp_stackdriver.json'
GCP_TASKS_KEY = 'gcp_tasks.json'
GMP_KEY = 'gmp.json'
G_FIREBASE_KEY = 'g_firebase.json'