You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/28 20:06:03 UTC

[airflow] branch main updated: Memorystore assets & system tests migration (AIP-47) (#25361)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f6b48ac6df Memorystore assets & system tests migration (AIP-47) (#25361)
f6b48ac6df is described below

commit f6b48ac6dfaf931a5433ec16369302f68f038c65
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Thu Jul 28 22:05:54 2022 +0200

    Memorystore assets & system tests migration (AIP-47) (#25361)
---
 .../google/cloud/hooks/cloud_memorystore.py        |   6 +-
 .../google/cloud/links/cloud_memorystore.py        | 119 +++++++++++
 .../google/cloud/operators/cloud_memorystore.py    | 134 ++++++++++++-
 airflow/providers/google/provider.yaml             |   4 +
 .../operators/cloud/cloud_memorystore.rst          |  26 +--
 .../cloud/cloud_memorystore_memcached.rst          |  14 +-
 .../cloud/operators/test_cloud_memorystore.py      |  16 ++
 .../operators/test_cloud_memorystore_system.py     |  51 -----
 .../google/cloud/cloud_memorystore/__init__.py     |  16 ++
 .../example_cloud_memorystore_memcached.py         | 146 ++++++++++++++
 .../example_cloud_memorystore_redis.py             | 220 ++++++++-------------
 11 files changed, 533 insertions(+), 219 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 0abb308dcf..586b8d3d2f 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -478,8 +478,9 @@ class CloudMemorystoreHook(GoogleBaseHook):
             timeout=timeout,
             metadata=metadata,
         )
-        result.result()
+        updated_instance = result.result()
         self.log.info("Instance updated: %s", instance.name)
+        return updated_instance
 
 
 class CloudMemorystoreMemcachedHook(GoogleBaseHook):
@@ -833,8 +834,9 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
         result = client.update_instance(
             update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata or ()
         )
-        result.result()
+        updated_instance = result.result()
         self.log.info("Instance updated: %s", instance.name)
+        return updated_instance
 
     @GoogleBaseHook.fallback_to_default_project_id
     def update_parameters(
diff --git a/airflow/providers/google/cloud/links/cloud_memorystore.py b/airflow/providers/google/cloud/links/cloud_memorystore.py
new file mode 100644
index 0000000000..973ef54ae3
--- /dev/null
+++ b/airflow/providers/google/cloud/links/cloud_memorystore.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Cloud Memorystore links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com/memorystore"
+MEMCACHED_LINK = (
+    BASE_LINK + "/memcached/locations/{location_id}/instances/{instance_id}/details?project={project_id}"
+)
+MEMCACHED_LIST_LINK = BASE_LINK + "/memcached/instances?project={project_id}"
+REDIS_LINK = (
+    BASE_LINK + "/redis/locations/{location_id}/instances/{instance_id}/details/overview?project={project_id}"
+)
+REDIS_LIST_LINK = BASE_LINK + "/redis/instances?project={project_id}"
+
+
+class MemcachedInstanceDetailsLink(BaseGoogleLink):
+    """Helper class for constructing Memorystore Memcached Instance Link"""
+
+    name = "Memorystore Memcached Instance"
+    key = "memcached_instance"
+    format_str = MEMCACHED_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        instance_id: str,
+        location_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=MemcachedInstanceDetailsLink.key,
+            value={"instance_id": instance_id, "location_id": location_id, "project_id": project_id},
+        )
+
+
+class MemcachedInstanceListLink(BaseGoogleLink):
+    """Helper class for constructing Memorystore Memcached List of Instances Link"""
+
+    name = "Memorystore Memcached List of Instances"
+    key = "memcached_instances"
+    format_str = MEMCACHED_LIST_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=MemcachedInstanceListLink.key,
+            value={"project_id": project_id},
+        )
+
+
+class RedisInstanceDetailsLink(BaseGoogleLink):
+    """Helper class for constructing Memorystore Redis Instance Link"""
+
+    name = "Memorystore Redis Instance"
+    key = "redis_instance"
+    format_str = REDIS_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        instance_id: str,
+        location_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=RedisInstanceDetailsLink.key,
+            value={"instance_id": instance_id, "location_id": location_id, "project_id": project_id},
+        )
+
+
+class RedisInstanceListLink(BaseGoogleLink):
+    """Helper class for constructing Memorystore Redis List of Instances Link"""
+
+    name = "Memorystore Redis List of Instances"
+    key = "redis_instances"
+    format_str = REDIS_LIST_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=RedisInstanceListLink.key,
+            value={"project_id": project_id},
+        )
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index cbb4ccd025..3ec336ff17 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -36,6 +36,12 @@ from airflow.providers.google.cloud.hooks.cloud_memorystore import (
     CloudMemorystoreHook,
     CloudMemorystoreMemcachedHook,
 )
+from airflow.providers.google.cloud.links.cloud_memorystore import (
+    MemcachedInstanceDetailsLink,
+    MemcachedInstanceListLink,
+    RedisInstanceDetailsLink,
+    RedisInstanceListLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -94,6 +100,7 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -133,6 +140,13 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return Instance.to_dict(result)
 
 
@@ -257,6 +271,7 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -297,6 +312,13 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
@@ -341,6 +363,7 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -380,6 +403,13 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreGetInstanceOperator(BaseOperator):
@@ -420,6 +450,7 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -456,6 +487,13 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return Instance.to_dict(result)
 
 
@@ -505,6 +543,7 @@ class CloudMemorystoreImportOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -544,6 +583,13 @@ class CloudMemorystoreImportOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreListInstancesOperator(BaseOperator):
@@ -588,6 +634,7 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceListLink(),)
 
     def __init__(
         self,
@@ -624,6 +671,11 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceListLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=self.project_id or hook.project_id,
+        )
         instances = [Instance.to_dict(a) for a in result]
         return instances
 
@@ -683,6 +735,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -715,7 +768,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
         hook = CloudMemorystoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_instance(
+        res = hook.update_instance(
             update_mask=self.update_mask,
             instance=self.instance,
             location=self.location,
@@ -725,6 +778,15 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+        location_id, instance_id = res.name.split("/")[-3::2]
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id or instance_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreScaleInstanceOperator(BaseOperator):
@@ -767,6 +829,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -798,7 +861,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
 
-        hook.update_instance(
+        res = hook.update_instance(
             update_mask={"paths": ["memory_size_gb"]},
             instance={"memory_size_gb": self.memory_size_gb},
             location=self.location,
@@ -808,6 +871,15 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+        location_id, instance_id = res.name.split("/")[-3::2]
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id or instance_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
@@ -869,6 +941,7 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (RedisInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -921,6 +994,13 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        RedisInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
@@ -1055,6 +1135,7 @@ class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (MemcachedInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -1097,6 +1178,13 @@ class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        MemcachedInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            location_id=self.location,
+            project_id=self.project_id,
+        )
 
 
 class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
@@ -1143,6 +1231,7 @@ class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
         "metadata",
         "gcp_conn_id",
     )
+    operator_extra_links = (MemcachedInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -1178,6 +1267,13 @@ class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        MemcachedInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return cloud_memcache.Instance.to_dict(result)
 
 
@@ -1282,6 +1378,7 @@ class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (MemcachedInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -1318,6 +1415,13 @@ class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        MemcachedInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return cloud_memcache.Instance.to_dict(result)
 
 
@@ -1360,6 +1464,7 @@ class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (MemcachedInstanceListLink(),)
 
     def __init__(
         self,
@@ -1393,6 +1498,11 @@ class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        MemcachedInstanceListLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=self.project_id or hook.project_id,
+        )
         instances = [cloud_memcache.Instance.to_dict(a) for a in result]
         return instances
 
@@ -1449,6 +1559,7 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (MemcachedInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -1481,7 +1592,7 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
         hook = CloudMemorystoreMemcachedHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_instance(
+        res = hook.update_instance(
             update_mask=self.update_mask,
             instance=self.instance,
             location=self.location,
@@ -1491,6 +1602,15 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+        location_id, instance_id = res.name.split("/")[-3::2]
+        MemcachedInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id or instance_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
@@ -1532,6 +1652,7 @@ class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (MemcachedInstanceDetailsLink(),)
 
     def __init__(
         self,
@@ -1574,3 +1695,10 @@ class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        MemcachedInstanceDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            instance_id=self.instance_id,
+            location_id=self.location,
+            project_id=self.project_id,
+        )
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index dc50c3c94b..1533cf4612 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -980,6 +980,10 @@ extra-links:
   - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
   - airflow.providers.google.cloud.links.pubsub.PubSubSubscriptionLink
   - airflow.providers.google.cloud.links.pubsub.PubSubTopicLink
+  - airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceDetailsLink
+  - airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceListLink
+  - airflow.providers.google.cloud.links.cloud_memorystore.RedisInstanceDetailsLink
+  - airflow.providers.google.cloud.links.cloud_memorystore.RedisInstanceListLink
   - airflow.providers.google.common.links.storage.StorageLink
   - airflow.providers.google.common.links.storage.FileDetailsLink
 
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
index dfddcb49c1..2ac632a7a6 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
@@ -41,7 +41,7 @@ presented as a compatible dictionary also.
 
 Here is an example of instance
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :start-after: [START howto_operator_instance]
     :end-before: [END howto_operator_instance]
@@ -59,7 +59,7 @@ make a use of the service account listed under ``persistenceIamIdentity``.
 You can use :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`
 operator to set permissions.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_set_acl_permission]
@@ -76,7 +76,7 @@ Create instance
 Create a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreCreateInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_instance]
@@ -87,7 +87,7 @@ You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`, which allows it
 to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_instance_result]
@@ -102,7 +102,7 @@ Delete instance
 Delete a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreDeleteInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_delete_instance]
@@ -120,7 +120,7 @@ Export instance
 Delete a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreExportInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_export_instance]
@@ -138,7 +138,7 @@ Failover instance
 Delete a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreFailoverInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_failover_instance]
@@ -156,7 +156,7 @@ Get instance
 Delete a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreGetInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_get_instance]
@@ -174,7 +174,7 @@ Import instance
 Delete a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreImportOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_import_instance]
@@ -192,7 +192,7 @@ List instances
 List a instances is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreListInstancesOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_list_instances]
@@ -203,7 +203,7 @@ You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`, which allows it
 to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_list_instances_result]
@@ -217,7 +217,7 @@ Update instance
 Update a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreUpdateInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_update_instance]
@@ -236,7 +236,7 @@ Scale instance
 Scale a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreScaleInstanceOperator` operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_scale_instance]
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
index a3b6f50763..2fe980be02 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
@@ -41,7 +41,7 @@ The object can be presented as a compatible dictionary also.
 
 Here is an example of instance
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :start-after: [START howto_operator_memcached_instance]
     :end-before: [END howto_operator_memcached_instance]
@@ -56,7 +56,7 @@ Create a instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedCreateInstanceOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_instance_memcached]
@@ -72,7 +72,7 @@ Delete an instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedDeleteInstanceOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_delete_instance_memcached]
@@ -88,7 +88,7 @@ Get an instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedGetInstanceOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_get_instance_memcached]
@@ -104,7 +104,7 @@ List instances is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedListInstancesOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_list_instances_memcached]
@@ -120,7 +120,7 @@ Updating an instance is performed with the
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateInstanceOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_update_instance_memcached]
@@ -138,7 +138,7 @@ and
 :class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedApplyParametersOperator`
 operator.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_update_and_apply_parameters_memcached]
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
index 7a514e7fc3..2c4e3c85bc 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
@@ -60,6 +60,7 @@ TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]}  # TODO: Fill missing value
 TEST_UPDATE_MASK_MEMCACHED = {"displayName": "memcached instance"}
 TEST_PARENT = "test-parent"
 TEST_NAME = "test-name"
+TEST_UPDATE_INSTANCE_NAME = "projects/{project_id}/locations/{location}/instances/{instance_id}"
 
 
 class TestCloudMemorystoreCreateInstanceOperator(TestCase):
@@ -278,6 +279,11 @@ class TestCloudMemorystoreListInstancesOperator(TestCase):
 class TestCloudMemorystoreUpdateInstanceOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreHook")
     def test_assert_valid_hook_call(self, mock_hook):
+        mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+        )
         task = CloudMemorystoreUpdateInstanceOperator(
             task_id=TEST_TASK_ID,
             update_mask=TEST_UPDATE_MASK,
@@ -311,6 +317,11 @@ class TestCloudMemorystoreUpdateInstanceOperator(TestCase):
 class TestCloudMemorystoreScaleInstanceOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreHook")
     def test_assert_valid_hook_call(self, mock_hook):
+        mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+        )
         task = CloudMemorystoreScaleInstanceOperator(
             task_id=TEST_TASK_ID,
             memory_size_gb=TEST_INSTANCE_SIZE,
@@ -498,6 +509,11 @@ class TestCloudMemorystoreMemcachedListInstancesOperator(TestCase):
 class TestCloudMemorystoreMemcachedUpdateInstanceOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
     def test_assert_valid_hook_call(self, mock_hook):
+        mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+        )
         task = CloudMemorystoreMemcachedUpdateInstanceOperator(
             task_id=TEST_TASK_ID,
             update_mask=TEST_UPDATE_MASK_MEMCACHED,
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
deleted file mode 100644
index 6a00e23c90..0000000000
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""System tests for Google Cloud Memorystore operators"""
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_cloud_memorystore import BUCKET_NAME
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_MEMORYSTORE
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_MEMORYSTORE)
-class CloudMemorystoreSystemTest(GoogleSystemTest):
-    """
-    System tests for Google Cloud Memorystore operators
-
-    It use a real service.
-    """
-
-    @provide_gcp_context(GCP_MEMORYSTORE)
-    def setUp(self):
-        super().setUp()
-        self.create_gcs_bucket(BUCKET_NAME, location="europe-north1")
-
-    @provide_gcp_context(GCP_MEMORYSTORE)
-    def test_run_example_dag_memorystore_redis(self):
-        self.run_dag('gcp_cloud_memorystore_redis', CLOUD_DAG_FOLDER)
-
-    @provide_gcp_context(GCP_MEMORYSTORE)
-    def test_run_example_dag_memorystore_memcached(self):
-        self.run_dag('gcp_cloud_memorystore_memcached', CLOUD_DAG_FOLDER)
-
-    @provide_gcp_context(GCP_MEMORYSTORE)
-    def tearDown(self):
-        self.delete_gcs_bucket(BUCKET_NAME)
-        super().tearDown()
diff --git a/tests/system/providers/google/cloud/cloud_memorystore/__init__.py b/tests/system/providers/google/cloud/cloud_memorystore/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/cloud_memorystore/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
new file mode 100644
index 0000000000..95d844b7c2
--- /dev/null
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Memorystore Memcached service.
+"""
+import os
+from datetime import datetime
+
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.providers.google.cloud.operators.cloud_memorystore import (
+    CloudMemorystoreMemcachedApplyParametersOperator,
+    CloudMemorystoreMemcachedCreateInstanceOperator,
+    CloudMemorystoreMemcachedDeleteInstanceOperator,
+    CloudMemorystoreMemcachedGetInstanceOperator,
+    CloudMemorystoreMemcachedListInstancesOperator,
+    CloudMemorystoreMemcachedUpdateInstanceOperator,
+    CloudMemorystoreMemcachedUpdateParametersOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "google_project_id")
+
+DAG_ID = "cloud_memorystore_memcached"
+
+MEMORYSTORE_MEMCACHED_INSTANCE_NAME = f"{ENV_ID}-memcached-1"
+LOCATION = "europe-north1"
+
+# [START howto_operator_memcached_instance]
+MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
+# [END howto_operator_memcached_instance]
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # [START howto_operator_create_instance_memcached]
+    create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance",
+        location=LOCATION,
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        instance=MEMCACHED_INSTANCE,
+        project_id=PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance",
+        location=LOCATION,
+        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+    delete_memcached_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_get_instance_memcached]
+    get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance",
+        location=LOCATION,
+        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances", location="-", project_id=PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance",
+        location=LOCATION,
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=PROJECT_ID,
+        update_mask=FieldMask(paths=["node_count"]),
+        instance={"node_count": 2},  # grow from the 1 node set in MEMCACHED_INSTANCE
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location=LOCATION,
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=PROJECT_ID,
+        update_mask={"paths": ["params"]},
+        parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
+    )
+
+    apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+        task_id="apply-parameters",
+        location=LOCATION,
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=PROJECT_ID,
+        node_ids=["node-a-1"],
+        apply_all=False,
+    )
+    # [END howto_operator_update_and_apply_parameters_memcached]
+
+    (
+        create_memcached_instance
+        >> get_memcached_instance
+        >> list_memcached_instances
+        >> update_memcached_instance
+        >> update_memcached_parameters
+        >> apply_memcached_parameters
+        >> delete_memcached_instance
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
similarity index 52%
rename from airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
rename to tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
index 6f9dd09ff3..c971b9a1f1 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
@@ -22,7 +22,6 @@ import os
 from datetime import datetime
 
 from google.cloud.redis_v1 import FailoverInstanceRequest, Instance
-from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -36,36 +35,27 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
     CloudMemorystoreGetInstanceOperator,
     CloudMemorystoreImportOperator,
     CloudMemorystoreListInstancesOperator,
-    CloudMemorystoreMemcachedApplyParametersOperator,
-    CloudMemorystoreMemcachedCreateInstanceOperator,
-    CloudMemorystoreMemcachedDeleteInstanceOperator,
-    CloudMemorystoreMemcachedGetInstanceOperator,
-    CloudMemorystoreMemcachedListInstancesOperator,
-    CloudMemorystoreMemcachedUpdateInstanceOperator,
-    CloudMemorystoreMemcachedUpdateParametersOperator,
     CloudMemorystoreScaleInstanceOperator,
     CloudMemorystoreUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSBucketCreateAclEntryOperator,
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
 
-START_DATE = datetime(2021, 1, 1)
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "cloud_memorystore_redis"
 
-MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get(
-    "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME", "test-memorystore-redis"
-)
-MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
-    "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME_2", "test-memorystore-redis-2"
-)
-MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
-    "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME_3", "test-memorystore-redis-3"
-)
-MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
-    "GCP_MEMORYSTORE_MEMCACHED_INSTANCE_NAME", "test-memorystore-memcached-1"
-)
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "europe-north1"
+MEMORYSTORE_REDIS_INSTANCE_NAME = f"{ENV_ID}-redis-1"
+MEMORYSTORE_REDIS_INSTANCE_NAME_2 = f"{ENV_ID}-redis-2"
+MEMORYSTORE_REDIS_INSTANCE_NAME_3 = f"{ENV_ID}-redis-3"
 
-BUCKET_NAME = os.environ.get("GCP_MEMORYSTORE_BUCKET", "INVALID BUCKET NAME")
 EXPORT_GCS_URL = f"gs://{BUCKET_NAME}/my-export.rdb"
 
 # [START howto_operator_instance]
@@ -74,25 +64,23 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
 
 SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
 
-# [START howto_operator_memcached_instance]
-MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
-# [END howto_operator_memcached_instance]
-
 
 with models.DAG(
-    "gcp_cloud_memorystore_redis",
+    DAG_ID,
     schedule_interval='@once',  # Override to match your needs
-    start_date=START_DATE,
+    start_date=datetime(2021, 1, 1),
     catchup=False,
     tags=['example'],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
     # [START howto_operator_create_instance]
     create_instance = CloudMemorystoreCreateInstanceOperator(
         task_id="create-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
         instance=FIRST_INSTANCE,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [END howto_operator_create_instance]
 
@@ -105,18 +93,18 @@ with models.DAG(
 
     create_instance_2 = CloudMemorystoreCreateInstanceOperator(
         task_id="create-instance-2",
-        location="europe-north1",
+        location=LOCATION,
         instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         instance=SECOND_INSTANCE,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
 
     # [START howto_operator_get_instance]
     get_instance = CloudMemorystoreGetInstanceOperator(
         task_id="get-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         do_xcom_push=True,
     )
     # [END howto_operator_get_instance]
@@ -130,18 +118,18 @@ with models.DAG(
     # [START howto_operator_failover_instance]
     failover_instance = CloudMemorystoreFailoverInstanceOperator(
         task_id="failover-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
             FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS
         ),
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [END howto_operator_failover_instance]
 
     # [START howto_operator_list_instances]
     list_instances = CloudMemorystoreListInstancesOperator(
-        task_id="list-instances", location="-", page_size=100, project_id=GCP_PROJECT_ID
+        task_id="list-instances", location="-", page_size=100, project_id=PROJECT_ID
     )
     # [END howto_operator_list_instances]
 
@@ -154,9 +142,9 @@ with models.DAG(
     # [START howto_operator_update_instance]
     update_instance = CloudMemorystoreUpdateInstanceOperator(
         task_id="update-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         update_mask={"paths": ["memory_size_gb"]},
         instance={"memory_size_gb": 2},
     )
@@ -175,56 +163,58 @@ with models.DAG(
     # [START howto_operator_export_instance]
     export_instance = CloudMemorystoreExportInstanceOperator(
         task_id="export-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
         output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [END howto_operator_export_instance]
 
     # [START howto_operator_import_instance]
     import_instance = CloudMemorystoreImportOperator(
         task_id="import-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [END howto_operator_import_instance]
 
     # [START howto_operator_delete_instance]
     delete_instance = CloudMemorystoreDeleteInstanceOperator(
         task_id="delete-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [END howto_operator_delete_instance]
+    delete_instance.trigger_rule = TriggerRule.ALL_DONE
 
     delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
         task_id="delete-instance-2",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     # [END howto_operator_create_instance_and_import]
     create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
         task_id="create-instance-and-import",
-        location="europe-north1",
+        location=LOCATION,
         instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
         instance=FIRST_INSTANCE,
         input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [START howto_operator_create_instance_and_import]
 
     # [START howto_operator_scale_instance]
     scale_instance = CloudMemorystoreScaleInstanceOperator(
         task_id="scale-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         memory_size_gb=3,
     )
     # [END howto_operator_scale_instance]
@@ -232,106 +222,50 @@ with models.DAG(
     # [END howto_operator_export_and_delete_instance]
     export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
         task_id="export-and-delete-instance",
-        location="europe-north1",
+        location=LOCATION,
         instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
         output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
     )
     # [START howto_operator_export_and_delete_instance]
+    export_and_delete_instance.trigger_rule = TriggerRule.ALL_DONE
 
-    create_instance >> get_instance >> get_instance_result
-    create_instance >> update_instance
-    create_instance >> export_instance
-    create_instance_2 >> import_instance
-    create_instance >> list_instances >> list_instances_result
-    list_instances >> delete_instance
-    export_instance >> update_instance
-    update_instance >> delete_instance
-    create_instance >> create_instance_result
-    get_instance >> set_acl_permission >> export_instance
-    get_instance >> list_instances_result
-    export_instance >> import_instance
-    export_instance >> delete_instance
-    failover_instance >> delete_instance_2
-    import_instance >> failover_instance
-
-    export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
-
-
-with models.DAG(
-    "gcp_cloud_memorystore_memcached",
-    schedule_interval='@once',  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=['example'],
-) as dag_memcache:
-    # [START howto_operator_create_instance_memcached]
-    create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
-        task_id="create-instance",
-        location="europe-north1",
-        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        instance=MEMCACHED_INSTANCE,
-        project_id=GCP_PROJECT_ID,
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
-    # [END howto_operator_create_instance_memcached]
 
-    # [START howto_operator_delete_instance_memcached]
-    delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
-        task_id="delete-instance",
-        location="europe-north1",
-        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
+    (
+        create_bucket
+        >> create_instance
+        >> create_instance_result
+        >> get_instance
+        >> get_instance_result
+        >> set_acl_permission
+        >> export_instance
+        >> update_instance
+        >> list_instances
+        >> list_instances_result
+        >> create_instance_2
+        >> failover_instance
+        >> import_instance
+        >> delete_instance
+        >> delete_instance_2
+        >> create_instance_and_import
+        >> scale_instance
+        >> export_and_delete_instance
+        >> delete_bucket
     )
-    # [END howto_operator_delete_instance_memcached]
 
-    # [START howto_operator_get_instance_memcached]
-    get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
-        task_id="get-instance",
-        location="europe-north1",
-        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
-    )
-    # [END howto_operator_get_instance_memcached]
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
 
-    # [START howto_operator_list_instances_memcached]
-    list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
-        task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
-    )
-    # [END howto_operator_list_instances_memcached]
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
-    # # [START howto_operator_update_instance_memcached]
-    update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
-        task_id="update-instance",
-        location="europe-north1",
-        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
-        update_mask=FieldMask(paths=["node_count"]),
-        instance={"node_count": 2},
-    )
-    # [END howto_operator_update_instance_memcached]
-
-    # [START howto_operator_update_and_apply_parameters_memcached]
-    update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
-        task_id="update-parameters",
-        location="europe-north1",
-        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
-        update_mask={"paths": ["params"]},
-        parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
-    )
-
-    apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
-        task_id="apply-parameters",
-        location="europe-north1",
-        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
-        project_id=GCP_PROJECT_ID,
-        node_ids=["node-a-1"],
-        apply_all=False,
-    )
 
-    # update_parameters >> apply_parameters
-    # [END howto_operator_update_and_apply_parameters_memcached]
+from tests.system.utils import get_test_run  # noqa: E402
 
-    create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
-    create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
-    update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)