You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/13 11:45:44 UTC

[airflow] branch master updated: Add system tests for Stackdriver operators (#13644)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 189af54  Add system tests for Stackdriver operators (#13644)
189af54 is described below

commit 189af54043a6aa6e7557bda6cf7cfca229d0efd2
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Wed Jan 13 12:45:22 2021 +0100

    Add system tests for Stackdriver operators (#13644)
---
 .../cloud/example_dags/example_stackdriver.py      | 64 +++++++++++++++++-----
 .../providers/google/cloud/hooks/stackdriver.py    |  5 +-
 .../google/cloud/operators/stackdriver.py          | 12 ++--
 .../google/cloud/hooks/test_stackdriver.py         | 50 +++++++++++++++--
 .../log/test_stackdriver_task_handler_system.py    | 10 ++--
 .../google/cloud/operators/test_stackdriver.py     | 13 ++++-
 .../cloud/operators/test_stackdriver_system.py     | 30 ++++++++++
 .../google/cloud/utils/gcp_authenticator.py        |  2 +-
 8 files changed, 154 insertions(+), 32 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_stackdriver.py b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
index 68ac978..a7e797b 100644
--- a/airflow/providers/google/cloud/example_dags/example_stackdriver.py
+++ b/airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -21,6 +21,7 @@ Example Airflow DAG for Google Cloud Stackdriver service.
 """
 
 import json
+import os
 
 from airflow import models
 from airflow.providers.google.cloud.operators.stackdriver import (
@@ -37,38 +38,66 @@ from airflow.providers.google.cloud.operators.stackdriver import (
 )
 from airflow.utils.dates import days_ago
 
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
 TEST_ALERT_POLICY_1 = {
     "combiner": "OR",
-    "name": "projects/sd-project/alertPolicies/12345",
     "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": True,
     "displayName": "test alert 1",
     "conditions": [
         {
             "conditionThreshold": {
+                "filter": (
+                    'metric.label.state="blocked" AND '
+                    'metric.type="agent.googleapis.com/processes/count_by_state" '
+                    'AND resource.type="gce_instance"'
+                ),
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "thresholdValue": 100,
+                "duration": "900s",
+                "trigger": {"percent": 0},
+                "aggregations": [
+                    {
+                        "alignmentPeriod": "60s",
+                        "perSeriesAligner": "ALIGN_MEAN",
+                        "crossSeriesReducer": "REDUCE_MEAN",
+                        "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                    }
+                ],
             },
-            "displayName": "Condition display",
-            "name": "projects/sd-project/alertPolicies/123/conditions/456",
+            "displayName": "test_alert_policy_1",
         }
     ],
 }
 
 TEST_ALERT_POLICY_2 = {
     "combiner": "OR",
-    "name": "projects/sd-project/alertPolicies/6789",
     "creationRecord": {"mutatedBy": "user123", "mutateTime": "2020-01-01T00:00:00.000000Z"},
     "enabled": False,
     "displayName": "test alert 2",
     "conditions": [
         {
             "conditionThreshold": {
+                "filter": (
+                    'metric.label.state="blocked" AND '
+                    'metric.type="agent.googleapis.com/processes/count_by_state" AND '
+                    'resource.type="gce_instance"'
+                ),
                 "comparison": "COMPARISON_GT",
-                "aggregations": [{"alignmentPeriod": "60s", "perSeriesAligner": "ALIGN_RATE"}],
+                "thresholdValue": 100,
+                "duration": "900s",
+                "trigger": {"percent": 0},
+                "aggregations": [
+                    {
+                        "alignmentPeriod": "60s",
+                        "perSeriesAligner": "ALIGN_MEAN",
+                        "crossSeriesReducer": "REDUCE_MEAN",
+                        "groupByFields": ["project", "resource.label.instance_id", "resource.label.zone"],
+                    }
+                ],
             },
-            "displayName": "Condition display",
-            "name": "projects/sd-project/alertPolicies/456/conditions/789",
+            "displayName": "test_alert_policy_2",
         }
     ],
 }
@@ -77,7 +106,6 @@ TEST_NOTIFICATION_CHANNEL_1 = {
     "displayName": "channel1",
     "enabled": True,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "name": "projects/sd-project/notificationChannels/12345",
     "type": "slack",
 }
 
@@ -85,7 +113,6 @@ TEST_NOTIFICATION_CHANNEL_2 = {
     "displayName": "channel2",
     "enabled": False,
     "labels": {"auth_token": "top-secret", "channel_name": "#channel"},
-    "name": "projects/sd-project/notificationChannels/6789",
     "type": "slack",
 }
 
@@ -150,18 +177,29 @@ with models.DAG(
     # [START howto_operator_gcp_stackdriver_delete_notification_channel]
     delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
         task_id='delete-notification-channel',
-        name='test-channel',
+        name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
     )
     # [END howto_operator_gcp_stackdriver_delete_notification_channel]
 
+    delete_notification_channel_2 = StackdriverDeleteNotificationChannelOperator(
+        task_id='delete-notification-channel-2',
+        name="{{ task_instance.xcom_pull('list-notification-channel')[1]['name'] }}",
+    )
+
     # [START howto_operator_gcp_stackdriver_delete_alert_policy]
     delete_alert_policy = StackdriverDeleteAlertOperator(
         task_id='delete-alert-policy',
-        name='test-alert',
+        name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
     )
     # [END howto_operator_gcp_stackdriver_delete_alert_policy]
 
+    delete_alert_policy_2 = StackdriverDeleteAlertOperator(
+        task_id='delete-alert-policy-2',
+        name="{{ task_instance.xcom_pull('list-alert-policies')[1]['name'] }}",
+    )
+
     create_notification_channel >> enable_notification_channel >> disable_notification_channel
     disable_notification_channel >> list_notification_channel >> create_alert_policy
     create_alert_policy >> enable_alert_policy >> disable_alert_policy >> list_alert_policies
-    list_alert_policies >> delete_notification_channel >> delete_alert_policy
+    list_alert_policies >> delete_notification_channel >> delete_notification_channel_2
+    delete_notification_channel_2 >> delete_alert_policy >> delete_alert_policy_2
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py b/airflow/providers/google/cloud/hooks/stackdriver.py
index 9da1afa..e556cd9 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -265,11 +265,10 @@ class StackdriverHook(GoogleBaseHook):
         ]
         policies_ = []
         channels = []
-
-        for channel in record["channels"]:
+        for channel in record.get("channels", []):
             channel_json = json.dumps(channel)
             channels.append(Parse(channel_json, monitoring_v3.types.notification_pb2.NotificationChannel()))
-        for policy in record["policies"]:
+        for policy in record.get("policies", []):
             policy_json = json.dumps(policy)
             policies_.append(Parse(policy_json, monitoring_v3.types.alert_pb2.AlertPolicy()))
 
diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py
index dc86466..b159ad7 100644
--- a/airflow/providers/google/cloud/operators/stackdriver.py
+++ b/airflow/providers/google/cloud/operators/stackdriver.py
@@ -19,6 +19,7 @@
 from typing import Optional, Sequence, Union
 
 from google.api_core.gapic_v1.method import DEFAULT
+from google.protobuf.json_format import MessageToDict
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.stackdriver import StackdriverHook
@@ -125,7 +126,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
 
     def execute(self, context):
         self.log.info(
-            'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+            'List Alert Policies: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
             self.project_id,
             self.format_,
             self.filter_,
@@ -139,7 +140,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
                 impersonation_chain=self.impersonation_chain,
             )
 
-        return self.hook.list_alert_policies(
+        result = self.hook.list_alert_policies(
             project_id=self.project_id,
             format_=self.format_,
             filter_=self.filter_,
@@ -149,6 +150,7 @@ class StackdriverListAlertPoliciesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        return [MessageToDict(policy) for policy in result]
 
 
 class StackdriverEnableAlertPoliciesOperator(BaseOperator):
@@ -614,7 +616,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
 
     def execute(self, context):
         self.log.info(
-            'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %d',
+            'List Notification Channels: Project id: %s Format: %s Filter: %s Order By: %s Page Size: %s',
             self.project_id,
             self.format_,
             self.filter_,
@@ -627,7 +629,7 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
                 delegate_to=self.delegate_to,
                 impersonation_chain=self.impersonation_chain,
             )
-        return self.hook.list_notification_channels(
+        channels = self.hook.list_notification_channels(
             format_=self.format_,
             project_id=self.project_id,
             filter_=self.filter_,
@@ -637,6 +639,8 @@ class StackdriverListNotificationChannelsOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        result = [MessageToDict(channel) for channel in channels]
+        return result
 
 
 class StackdriverEnableNotificationChannelsOperator(BaseOperator):
diff --git a/tests/providers/google/cloud/hooks/test_stackdriver.py b/tests/providers/google/cloud/hooks/test_stackdriver.py
index 6892d05..bbef80f 100644
--- a/tests/providers/google/cloud/hooks/test_stackdriver.py
+++ b/tests/providers/google/cloud/hooks/test_stackdriver.py
@@ -132,9 +132,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
             page_size=None,
             metadata=None,
         )
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
+        mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
         alert_policy_disabled.enabled.value = True  # pylint: disable=no-member
-        mask.paths.append('enabled')  # pylint: disable=no-member
         mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
             alert_policy=alert_policy_disabled,
             update_mask=mask,
@@ -170,9 +169,8 @@ class TestStackdriverHookMethods(unittest.TestCase):
             page_size=None,
             metadata=None,
         )
-        mask = monitoring_v3.types.field_mask_pb2.FieldMask()
+        mask = monitoring_v3.types.field_mask_pb2.FieldMask(paths=["enabled"])
         alert_policy_enabled.enabled.value = False  # pylint: disable=no-member
-        mask.paths.append('enabled')  # pylint: disable=no-member
         mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
             alert_policy=alert_policy_enabled,
             update_mask=mask,
@@ -241,6 +239,50 @@ class TestStackdriverHookMethods(unittest.TestCase):
         return_value=(CREDENTIALS, PROJECT_ID),
     )
     @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
+    @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_channel_client')
+    def test_stackdriver_upsert_alert_policy_without_channel(
+        self, mock_channel_client, mock_policy_client, mock_get_creds_and_project_id
+    ):
+        hook = stackdriver.StackdriverHook()
+        existing_alert_policy = ParseDict(TEST_ALERT_POLICY_1, monitoring_v3.types.alert_pb2.AlertPolicy())
+
+        mock_policy_client.return_value.list_alert_policies.return_value = [existing_alert_policy]
+        mock_channel_client.return_value.list_notification_channels.return_value = []
+
+        hook.upsert_alert(
+            alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
+            project_id=PROJECT_ID,
+        )
+        mock_channel_client.return_value.list_notification_channels.assert_called_once_with(
+            name=f'projects/{PROJECT_ID}',
+            filter_=None,
+            retry=DEFAULT,
+            timeout=DEFAULT,
+            order_by=None,
+            page_size=None,
+            metadata=None,
+        )
+        mock_policy_client.return_value.list_alert_policies.assert_called_once_with(
+            name=f'projects/{PROJECT_ID}',
+            filter_=None,
+            retry=DEFAULT,
+            timeout=DEFAULT,
+            order_by=None,
+            page_size=None,
+            metadata=None,
+        )
+
+        existing_alert_policy.ClearField('creation_record')
+        existing_alert_policy.ClearField('mutation_record')
+        mock_policy_client.return_value.update_alert_policy.assert_called_once_with(
+            alert_policy=existing_alert_policy, retry=DEFAULT, timeout=DEFAULT, metadata=None
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook._get_credentials_and_project_id',
+        return_value=(CREDENTIALS, PROJECT_ID),
+    )
+    @mock.patch('airflow.providers.google.cloud.hooks.stackdriver.StackdriverHook._get_policy_client')
     def test_stackdriver_delete_alert_policy(self, mock_policy_client, mock_get_creds_and_project_id):
         hook = stackdriver.StackdriverHook()
         hook.delete_alert_policy(
diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
index 3b643ac..6ed607b 100644
--- a/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler_system.py
@@ -28,14 +28,14 @@ from airflow.example_dags import example_complex
 from airflow.models import TaskInstance
 from airflow.utils.log.log_reader import TaskLogReader
 from airflow.utils.session import provide_session
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDDRIVER
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.gcp_system_helpers import provide_gcp_context, resolve_full_gcp_key_path
 
 
 @pytest.mark.system("google")
-@pytest.mark.credential_file(GCP_STACKDDRIVER)
+@pytest.mark.credential_file(GCP_STACKDRIVER)
 class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
     def setUp(self) -> None:
         clear_db_runs()
@@ -54,7 +54,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
             'os.environ',
             AIRFLOW__LOGGING__REMOTE_LOGGING="true",
             AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
-            AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
+            AIRFLOW__LOGGING__GOOGLE_KEY_PATH=resolve_full_gcp_key_path(GCP_STACKDRIVER),
             AIRFLOW__CORE__LOAD_EXAMPLES="false",
             AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
         ):
@@ -72,7 +72,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
             AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=f"stackdriver://{self.log_name}",
             AIRFLOW__CORE__LOAD_EXAMPLES="false",
             AIRFLOW__CORE__DAGS_FOLDER=example_complex.__file__,
-            GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDDRIVER),
+            GOOGLE_APPLICATION_CREDENTIALS=resolve_full_gcp_key_path(GCP_STACKDRIVER),
         ):
             self.assertEqual(0, subprocess.Popen(["airflow", "dags", "trigger", "example_complex"]).wait())
             self.assertEqual(0, subprocess.Popen(["airflow", "scheduler", "--num-runs", "1"]).wait())
@@ -81,7 +81,7 @@ class TestStackdriverLoggingHandlerSystemTest(unittest.TestCase):
         self.assert_remote_logs("INFO - Task exited with return code 0", ti)
 
     def assert_remote_logs(self, expected_message, ti):
-        with provide_gcp_context(GCP_STACKDDRIVER), conf_vars(
+        with provide_gcp_context(GCP_STACKDRIVER), conf_vars(
             {
                 ('logging', 'remote_logging'): 'True',
                 ('logging', 'remote_base_log_folder'): f"stackdriver://{self.log_name}",
diff --git a/tests/providers/google/cloud/operators/test_stackdriver.py b/tests/providers/google/cloud/operators/test_stackdriver.py
index 28901b4..fdf28dc 100644
--- a/tests/providers/google/cloud/operators/test_stackdriver.py
+++ b/tests/providers/google/cloud/operators/test_stackdriver.py
@@ -21,6 +21,8 @@ import unittest
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.monitoring_v3.proto.alert_pb2 import AlertPolicy
+from google.cloud.monitoring_v3.proto.notification_pb2 import NotificationChannel
 
 from airflow.providers.google.cloud.operators.stackdriver import (
     StackdriverDeleteAlertOperator,
@@ -94,7 +96,8 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
     def test_execute(self, mock_hook):
         operator = StackdriverListAlertPoliciesOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
-        operator.execute(None)
+        mock_hook.return_value.list_alert_policies.return_value = [AlertPolicy(name="test-name")]
+        result = operator.execute(None)
         mock_hook.return_value.list_alert_policies.assert_called_once_with(
             project_id=None,
             filter_=TEST_FILTER,
@@ -105,6 +108,7 @@ class TestStackdriverListAlertPoliciesOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
+        self.assertEqual([{'name': 'test-name'}], result)
 
 
 class TestStackdriverEnableAlertPoliciesOperator(unittest.TestCase):
@@ -160,7 +164,11 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.operators.stackdriver.StackdriverHook')
     def test_execute(self, mock_hook):
         operator = StackdriverListNotificationChannelsOperator(task_id=TEST_TASK_ID, filter_=TEST_FILTER)
-        operator.execute(None)
+        mock_hook.return_value.list_notification_channels.return_value = [
+            NotificationChannel(name="test-123")
+        ]
+
+        result = operator.execute(None)
         mock_hook.return_value.list_notification_channels.assert_called_once_with(
             project_id=None,
             filter_=TEST_FILTER,
@@ -171,6 +179,7 @@ class TestStackdriverListNotificationChannelsOperator(unittest.TestCase):
             timeout=DEFAULT,
             metadata=None,
         )
+        self.assertEqual([{'name': 'test-123'}], result)
 
 
 class TestStackdriverEnableNotificationChannelsOperator(unittest.TestCase):
diff --git a/tests/providers/google/cloud/operators/test_stackdriver_system.py b/tests/providers/google/cloud/operators/test_stackdriver_system.py
new file mode 100644
index 0000000..1d76603
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_stackdriver_system.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_STACKDRIVER
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_STACKDRIVER)
+class GCPTextToSpeechExampleDagSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_STACKDRIVER)
+    def test_run_example_dag(self):
+        self.run_dag("example_stackdriver", CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index bf36ead..8aa55b8 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -52,7 +52,7 @@ GCP_MEMORYSTORE = 'gcp_memorystore.json'
 GCP_PUBSUB_KEY = "gcp_pubsub.json"
 GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
 GCP_SPANNER_KEY = 'gcp_spanner.json'
-GCP_STACKDDRIVER = 'gcp_stackdriver.json'
+GCP_STACKDRIVER = 'gcp_stackdriver.json'
 GCP_TASKS_KEY = 'gcp_tasks.json'
 GMP_KEY = 'gmp.json'
 G_FIREBASE_KEY = 'g_firebase.json'