Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/01 10:10:50 UTC

[airflow] branch main updated: Spanner assets & system tests migration (AIP-47) (#23957)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 841ed27101 Spanner assets & system tests migration (AIP-47) (#23957)
841ed27101 is described below

commit 841ed271017ff35a3124f1d1a53a5c74730fed60
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Wed Jun 1 12:10:45 2022 +0200

    Spanner assets & system tests migration (AIP-47) (#23957)
---
 .../google/cloud/example_dags/example_spanner.sql  | 23 ------
 airflow/providers/google/cloud/links/spanner.py    | 74 +++++++++++++++++
 .../providers/google/cloud/operators/spanner.py    | 32 ++++++++
 airflow/providers/google/provider.yaml             |  2 +
 .../operators/cloud/spanner.rst                    | 14 ++--
 .../google/cloud/operators/test_spanner.py         | 42 ++++++----
 .../google/cloud/operators/test_spanner_system.py  | 55 -------------
 .../providers/google/spanner}/example_spanner.py   | 95 ++++++++--------------
 8 files changed, 179 insertions(+), 158 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_spanner.sql b/airflow/providers/google/cloud/example_dags/example_spanner.sql
deleted file mode 100644
index 54854a1d15..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_spanner.sql
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
-*/
-
-
-INSERT my_table2 (id, name) VALUES (7, 'Seven');
-INSERT my_table2 (id, name)
-    VALUES (8, 'Eight');
diff --git a/airflow/providers/google/cloud/links/spanner.py b/airflow/providers/google/cloud/links/spanner.py
new file mode 100644
index 0000000000..0834944dc1
--- /dev/null
+++ b/airflow/providers/google/cloud/links/spanner.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Spanner links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+SPANNER_BASE_LINK = "https://console.cloud.google.com/spanner/instances"
+SPANNER_INSTANCE_LINK = SPANNER_BASE_LINK + "/{instance_id}/details/databases?project={project_id}"
+SPANNER_DATABASE_LINK = (
+    SPANNER_BASE_LINK + "/{instance_id}/databases/{database_id}/details/tables?project={project_id}"
+)
+
+
+class SpannerInstanceLink(BaseGoogleLink):
+    """Helper class for constructing Spanner Instance Link"""
+
+    name = "Spanner Instance"
+    key = "spanner_instance"
+    format_str = SPANNER_INSTANCE_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        instance_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=SpannerInstanceLink.key,
+            value={"instance_id": instance_id, "project_id": project_id},
+        )
+
+
+class SpannerDatabaseLink(BaseGoogleLink):
+    """Helper class for constructing Spanner Database Link"""
+
+    name = "Spanner Database"
+    key = "spanner_database"
+    format_str = SPANNER_DATABASE_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        instance_id: str,
+        database_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=SpannerDatabaseLink.key,
+            value={"instance_id": instance_id, "database_id": database_id, "project_id": project_id},
+        )
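
For illustration only (this snippet is not part of the commit): the persisted XCom value is expected to be substituted into the link's format_str by BaseGoogleLink, as with the other Google provider links, so a stored payload resolves to a console URL roughly like this (the instance and project IDs below are hypothetical):

    from airflow.providers.google.cloud.links.spanner import SPANNER_INSTANCE_LINK

    # Hypothetical payload, mirroring what SpannerInstanceLink.persist stores in XCom.
    value = {"instance_id": "my-instance", "project_id": "my-project"}
    print(SPANNER_INSTANCE_LINK.format(**value))
    # https://console.cloud.google.com/spanner/instances/my-instance/details/databases?project=my-project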
diff --git a/airflow/providers/google/cloud/operators/spanner.py b/airflow/providers/google/cloud/operators/spanner.py
index 6ac9a9e682..0780621e29 100644
--- a/airflow/providers/google/cloud/operators/spanner.py
+++ b/airflow/providers/google/cloud/operators/spanner.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, List, Optional, Sequence, Union
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.spanner import SpannerHook
+from airflow.providers.google.cloud.links.spanner import SpannerDatabaseLink, SpannerInstanceLink
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -67,6 +68,7 @@ class SpannerDeployInstanceOperator(BaseOperator):
         'impersonation_chain',
     )
     # [END gcp_spanner_deploy_template_fields]
+    operator_extra_links = (SpannerInstanceLink(),)
 
     def __init__(
         self,
@@ -114,6 +116,12 @@ class SpannerDeployInstanceOperator(BaseOperator):
             node_count=self.node_count,
             display_name=self.display_name,
         )
+        SpannerInstanceLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class SpannerDeleteInstanceOperator(BaseOperator):
@@ -223,6 +231,7 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'query': 'sql'}
     # [END gcp_spanner_query_template_fields]
+    operator_extra_links = (SpannerDatabaseLink(),)
 
     def __init__(
         self,
@@ -277,6 +286,13 @@ class SpannerQueryDatabaseInstanceOperator(BaseOperator):
             database_id=self.database_id,
             queries=queries,
         )
+        SpannerDatabaseLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            database_id=self.database_id,
+            project_id=self.project_id or hook.project_id,
+        )
 
     @staticmethod
     def sanitize_queries(queries: List[str]) -> None:
@@ -327,6 +343,7 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'ddl_statements': 'sql'}
     # [END gcp_spanner_database_deploy_template_fields]
+    operator_extra_links = (SpannerDatabaseLink(),)
 
     def __init__(
         self,
@@ -361,6 +378,13 @@ class SpannerDeployDatabaseInstanceOperator(BaseOperator):
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
+        SpannerDatabaseLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            database_id=self.database_id,
+            project_id=self.project_id or hook.project_id,
+        )
         if not hook.get_database(
             project_id=self.project_id, instance_id=self.instance_id, database_id=self.database_id
         ):
@@ -425,6 +449,7 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'ddl_statements': 'sql'}
     # [END gcp_spanner_database_update_template_fields]
+    operator_extra_links = (SpannerDatabaseLink(),)
 
     def __init__(
         self,
@@ -472,6 +497,13 @@ class SpannerUpdateDatabaseInstanceOperator(BaseOperator):
                 f"Create the database first before you can update it."
             )
         else:
+            SpannerDatabaseLink.persist(
+                context=context,
+                task_instance=self,
+                instance_id=self.instance_id,
+                database_id=self.database_id,
+                project_id=self.project_id or hook.project_id,
+            )
             return hook.update_database(
                 project_id=self.project_id,
                 instance_id=self.instance_id,
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index a74ca0907b..200d1929de 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -916,6 +916,8 @@ extra-links:
   - airflow.providers.google.cloud.links.bigtable.BigtableInstanceLink
   - airflow.providers.google.cloud.links.bigtable.BigtableClusterLink
   - airflow.providers.google.cloud.links.bigtable.BigtableTablesLink
+  - airflow.providers.google.cloud.links.spanner.SpannerDatabaseLink
+  - airflow.providers.google.cloud.links.spanner.SpannerInstanceLink
   - airflow.providers.google.cloud.links.stackdriver.StackdriverNotificationsLink
   - airflow.providers.google.cloud.links.stackdriver.StackdriverPoliciesLink
   - airflow.providers.google.common.links.storage.StorageLink
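
A hedged sanity check (not part of the commit), assuming ProvidersManager exposes registered extra links as extra_links_class_names:

    from airflow.providers_manager import ProvidersManager

    # The two new Spanner links should be discoverable once the provider is installed.
    registered = ProvidersManager().extra_links_class_names
    assert "airflow.providers.google.cloud.links.spanner.SpannerInstanceLink" in registered
    assert "airflow.providers.google.cloud.links.spanner.SpannerDatabaseLink" in registered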
diff --git a/docs/apache-airflow-providers-google/operators/cloud/spanner.rst b/docs/apache-airflow-providers-google/operators/cloud/spanner.rst
index a6a3e8d846..ff4b0dda8e 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/spanner.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/spanner.rst
@@ -41,7 +41,7 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_deploy]
@@ -80,7 +80,7 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_database_delete]
@@ -120,7 +120,7 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_database_deploy]
@@ -164,13 +164,13 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_database_update]
     :end-before: [END howto_operator_spanner_database_update]
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_database_update_idempotent]
@@ -207,7 +207,7 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_query]
@@ -246,7 +246,7 @@ Using the operator
 You can create the operator with or without project id. If project id is missing
 it will be retrieved from the Google Cloud connection used. Both variants are shown:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_spanner.py
+.. exampleinclude:: /../../tests/system/providers/google/spanner/example_spanner.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_spanner_delete]
diff --git a/tests/providers/google/cloud/operators/test_spanner.py b/tests/providers/google/cloud/operators/test_spanner.py
index 57829ae172..01c9f98f40 100644
--- a/tests/providers/google/cloud/operators/test_spanner.py
+++ b/tests/providers/google/cloud/operators/test_spanner.py
@@ -56,7 +56,8 @@ class TestCloudSpanner(unittest.TestCase):
             display_name=DISPLAY_NAME,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -81,7 +82,8 @@ class TestCloudSpanner(unittest.TestCase):
             display_name=DISPLAY_NAME,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -107,7 +109,8 @@ class TestCloudSpanner(unittest.TestCase):
             display_name=DISPLAY_NAME,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -132,7 +135,8 @@ class TestCloudSpanner(unittest.TestCase):
             display_name=DISPLAY_NAME,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -158,7 +162,8 @@ class TestCloudSpanner(unittest.TestCase):
             display_name=DISPLAY_NAME,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -251,7 +256,8 @@ class TestCloudSpanner(unittest.TestCase):
             query=INSERT_QUERY,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -267,7 +273,8 @@ class TestCloudSpanner(unittest.TestCase):
         op = SpannerQueryDatabaseInstanceOperator(
             instance_id=INSTANCE_ID, database_id=DB_ID, query=INSERT_QUERY, task_id="id"
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -311,7 +318,8 @@ class TestCloudSpanner(unittest.TestCase):
             query=INSERT_QUERY,
             task_id="id",
         )
-        op.execute(None)
+        context = mock.MagicMock()
+        op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -330,7 +338,8 @@ class TestCloudSpanner(unittest.TestCase):
             query=[INSERT_QUERY, INSERT_QUERY_2],
             task_id="id",
         )
-        op.execute(None)
+        context = mock.MagicMock()
+        op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -352,7 +361,8 @@ class TestCloudSpanner(unittest.TestCase):
             ddl_statements=DDL_STATEMENTS,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -369,7 +379,8 @@ class TestCloudSpanner(unittest.TestCase):
         op = SpannerDeployDatabaseInstanceOperator(
             instance_id=INSTANCE_ID, database_id=DB_ID, ddl_statements=DDL_STATEMENTS, task_id="id"
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -390,7 +401,8 @@ class TestCloudSpanner(unittest.TestCase):
             ddl_statements=DDL_STATEMENTS,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -432,7 +444,8 @@ class TestCloudSpanner(unittest.TestCase):
             ddl_statements=DDL_STATEMENTS,
             task_id="id",
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
@@ -452,7 +465,8 @@ class TestCloudSpanner(unittest.TestCase):
         op = SpannerUpdateDatabaseInstanceOperator(
             instance_id=INSTANCE_ID, database_id=DB_ID, ddl_statements=DDL_STATEMENTS, task_id="id"
         )
-        result = op.execute(None)
+        context = mock.MagicMock()
+        result = op.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id="google_cloud_default",
             impersonation_chain=None,
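
The switch from op.execute(None) to a mocked context above is needed because the new link persist helpers push the link payload via xcom_push, which requires a task context; a minimal sketch of that interaction (the test doubles below are hypothetical):

    from unittest import mock

    from airflow.providers.google.cloud.links.spanner import SpannerInstanceLink

    context = mock.MagicMock()
    task_instance = mock.MagicMock()
    SpannerInstanceLink.persist(
        context=context,
        task_instance=task_instance,
        instance_id="my-instance",
        project_id="my-project",
    )
    # persist() hands the payload to xcom_push, which is why execute() now needs a context.
    task_instance.xcom_push.assert_called_once_with(
        context,
        key=SpannerInstanceLink.key,
        value={"instance_id": "my-instance", "project_id": "my-project"},
    )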
diff --git a/tests/providers/google/cloud/operators/test_spanner_system.py b/tests/providers/google/cloud/operators/test_spanner_system.py
deleted file mode 100644
index 3188963d16..0000000000
--- a/tests/providers/google/cloud/operators/test_spanner_system.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_spanner import (
-    GCP_PROJECT_ID,
-    GCP_SPANNER_INSTANCE_ID,
-)
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_SPANNER_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_SPANNER_KEY)
-class CloudSpannerExampleDagsTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-
-    @provide_gcp_context(GCP_SPANNER_KEY)
-    def tearDown(self):
-        self.execute_with_ctx(
-            [
-                'gcloud',
-                'spanner',
-                '--project',
-                GCP_PROJECT_ID,
-                '--quiet',
-                '--verbosity=none',
-                'instances',
-                'delete',
-                GCP_SPANNER_INSTANCE_ID,
-            ],
-            key=GCP_SPANNER_KEY,
-        )
-        super().tearDown()
-
-    @provide_gcp_context(GCP_SPANNER_KEY)
-    def test_run_example_dag_spanner(self):
-        self.run_dag('example_gcp_spanner', CLOUD_DAG_FOLDER)
diff --git a/airflow/providers/google/cloud/example_dags/example_spanner.py b/tests/system/providers/google/spanner/example_spanner.py
similarity index 63%
rename from airflow/providers/google/cloud/example_dags/example_spanner.py
rename to tests/system/providers/google/spanner/example_spanner.py
index 629941bd09..ccea398017 100644
--- a/airflow/providers/google/cloud/example_dags/example_spanner.py
+++ b/tests/system/providers/google/spanner/example_spanner.py
@@ -18,18 +18,6 @@
 
 """
 Example Airflow DAG that creates, updates, queries and deletes a Cloud Spanner instance.
-
-This DAG relies on the following environment variables
-* GCP_PROJECT_ID - Google Cloud project for the Cloud Spanner instance.
-* GCP_SPANNER_INSTANCE_ID - Cloud Spanner instance ID.
-* GCP_SPANNER_DATABASE_ID - Cloud Spanner database ID.
-* GCP_SPANNER_CONFIG_NAME - The name of the instance's configuration. Values are of the
-  form ``projects/<gcp_project>/instanceConfigs/<configuration>``. See also:
-  https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
-  https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
-* GCP_SPANNER_NODE_COUNT - Number of nodes allocated to the instance.
-* GCP_SPANNER_DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
-  Must be unique per project and between 4 and 30 characters in length.
 """
 
 import os
@@ -44,39 +32,43 @@ from airflow.providers.google.cloud.operators.spanner import (
     SpannerQueryDatabaseInstanceOperator,
     SpannerUpdateDatabaseInstanceOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
-GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
-GCP_SPANNER_CONFIG_NAME = os.environ.get(
-    'GCP_SPANNER_CONFIG_NAME', f'projects/{GCP_PROJECT_ID}/instanceConfigs/regional-europe-west3'
-)
-GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
-GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "spanner"
+
+GCP_SPANNER_INSTANCE_ID = f"instance-{DAG_ID}-{ENV_ID}"
+GCP_SPANNER_DATABASE_ID = f"database-{DAG_ID}-{ENV_ID}"
+GCP_SPANNER_CONFIG_NAME = f"projects/{PROJECT_ID}/instanceConfigs/regional-europe-west3"
+GCP_SPANNER_NODE_COUNT = 1
+GCP_SPANNER_DISPLAY_NAME = "InstanceSpanner"
 # OPERATION_ID should be unique per operation
-OPERATION_ID = 'unique_operation_id'
+OPERATION_ID = "unique_operation_id"
+
 
 with models.DAG(
-    'example_gcp_spanner',
+    DAG_ID,
     schedule_interval='@once',  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example'],
+    tags=['example', 'spanner'],
 ) as dag:
     # Create
     # [START howto_operator_spanner_deploy]
     spanner_instance_create_task = SpannerDeployInstanceOperator(
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         configuration_name=GCP_SPANNER_CONFIG_NAME,
-        node_count=int(GCP_SPANNER_NODE_COUNT),
+        node_count=GCP_SPANNER_NODE_COUNT,
         display_name=GCP_SPANNER_DISPLAY_NAME,
         task_id='spanner_instance_create_task',
     )
     spanner_instance_update_task = SpannerDeployInstanceOperator(
         instance_id=GCP_SPANNER_INSTANCE_ID,
         configuration_name=GCP_SPANNER_CONFIG_NAME,
-        node_count=int(GCP_SPANNER_NODE_COUNT) + 1,
+        node_count=GCP_SPANNER_NODE_COUNT + 1,
         display_name=GCP_SPANNER_DISPLAY_NAME + '_updated',
         task_id='spanner_instance_update_task',
     )
@@ -84,7 +76,6 @@ with models.DAG(
 
     # [START howto_operator_spanner_database_deploy]
     spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
-        project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id=GCP_SPANNER_DATABASE_ID,
         ddl_statements=[
@@ -93,20 +84,10 @@ with models.DAG(
         ],
         task_id='spanner_database_deploy_task',
     )
-    spanner_database_deploy_task2 = SpannerDeployDatabaseInstanceOperator(
-        instance_id=GCP_SPANNER_INSTANCE_ID,
-        database_id=GCP_SPANNER_DATABASE_ID,
-        ddl_statements=[
-            "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
-            "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
-        ],
-        task_id='spanner_database_deploy_task2',
-    )
     # [END howto_operator_spanner_database_deploy]
 
     # [START howto_operator_spanner_database_update]
     spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
-        project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id=GCP_SPANNER_DATABASE_ID,
         ddl_statements=[
@@ -118,7 +99,7 @@ with models.DAG(
 
     # [START howto_operator_spanner_database_update_idempotent]
     spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id=GCP_SPANNER_DATABASE_ID,
         operation_id=OPERATION_ID,
@@ -140,55 +121,51 @@ with models.DAG(
 
     # [START howto_operator_spanner_query]
     spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
-        project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id=GCP_SPANNER_DATABASE_ID,
         query=["DELETE FROM my_table2 WHERE true"],
         task_id='spanner_instance_query_task',
     )
-    spanner_instance_query_task2 = SpannerQueryDatabaseInstanceOperator(
-        instance_id=GCP_SPANNER_INSTANCE_ID,
-        database_id=GCP_SPANNER_DATABASE_ID,
-        query=["DELETE FROM my_table2 WHERE true"],
-        task_id='spanner_instance_query_task2',
-    )
     # [END howto_operator_spanner_query]
 
     # [START howto_operator_spanner_database_delete]
     spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
-        project_id=GCP_PROJECT_ID,
         instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id=GCP_SPANNER_DATABASE_ID,
         task_id='spanner_database_delete_task',
     )
-    spanner_database_delete_task2 = SpannerDeleteDatabaseInstanceOperator(
-        instance_id=GCP_SPANNER_INSTANCE_ID,
-        database_id=GCP_SPANNER_DATABASE_ID,
-        task_id='spanner_database_delete_task2',
-    )
     # [END howto_operator_spanner_database_delete]
+    spanner_database_delete_task.trigger_rule = TriggerRule.ALL_DONE
 
     # [START howto_operator_spanner_delete]
     spanner_instance_delete_task = SpannerDeleteInstanceOperator(
-        project_id=GCP_PROJECT_ID, instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task'
-    )
-    spanner_instance_delete_task2 = SpannerDeleteInstanceOperator(
-        instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task2'
+        instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task'
     )
     # [END howto_operator_spanner_delete]
+    spanner_instance_delete_task.trigger_rule = TriggerRule.ALL_DONE
 
     (
         spanner_instance_create_task
         >> spanner_instance_update_task
         >> spanner_database_deploy_task
-        >> spanner_database_deploy_task2
         >> spanner_database_update_task
         >> spanner_database_update_idempotent1_task
         >> spanner_database_update_idempotent2_task
         >> spanner_instance_query_task
-        >> spanner_instance_query_task2
         >> spanner_database_delete_task
-        >> spanner_database_delete_task2
         >> spanner_instance_delete_task
-        >> spanner_instance_delete_task2
     )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
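
As a closing, hedged aside (not part of the commit): the watcher wired in above is needed because the teardown tasks use TriggerRule.ALL_DONE, so an upstream failure alone would not mark the DAG run as failed. A watcher-style task is assumed to look roughly like this:

    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        # Fails (and thereby fails the DAG run) whenever any upstream task failed.
        raise AirflowException("Failing task because one or more upstream tasks failed.")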