You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/28 20:06:03 UTC
[airflow] branch main updated: Memorystore assets & system tests migration (AIP-47) (#25361)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f6b48ac6df Memorystore assets & system tests migration (AIP-47) (#25361)
f6b48ac6df is described below
commit f6b48ac6dfaf931a5433ec16369302f68f038c65
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Thu Jul 28 22:05:54 2022 +0200
Memorystore assets & system tests migration (AIP-47) (#25361)
---
.../google/cloud/hooks/cloud_memorystore.py | 6 +-
.../google/cloud/links/cloud_memorystore.py | 119 +++++++++++
.../google/cloud/operators/cloud_memorystore.py | 134 ++++++++++++-
airflow/providers/google/provider.yaml | 4 +
.../operators/cloud/cloud_memorystore.rst | 26 +--
.../cloud/cloud_memorystore_memcached.rst | 14 +-
.../cloud/operators/test_cloud_memorystore.py | 16 ++
.../operators/test_cloud_memorystore_system.py | 51 -----
.../google/cloud/cloud_memorystore/__init__.py | 16 ++
.../example_cloud_memorystore_memcached.py | 146 ++++++++++++++
.../example_cloud_memorystore_redis.py | 220 ++++++++-------------
11 files changed, 533 insertions(+), 219 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 0abb308dcf..586b8d3d2f 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -478,8 +478,9 @@ class CloudMemorystoreHook(GoogleBaseHook):
timeout=timeout,
metadata=metadata,
)
- result.result()
+ updated_instance = result.result()
self.log.info("Instance updated: %s", instance.name)
+ return updated_instance
class CloudMemorystoreMemcachedHook(GoogleBaseHook):
@@ -833,8 +834,9 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
result = client.update_instance(
update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata or ()
)
- result.result()
+ updated_instance = result.result()
self.log.info("Instance updated: %s", instance.name)
+ return updated_instance
@GoogleBaseHook.fallback_to_default_project_id
def update_parameters(
diff --git a/airflow/providers/google/cloud/links/cloud_memorystore.py b/airflow/providers/google/cloud/links/cloud_memorystore.py
new file mode 100644
index 0000000000..973ef54ae3
--- /dev/null
+++ b/airflow/providers/google/cloud/links/cloud_memorystore.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Cloud Memorystore links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com/memorystore"
+MEMCACHED_LINK = (
+ BASE_LINK + "/memcached/locations/{location_id}/instances/{instance_id}/details?project={project_id}"
+)
+MEMCACHED_LIST_LINK = BASE_LINK + "/memcached/instances?project={project_id}"
+REDIS_LINK = (
+ BASE_LINK + "/redis/locations/{location_id}/instances/{instance_id}/details/overview?project={project_id}"
+)
+REDIS_LIST_LINK = BASE_LINK + "/redis/instances?project={project_id}"
+
+
+class MemcachedInstanceDetailsLink(BaseGoogleLink):
+ """Helper class for constructing Memorystore Memcached Instance Link"""
+
+ name = "Memorystore Memcached Instance"
+ key = "memcached_instance"
+ format_str = MEMCACHED_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ instance_id: str,
+ location_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=MemcachedInstanceDetailsLink.key,
+ value={"instance_id": instance_id, "location_id": location_id, "project_id": project_id},
+ )
+
+
+class MemcachedInstanceListLink(BaseGoogleLink):
+ """Helper class for constructing Memorystore Memcached List of Instances Link"""
+
+ name = "Memorystore Memcached List of Instances"
+ key = "memcached_instances"
+ format_str = MEMCACHED_LIST_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=MemcachedInstanceListLink.key,
+ value={"project_id": project_id},
+ )
+
+
+class RedisInstanceDetailsLink(BaseGoogleLink):
+ """Helper class for constructing Memorystore Redis Instance Link"""
+
+ name = "Memorystore Redis Instance"
+ key = "redis_instance"
+ format_str = REDIS_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ instance_id: str,
+ location_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=RedisInstanceDetailsLink.key,
+ value={"instance_id": instance_id, "location_id": location_id, "project_id": project_id},
+ )
+
+
+class RedisInstanceListLink(BaseGoogleLink):
+ """Helper class for constructing Memorystore Redis List of Instances Link"""
+
+ name = "Memorystore Redis List of Instances"
+ key = "redis_instances"
+ format_str = REDIS_LIST_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=RedisInstanceListLink.key,
+ value={"project_id": project_id},
+ )
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index cbb4ccd025..3ec336ff17 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -36,6 +36,12 @@ from airflow.providers.google.cloud.hooks.cloud_memorystore import (
CloudMemorystoreHook,
CloudMemorystoreMemcachedHook,
)
+from airflow.providers.google.cloud.links.cloud_memorystore import (
+ MemcachedInstanceDetailsLink,
+ MemcachedInstanceListLink,
+ RedisInstanceDetailsLink,
+ RedisInstanceListLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -94,6 +100,7 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -133,6 +140,13 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return Instance.to_dict(result)
@@ -257,6 +271,7 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -297,6 +312,13 @@ class CloudMemorystoreExportInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
@@ -341,6 +363,7 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -380,6 +403,13 @@ class CloudMemorystoreFailoverInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreGetInstanceOperator(BaseOperator):
@@ -420,6 +450,7 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -456,6 +487,13 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return Instance.to_dict(result)
@@ -505,6 +543,7 @@ class CloudMemorystoreImportOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -544,6 +583,13 @@ class CloudMemorystoreImportOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreListInstancesOperator(BaseOperator):
@@ -588,6 +634,7 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceListLink(),)
def __init__(
self,
@@ -624,6 +671,11 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceListLink.persist(
+ context=context,
+ task_instance=self,
+ project_id=self.project_id or hook.project_id,
+ )
instances = [Instance.to_dict(a) for a in result]
return instances
@@ -683,6 +735,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -715,7 +768,7 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
hook = CloudMemorystoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_instance(
+ res = hook.update_instance(
update_mask=self.update_mask,
instance=self.instance,
location=self.location,
@@ -725,6 +778,15 @@ class CloudMemorystoreUpdateInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+ location_id, instance_id = res.name.split("/")[-3::2]
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id or instance_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreScaleInstanceOperator(BaseOperator):
@@ -767,6 +829,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -798,7 +861,7 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_instance(
+ res = hook.update_instance(
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": self.memory_size_gb},
location=self.location,
@@ -808,6 +871,15 @@ class CloudMemorystoreScaleInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+ location_id, instance_id = res.name.split("/")[-3::2]
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id or instance_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
@@ -869,6 +941,7 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (RedisInstanceDetailsLink(),)
def __init__(
self,
@@ -921,6 +994,13 @@ class CloudMemorystoreCreateInstanceAndImportOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ RedisInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
@@ -1055,6 +1135,7 @@ class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (MemcachedInstanceDetailsLink(),)
def __init__(
self,
@@ -1097,6 +1178,13 @@ class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ MemcachedInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id,
+ location_id=self.location,
+ project_id=self.project_id,
+ )
class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
@@ -1143,6 +1231,7 @@ class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
"metadata",
"gcp_conn_id",
)
+ operator_extra_links = (MemcachedInstanceDetailsLink(),)
def __init__(
self,
@@ -1178,6 +1267,13 @@ class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ MemcachedInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return cloud_memcache.Instance.to_dict(result)
@@ -1282,6 +1378,7 @@ class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (MemcachedInstanceDetailsLink(),)
def __init__(
self,
@@ -1318,6 +1415,13 @@ class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ MemcachedInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return cloud_memcache.Instance.to_dict(result)
@@ -1360,6 +1464,7 @@ class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (MemcachedInstanceListLink(),)
def __init__(
self,
@@ -1393,6 +1498,11 @@ class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ MemcachedInstanceListLink.persist(
+ context=context,
+ task_instance=self,
+ project_id=self.project_id or hook.project_id,
+ )
instances = [cloud_memcache.Instance.to_dict(a) for a in result]
return instances
@@ -1449,6 +1559,7 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (MemcachedInstanceDetailsLink(),)
def __init__(
self,
@@ -1481,7 +1592,7 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
hook = CloudMemorystoreMemcachedHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_instance(
+ res = hook.update_instance(
update_mask=self.update_mask,
instance=self.instance,
location=self.location,
@@ -1491,6 +1602,15 @@ class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ # projects/PROJECT_NAME/locations/LOCATION/instances/INSTANCE
+ location_id, instance_id = res.name.split("/")[-3::2]
+ MemcachedInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id or instance_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
@@ -1532,6 +1652,7 @@ class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (MemcachedInstanceDetailsLink(),)
def __init__(
self,
@@ -1574,3 +1695,10 @@ class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ MemcachedInstanceDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ instance_id=self.instance_id,
+ location_id=self.location,
+ project_id=self.project_id,
+ )
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index dc50c3c94b..1533cf4612 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -980,6 +980,10 @@ extra-links:
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
- airflow.providers.google.cloud.links.pubsub.PubSubSubscriptionLink
- airflow.providers.google.cloud.links.pubsub.PubSubTopicLink
+ - airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceDetailsLink
+ - airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceListLink
+ - airflow.providers.google.cloud.links.cloud_memorystore.RedisInstanceDetailsLink
+ - airflow.providers.google.cloud.links.cloud_memorystore.RedisInstanceListLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
index dfddcb49c1..2ac632a7a6 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore.rst
@@ -41,7 +41,7 @@ presented as a compatible dictionary also.
Here is an example of instance
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:start-after: [START howto_operator_instance]
:end-before: [END howto_operator_instance]
@@ -59,7 +59,7 @@ make a use of the service account listed under ``persistenceIamIdentity``.
You can use :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`
operator to set permissions.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_set_acl_permission]
@@ -76,7 +76,7 @@ Create instance
Create a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreCreateInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_instance]
@@ -87,7 +87,7 @@ You can use :ref:`Jinja templating <concepts:jinja-templating>` with
parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`, which allows it
to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_instance_result]
@@ -102,7 +102,7 @@ Delete instance
Delete a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreDeleteInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_instance]
@@ -120,7 +120,7 @@ Export instance
Delete a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreExportInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_export_instance]
@@ -138,7 +138,7 @@ Failover instance
Delete a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreFailoverInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_failover_instance]
@@ -156,7 +156,7 @@ Get instance
Delete a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreGetInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_get_instance]
@@ -174,7 +174,7 @@ Import instance
Delete a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreImportOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_import_instance]
@@ -192,7 +192,7 @@ List instances
List a instances is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreListInstancesOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_list_instances]
@@ -203,7 +203,7 @@ You can use :ref:`Jinja templating <concepts:jinja-templating>` with
parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`, which allows it
to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_list_instances_result]
@@ -217,7 +217,7 @@ Update instance
Update a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreUpdateInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_instance]
@@ -236,7 +236,7 @@ Scale instance
Scale a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreScaleInstanceOperator` operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
:language: python
:dedent: 4
:start-after: [START howto_operator_scale_instance]
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
index a3b6f50763..2fe980be02 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_memorystore_memcached.rst
@@ -41,7 +41,7 @@ The object can be presented as a compatible dictionary also.
Here is an example of instance
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:start-after: [START howto_operator_memcached_instance]
:end-before: [END howto_operator_memcached_instance]
@@ -56,7 +56,7 @@ Create a instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedCreateInstanceOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_instance_memcached]
@@ -72,7 +72,7 @@ Delete an instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedDeleteInstanceOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_instance_memcached]
@@ -88,7 +88,7 @@ Get an instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedGetInstanceOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_get_instance_memcached]
@@ -104,7 +104,7 @@ List instances is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedListInstancesOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_list_instances_memcached]
@@ -120,7 +120,7 @@ Updating an instance is performed with the
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateInstanceOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_instance_memcached]
@@ -138,7 +138,7 @@ and
:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedApplyParametersOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_and_apply_parameters_memcached]
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
index 7a514e7fc3..2c4e3c85bc 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
@@ -60,6 +60,7 @@ TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]} # TODO: Fill missing value
TEST_UPDATE_MASK_MEMCACHED = {"displayName": "memcached instance"}
TEST_PARENT = "test-parent"
TEST_NAME = "test-name"
+TEST_UPDATE_INSTANCE_NAME = "projects/{project_id}/locations/{location}/instances/{instance_id}"
class TestCloudMemorystoreCreateInstanceOperator(TestCase):
@@ -278,6 +279,11 @@ class TestCloudMemorystoreListInstancesOperator(TestCase):
class TestCloudMemorystoreUpdateInstanceOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreHook")
def test_assert_valid_hook_call(self, mock_hook):
+ mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ instance_id=TEST_INSTANCE_ID,
+ )
task = CloudMemorystoreUpdateInstanceOperator(
task_id=TEST_TASK_ID,
update_mask=TEST_UPDATE_MASK,
@@ -311,6 +317,11 @@ class TestCloudMemorystoreUpdateInstanceOperator(TestCase):
class TestCloudMemorystoreScaleInstanceOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreHook")
def test_assert_valid_hook_call(self, mock_hook):
+ mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ instance_id=TEST_INSTANCE_ID,
+ )
task = CloudMemorystoreScaleInstanceOperator(
task_id=TEST_TASK_ID,
memory_size_gb=TEST_INSTANCE_SIZE,
@@ -498,6 +509,11 @@ class TestCloudMemorystoreMemcachedListInstancesOperator(TestCase):
class TestCloudMemorystoreMemcachedUpdateInstanceOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
def test_assert_valid_hook_call(self, mock_hook):
+ mock_hook.return_value.update_instance.return_value.name = TEST_UPDATE_INSTANCE_NAME.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ instance_id=TEST_INSTANCE_ID,
+ )
task = CloudMemorystoreMemcachedUpdateInstanceOperator(
task_id=TEST_TASK_ID,
update_mask=TEST_UPDATE_MASK_MEMCACHED,
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
deleted file mode 100644
index 6a00e23c90..0000000000
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""System tests for Google Cloud Memorystore operators"""
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_cloud_memorystore import BUCKET_NAME
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_MEMORYSTORE
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_MEMORYSTORE)
-class CloudMemorystoreSystemTest(GoogleSystemTest):
- """
- System tests for Google Cloud Memorystore operators
-
- It use a real service.
- """
-
- @provide_gcp_context(GCP_MEMORYSTORE)
- def setUp(self):
- super().setUp()
- self.create_gcs_bucket(BUCKET_NAME, location="europe-north1")
-
- @provide_gcp_context(GCP_MEMORYSTORE)
- def test_run_example_dag_memorystore_redis(self):
- self.run_dag('gcp_cloud_memorystore_redis', CLOUD_DAG_FOLDER)
-
- @provide_gcp_context(GCP_MEMORYSTORE)
- def test_run_example_dag_memorystore_memcached(self):
- self.run_dag('gcp_cloud_memorystore_memcached', CLOUD_DAG_FOLDER)
-
- @provide_gcp_context(GCP_MEMORYSTORE)
- def tearDown(self):
- self.delete_gcs_bucket(BUCKET_NAME)
- super().tearDown()
diff --git a/tests/system/providers/google/cloud/cloud_memorystore/__init__.py b/tests/system/providers/google/cloud/cloud_memorystore/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/cloud_memorystore/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
new file mode 100644
index 0000000000..95d844b7c2
--- /dev/null
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Memorystore Memcached service.
+"""
+import os
+from datetime import datetime
+
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.providers.google.cloud.operators.cloud_memorystore import (
+ CloudMemorystoreMemcachedApplyParametersOperator,
+ CloudMemorystoreMemcachedCreateInstanceOperator,
+ CloudMemorystoreMemcachedDeleteInstanceOperator,
+ CloudMemorystoreMemcachedGetInstanceOperator,
+ CloudMemorystoreMemcachedListInstancesOperator,
+ CloudMemorystoreMemcachedUpdateInstanceOperator,
+ CloudMemorystoreMemcachedUpdateParametersOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "google_project_id")
+
+DAG_ID = "cloud_memorystore_memcached"
+
+MEMORYSTORE_MEMCACHED_INSTANCE_NAME = f"{ENV_ID}-memcached-1"
+LOCATION = "europe-north1"
+
+# [START howto_operator_memcached_instance]
+MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
+# [END howto_operator_memcached_instance]
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once', # Override to match your needs
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example'],
+) as dag:
+ # [START howto_operator_create_instance_memcached]
+ create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
+ task_id="create-instance",
+ location=LOCATION,
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ instance=MEMCACHED_INSTANCE,
+ project_id=PROJECT_ID,
+ )
+ # [END howto_operator_create_instance_memcached]
+
+ # [START howto_operator_delete_instance_memcached]
+ delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
+ task_id="delete-instance",
+ location=LOCATION,
+ instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=PROJECT_ID,
+ )
+ # [END howto_operator_delete_instance_memcached]
+ delete_memcached_instance.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_get_instance_memcached]
+ get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
+ task_id="get-instance",
+ location=LOCATION,
+ instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=PROJECT_ID,
+ )
+ # [END howto_operator_get_instance_memcached]
+
+ # [START howto_operator_list_instances_memcached]
+ list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
+ task_id="list-instances", location="-", project_id=PROJECT_ID
+ )
+ # [END howto_operator_list_instances_memcached]
+
+ # [START howto_operator_update_instance_memcached]
+ update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
+ task_id="update-instance",
+ location=LOCATION,
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=PROJECT_ID,
+ update_mask=FieldMask(paths=["node_count"]),
+ instance={"node_count": 2}, # 2
+ )
+ # [END howto_operator_update_instance_memcached]
+
+ # [START howto_operator_update_and_apply_parameters_memcached]
+ update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+ task_id="update-parameters",
+ location=LOCATION,
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=PROJECT_ID,
+ update_mask={"paths": ["params"]},
+ parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
+ )
+
+ apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+ task_id="apply-parameters",
+ location=LOCATION,
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=PROJECT_ID,
+ node_ids=["node-a-1"],
+ apply_all=False,
+ )
+ # [END howto_operator_update_and_apply_parameters_memcached]
+
+ (
+ create_memcached_instance
+ >> get_memcached_instance
+ >> list_memcached_instances
+ >> update_memcached_instance
+ >> update_memcached_parameters
+ >> apply_memcached_parameters
+ >> delete_memcached_instance
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
similarity index 52%
rename from airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
rename to tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
index 6f9dd09ff3..c971b9a1f1 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_redis.py
@@ -22,7 +22,6 @@ import os
from datetime import datetime
from google.cloud.redis_v1 import FailoverInstanceRequest, Instance
-from google.protobuf.field_mask_pb2 import FieldMask
from airflow import models
from airflow.operators.bash import BashOperator
@@ -36,36 +35,27 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
CloudMemorystoreGetInstanceOperator,
CloudMemorystoreImportOperator,
CloudMemorystoreListInstancesOperator,
- CloudMemorystoreMemcachedApplyParametersOperator,
- CloudMemorystoreMemcachedCreateInstanceOperator,
- CloudMemorystoreMemcachedDeleteInstanceOperator,
- CloudMemorystoreMemcachedGetInstanceOperator,
- CloudMemorystoreMemcachedListInstancesOperator,
- CloudMemorystoreMemcachedUpdateInstanceOperator,
- CloudMemorystoreMemcachedUpdateParametersOperator,
CloudMemorystoreScaleInstanceOperator,
CloudMemorystoreUpdateInstanceOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSBucketCreateAclEntryOperator,
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
-START_DATE = datetime(2021, 1, 1)
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "cloud_memorystore_redis"
-MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get(
- "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME", "test-memorystore-redis"
-)
-MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
- "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME_2", "test-memorystore-redis-2"
-)
-MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
- "GCP_MEMORYSTORE_REDIS_INSTANCE_NAME_3", "test-memorystore-redis-3"
-)
-MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
- "GCP_MEMORYSTORE_MEMCACHED_INSTANCE_NAME", "test-memorystore-memcached-1"
-)
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "europe-north1"
+MEMORYSTORE_REDIS_INSTANCE_NAME = f"{ENV_ID}-redis-1"
+MEMORYSTORE_REDIS_INSTANCE_NAME_2 = f"{ENV_ID}-redis-2"
+MEMORYSTORE_REDIS_INSTANCE_NAME_3 = f"{ENV_ID}-redis-3"
-BUCKET_NAME = os.environ.get("GCP_MEMORYSTORE_BUCKET", "INVALID BUCKET NAME")
EXPORT_GCS_URL = f"gs://{BUCKET_NAME}/my-export.rdb"
# [START howto_operator_instance]
@@ -74,25 +64,23 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
-# [START howto_operator_memcached_instance]
-MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
-# [END howto_operator_memcached_instance]
-
with models.DAG(
- "gcp_cloud_memorystore_redis",
+ DAG_ID,
schedule_interval='@once', # Override to match your needs
- start_date=START_DATE,
+ start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
+ create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
# [START howto_operator_create_instance]
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
- location="europe-north1",
+ location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [END howto_operator_create_instance]
@@ -105,18 +93,18 @@ with models.DAG(
create_instance_2 = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance-2",
- location="europe-north1",
+ location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
instance=SECOND_INSTANCE,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [START howto_operator_get_instance]
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
do_xcom_push=True,
)
# [END howto_operator_get_instance]
@@ -130,18 +118,18 @@ with models.DAG(
# [START howto_operator_failover_instance]
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS
),
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [END howto_operator_failover_instance]
# [START howto_operator_list_instances]
list_instances = CloudMemorystoreListInstancesOperator(
- task_id="list-instances", location="-", page_size=100, project_id=GCP_PROJECT_ID
+ task_id="list-instances", location="-", page_size=100, project_id=PROJECT_ID
)
# [END howto_operator_list_instances]
@@ -154,9 +142,9 @@ with models.DAG(
# [START howto_operator_update_instance]
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
- location="europe-north1",
+ location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
)
@@ -175,56 +163,58 @@ with models.DAG(
# [START howto_operator_export_instance]
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [END howto_operator_export_instance]
# [START howto_operator_import_instance]
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [END howto_operator_import_instance]
# [START howto_operator_delete_instance]
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [END howto_operator_delete_instance]
+ delete_instance.trigger_rule = TriggerRule.ALL_DONE
delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance-2",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_create_instance_and_import]
create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
task_id="create-instance-and-import",
- location="europe-north1",
+ location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
instance=FIRST_INSTANCE,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [START howto_operator_create_instance_and_import]
# [START howto_operator_scale_instance]
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
- location="europe-north1",
+ location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
memory_size_gb=3,
)
# [END howto_operator_scale_instance]
@@ -232,106 +222,50 @@ with models.DAG(
# [END howto_operator_export_and_delete_instance]
export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
task_id="export-and-delete-instance",
- location="europe-north1",
+ location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
)
# [START howto_operator_export_and_delete_instance]
+ export_and_delete_instance.trigger_rule = TriggerRule.ALL_DONE
- create_instance >> get_instance >> get_instance_result
- create_instance >> update_instance
- create_instance >> export_instance
- create_instance_2 >> import_instance
- create_instance >> list_instances >> list_instances_result
- list_instances >> delete_instance
- export_instance >> update_instance
- update_instance >> delete_instance
- create_instance >> create_instance_result
- get_instance >> set_acl_permission >> export_instance
- get_instance >> list_instances_result
- export_instance >> import_instance
- export_instance >> delete_instance
- failover_instance >> delete_instance_2
- import_instance >> failover_instance
-
- export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
-
-
-with models.DAG(
- "gcp_cloud_memorystore_memcached",
- schedule_interval='@once', # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=['example'],
-) as dag_memcache:
- # [START howto_operator_create_instance_memcached]
- create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
- task_id="create-instance",
- location="europe-north1",
- instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- instance=MEMCACHED_INSTANCE,
- project_id=GCP_PROJECT_ID,
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)
- # [END howto_operator_create_instance_memcached]
- # [START howto_operator_delete_instance_memcached]
- delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
- task_id="delete-instance",
- location="europe-north1",
- instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
+ (
+ create_bucket
+ >> create_instance
+ >> create_instance_result
+ >> get_instance
+ >> get_instance_result
+ >> set_acl_permission
+ >> export_instance
+ >> update_instance
+ >> list_instances
+ >> list_instances_result
+ >> create_instance_2
+ >> failover_instance
+ >> import_instance
+ >> delete_instance
+ >> delete_instance_2
+ >> create_instance_and_import
+ >> scale_instance
+ >> export_and_delete_instance
+ >> delete_bucket
)
- # [END howto_operator_delete_instance_memcached]
- # [START howto_operator_get_instance_memcached]
- get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
- task_id="get-instance",
- location="europe-north1",
- instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
- )
- # [END howto_operator_get_instance_memcached]
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
- # [START howto_operator_list_instances_memcached]
- list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
- task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
- )
- # [END howto_operator_list_instances_memcached]
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
- # # [START howto_operator_update_instance_memcached]
- update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
- task_id="update-instance",
- location="europe-north1",
- instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
- update_mask=FieldMask(paths=["node_count"]),
- instance={"node_count": 2},
- )
- # [END howto_operator_update_instance_memcached]
-
- # [START howto_operator_update_and_apply_parameters_memcached]
- update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
- task_id="update-parameters",
- location="europe-north1",
- instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
- update_mask={"paths": ["params"]},
- parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
- )
-
- apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
- task_id="apply-parameters",
- location="europe-north1",
- instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
- project_id=GCP_PROJECT_ID,
- node_ids=["node-a-1"],
- apply_all=False,
- )
- # update_parameters >> apply_parameters
- # [END howto_operator_update_and_apply_parameters_memcached]
+from tests.system.utils import get_test_run # noqa: E402
- create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
- create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
- update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)