You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/22 02:00:46 UTC
[airflow] branch master updated: Add Google Cloud Memorystore
Memcached Operators (#10121)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 9150330 Add Google Cloud Memorystore Memcached Operators (#10121)
9150330 is described below
commit 91503308c723b186ce6f4026f2a3e2c21030f6e5
Author: Tanjin Panna <ta...@gmail.com>
AuthorDate: Wed Oct 21 21:50:40 2020 -0400
Add Google Cloud Memorystore Memcached Operators (#10121)
Co-authored-by: Tobiasz Kędzierski <to...@polidea.com>
Co-authored-by: Kamil Breguła <mi...@users.noreply.github.com>
---
.../example_dags/example_cloud_memorystore.py | 131 ++++-
.../google/cloud/hooks/cloud_memorystore.py | 451 +++++++++++++++
.../google/cloud/operators/cloud_memorystore.py | 616 ++++++++++++++++++++-
.../google/cloud/cloud_memorystore_memcached.rst | 158 ++++++
docs/operators-and-hooks-ref.rst | 8 +-
setup.py | 1 +
.../google/cloud/hooks/test_cloud_memorystore.py | 188 ++++++-
.../cloud/operators/test_cloud_memorystore.py | 147 +++++
.../operators/test_cloud_memorystore_system.py | 8 +-
9 files changed, 1687 insertions(+), 21 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
index a3d8c2b..704f02c 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+++ b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
@@ -22,6 +22,7 @@ import os
from urllib.parse import urlparse
from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
+from google.cloud.memcache_v1beta2.types import cloud_memcache
from airflow import models
from airflow.operators.bash import BashOperator
@@ -37,15 +38,29 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
CloudMemorystoreListInstancesOperator,
CloudMemorystoreScaleInstanceOperator,
CloudMemorystoreUpdateInstanceOperator,
+ CloudMemorystoreMemcachedApplyParametersOperator,
+ CloudMemorystoreMemcachedCreateInstanceOperator,
+ CloudMemorystoreMemcachedDeleteInstanceOperator,
+ CloudMemorystoreMemcachedGetInstanceOperator,
+ CloudMemorystoreMemcachedListInstancesOperator,
+ CloudMemorystoreMemcachedUpdateInstanceOperator,
+ CloudMemorystoreMemcachedUpdateParametersOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
from airflow.utils import dates
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore")
-INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2")
-INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3")
+# Default instance IDs used by the example DAGs; each one can be overridden through its
+# environment variable. Defaults end with a letter or a digit because an instance ID may not
+# end with a hyphen.
+MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore-redis")
+MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2"
+)
+MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3"
+)
+MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1"
+)
EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb")
EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL)
@@ -57,9 +72,13 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
+# [START howto_operator_memcached_instance]
+MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
+# [END howto_operator_memcached_instance]
+
with models.DAG(
- "gcp_cloud_memorystore",
+ "gcp_cloud_memorystore_redis",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
tags=['example'],
@@ -68,7 +87,7 @@ with models.DAG(
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
- instance_id=INSTANCE_NAME,
+ instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=GCP_PROJECT_ID,
)
@@ -84,7 +103,7 @@ with models.DAG(
create_instance_2 = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance-2",
location="europe-north1",
- instance_id=INSTANCE_NAME_2,
+ instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
instance=SECOND_INSTANCE,
project_id=GCP_PROJECT_ID,
)
@@ -93,7 +112,7 @@ with models.DAG(
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
- instance=INSTANCE_NAME,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
do_xcom_push=True,
)
@@ -109,7 +128,7 @@ with models.DAG(
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location="europe-north1",
- instance=INSTANCE_NAME_2,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS,
project_id=GCP_PROJECT_ID,
)
@@ -131,7 +150,7 @@ with models.DAG(
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
- instance_id=INSTANCE_NAME,
+ instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
@@ -152,7 +171,7 @@ with models.DAG(
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location="europe-north1",
- instance=INSTANCE_NAME,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
@@ -162,7 +181,7 @@ with models.DAG(
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location="europe-north1",
- instance=INSTANCE_NAME_2,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
@@ -170,14 +189,17 @@ with models.DAG(
# [START howto_operator_delete_instance]
delete_instance = CloudMemorystoreDeleteInstanceOperator(
- task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID
+ task_id="delete-instance",
+ location="europe-north1",
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
)
# [END howto_operator_delete_instance]
delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance-2",
location="europe-north1",
- instance=INSTANCE_NAME_2,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
project_id=GCP_PROJECT_ID,
)
@@ -185,7 +207,7 @@ with models.DAG(
create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
task_id="create-instance-and-import",
location="europe-north1",
- instance_id=INSTANCE_NAME_3,
+ instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
instance=FIRST_INSTANCE,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
@@ -196,7 +218,7 @@ with models.DAG(
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location="europe-north1",
- instance_id=INSTANCE_NAME_3,
+ instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=GCP_PROJECT_ID,
memory_size_gb=3,
)
@@ -206,7 +228,7 @@ with models.DAG(
export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
task_id="export-and-delete-instance",
location="europe-north1",
- instance=INSTANCE_NAME_3,
+ instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
@@ -229,3 +251,80 @@ with models.DAG(
failover_instance >> delete_instance_2
export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+with models.DAG(
+ "gcp_cloud_memorystore_memcached",
+ schedule_interval=None, # Override to match your needs
+ start_date=dates.days_ago(1),
+ tags=['example'],
+) as dag_memcache:
+ # [START howto_operator_create_instance_memcached]
+ create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
+ task_id="create-instance",
+ location="europe-north1",
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ instance=MEMCACHED_INSTANCE,
+ project_id=GCP_PROJECT_ID,
+ )
+ # [END howto_operator_create_instance_memcached]
+
+ # [START howto_operator_delete_instance_memcached]
+ delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
+ task_id="delete-instance",
+ location="europe-north1",
+ instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
+ )
+ # [END howto_operator_delete_instance_memcached]
+
+ # [START howto_operator_get_instance_memcached]
+ get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
+ task_id="get-instance",
+ location="europe-north1",
+ instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
+ )
+ # [END howto_operator_get_instance_memcached]
+
+ # [START howto_operator_list_instances_memcached]
+ list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
+ task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
+ )
+ # [END howto_operator_list_instances_memcached]
+
+ # [START howto_operator_update_instance_memcached]
+ update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
+ task_id="update-instance",
+ location="europe-north1",
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
+ update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
+ instance={"node_count": 2},
+ )
+ # [END howto_operator_update_instance_memcached]
+
+ # [START howto_operator_update_and_apply_parameters_memcached]
+ update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+ task_id="update-parameters",
+ location="europe-north1",
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
+ update_mask={"paths": ["params"]},
+ parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
+ )
+
+ apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+ task_id="apply-parameters",
+ location="europe-north1",
+ instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+ project_id=GCP_PROJECT_ID,
+ node_ids=["node-a-1"],
+ apply_all=False,
+ )
+
+ # update_memcached_parameters >> apply_memcached_parameters
+ # [END howto_operator_update_and_apply_parameters_memcached]
+
+ create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
+ create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
+ update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 995db95..a28cce1 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -17,13 +17,18 @@
# under the License.
"""Hooks for Cloud Memorystore service"""
from typing import Dict, Optional, Sequence, Tuple, Union
+import json
from google.api_core.exceptions import NotFound
+from google.api_core import path_template
from google.api_core.retry import Retry
+from google.cloud.memcache_v1beta2 import CloudMemcacheClient
+from google.cloud.memcache_v1beta2.types import cloud_memcache
from google.cloud.redis_v1 import CloudRedisClient
from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
from google.protobuf.json_format import ParseDict
+import proto
from airflow import version
from airflow.exceptions import AirflowException
@@ -496,3 +501,449 @@ class CloudMemorystoreHook(GoogleBaseHook):
)
result.result()
self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+ """
+ Hook for Google Cloud Memorystore for Memcached service APIs.
+
+ All the methods in the hook where project_id is used must be called with
+ keyword arguments rather than positional.
+
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+ if any. For this to work, the service account making the request must have
+ domain-wide delegation enabled.
+ :type delegate_to: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        # All connection-related arguments are forwarded unchanged to GoogleBaseHook.
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        # No client yet; get_conn() fills this in on first use.
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self):
+        """
+        Returns the Cloud Memorystore Memcached client, creating it on the first call.
+
+        The client is built with the credentials returned by ``_get_credentials()`` and stored on
+        the hook, so later calls return the same object.
+
+        :return: The client library object stored on this hook.
+        """
+        if self._client:
+            return self._client
+        self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Sets one label on the provided Instance and returns that same Instance.
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?``, so every ``.`` and ``+`` in the
+        value (for example in an x.y.z version string) is replaced with ``-`` before it is stored.
+
+        :param instance: The instance proto whose labels are updated in place
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The label key
+        :type key: str
+        :param val: The label value before ``.`` and ``+`` are replaced
+        :type val: str
+        :return: The instance proto updated with the new label
+        """
+        for forbidden in (".", "+"):
+            val = val.replace(forbidden, "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @staticmethod
+    def proto_message_to_dict(message: proto.Message) -> dict:
+        """
+        Converts a protobuf message into a plain dictionary.
+
+        The message is serialised with the ``to_json`` method of its own class and the resulting
+        JSON text is decoded with :func:`json.loads`.
+
+        :param message: The message to convert
+        :type message: proto.Message
+        :return: The decoded JSON form of the message
+        :rtype: dict
+        """
+        message_cls = message.__class__
+        as_json = message_cls.to_json(message)
+        return json.loads(as_json)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Applies the current set of Parameters to the specified nodes of the Memcached Instance.
+
+        ``result()`` is called on the operation returned by the client before completion is logged.
+
+        :param node_ids: Nodes to which the instance-level parameter group should be applied.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply the instance-level parameter group to all nodes. If set to
+            true, will explicitly restrict users from specifying any nodes, and apply parameter group
+            updates to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        memcache_client = self.get_conn()
+        request_metadata = metadata or ()
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        operation = memcache_client.apply_parameters(
+            name=instance_name,
+            node_ids=node_ids,
+            apply_all=apply_all,
+            retry=retry,
+            timeout=timeout,
+            metadata=request_metadata,
+        )
+        operation.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance, unless an instance with the same ID can already be fetched.
+
+        When the instance already exists it is returned as is and nothing is created. Otherwise the
+        ``airflow-version`` label is added to the resource, the instance is created, and the freshly
+        fetched instance is returned.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            - Must contain only lowercase letters, numbers, and hyphens.
+            - Must start with a letter.
+            - Must be between 1-40 characters.
+            - Must end with a number or a letter.
+            - Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        :return: The existing instance, or the instance fetched after creation.
+        :raises AirflowException: If ``instance`` is neither a dict nor an ``Instance`` message.
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        # Probe for the instance first; NotFound is the signal that it still has to be created.
+        try:
+            existing = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+        except NotFound:
+            self.log.info("Instance not exists.")
+        else:
+            self.log.info("Instance exists. Skipping creation.")
+            return existing
+
+        if not isinstance(instance, (dict, cloud_memcache.Instance)):
+            raise AirflowException("instance is not instance of Instance type or python dict")
+        if isinstance(instance, dict):
+            instance = cloud_memcache.Instance(instance)
+
+        self._append_label(instance, "airflow-version", "v" + version.version)
+
+        operation = client.create_instance(
+            parent=parent,
+            instance_id=instance_id,
+            resource=instance,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        operation.result()
+        self.log.info("Instance created.")
+        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a specific Memcached instance. Instance stops serving and data is deleted.
+
+        The instance is fetched first; the delete request is only sent when that fetch returns a
+        truthy value.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+
+        self.log.info("Fetching Instance: %s", name)
+        fetched = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+
+        # NOTE(review): create_instance treats NotFound from this same call as "missing", so this
+        # check is presumably never false and a missing instance surfaces as NotFound - confirm.
+        if fetched:
+            self.log.info("Deleting Instance: %s", name)
+            operation = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+            operation.result()
+            self.log.info("Instance deleted: %s", name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Fetches the details of a specific Memcached instance.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        :return: The instance returned by the client.
+        """
+        client = self.get_conn()
+        request_metadata = metadata or ()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        fetched = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=request_metadata)
+        self.log.info("Fetched Instance: %s", name)
+        return fetched
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_instances(
+        self,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Lists the Memcached instances owned by a project, either in the specified location (region)
+        or in all locations.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+
+            If it is specified as ``-`` (wildcard), then all regions available to the project are
+            queried, and the results are aggregated.
+        :type location: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        :return: The value returned by the client's ``list_instances`` call.
+        """
+        client = self.get_conn()
+        request_metadata = metadata or ()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instances = client.list_instances(
+            parent=parent, retry=retry, timeout=timeout, metadata=request_metadata
+        )
+        self.log.info("Fetched instances")
+        return instances
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_instance(
+        self,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        location: Optional[str] = None,
+        instance_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Updates the metadata and configuration of a specific Memcached instance.
+
+        When both ``location`` and ``instance_id`` are given, the resource name built from them
+        overwrites ``instance.name``; otherwise the name already set on ``instance`` is used.
+
+        :param update_mask: Required. Mask of fields to update. At least one path must be supplied in
+            this field. The elements of the repeated paths field may only include these fields from
+            ``Instance``:
+
+            - ``displayName``
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+        :type update_mask:
+            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+        :param instance: Required. Update description. Only fields specified in ``update_mask`` are
+            updated.
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        :raises AirflowException: If ``instance`` is neither a dict nor an ``Instance`` message.
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+
+        if not isinstance(instance, (dict, cloud_memcache.Instance)):
+            raise AirflowException("instance is not instance of Instance type or python dict")
+        if isinstance(instance, dict):
+            instance = cloud_memcache.Instance(instance)
+
+        if location and instance_id:
+            instance.name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Updating instances: %s", instance.name)
+        operation = client.update_instance(
+            update_mask=update_mask,
+            resource=instance,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        operation.result()
+        self.log.info("Instance updated: %s", instance.name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_parameters(
+        self,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        parameters: Union[Dict, cloud_memcache.MemcacheParameters],
+        project_id: str,
+        location: str,
+        instance_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Updates the defined Memcached Parameters for an existing Instance. This method only stages the
+        parameters, it must be followed by apply_parameters to apply the parameters to nodes of
+        the Memcached Instance.
+
+        :param update_mask: Required. Mask of fields to update.
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+        :type update_mask:
+            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+        :param parameters: The parameters to apply to the instance.
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters`
+        :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters]
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will
+            not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that
+            if ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        :raises AirflowException: If ``parameters`` is neither a dict nor a ``MemcacheParameters``
+            message.
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+
+        if isinstance(parameters, dict):
+            parameters = cloud_memcache.MemcacheParameters(parameters)
+        elif not isinstance(parameters, cloud_memcache.MemcacheParameters):
+            # The rejected argument is ``parameters``; name it so the error points at the right input.
+            raise AirflowException("parameters is not instance of MemcacheParameters type or python dict")
+
+        # NOTE(review): instance_id defaults to None but is always used to build the resource name -
+        # confirm whether it should be a required argument.
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        self.log.info("Staging update to instance: %s", instance_id)
+        result = client.update_parameters(
+            name=name,
+            update_mask=update_mask,
+            parameters=parameters,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        result.result()
+        self.log.info("Update staged for instance: %s", instance_id)
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index 0600914..b4ec705 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -19,12 +19,16 @@
from typing import Dict, Optional, Sequence, Tuple, Union
from google.api_core.retry import Retry
+from google.cloud.memcache_v1beta2.types import cloud_memcache
from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
from google.protobuf.json_format import MessageToDict
from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook
+from airflow.providers.google.cloud.hooks.cloud_memorystore import (
+ CloudMemorystoreHook,
+ CloudMemorystoreMemcachedHook,
+)
from airflow.utils.decorators import apply_defaults
@@ -1108,3 +1112,613 @@ class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+
+class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
+ """
+ Applies the current set of parameters to the specified set of nodes of the Memcached instance.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator`
+
+ :param node_ids: Nodes to which we should apply the instance-level parameter group.
+ :type node_ids: Sequence[str]
+ :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+ will explicitly restrict users from specifying any nodes, and apply parameter group updates
+ to all nodes within the instance.
+ :type apply_all: bool
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance_id: The logical name of the Memcached instance in the customer project.
+ :type instance_id: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the Google Cloud connection is used.
+ :type project_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+
+ template_fields = (
+ "node_ids",
+ "apply_all",
+ "location",
+ "instance_id",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ "impersonation_chain",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ node_ids: Sequence[str],
+ apply_all: bool,
+ location: str,
+ instance_id: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.node_ids = node_ids
+ self.apply_all = apply_all
+ self.location = location
+ self.instance_id = instance_id
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ hook.apply_parameters(
+ node_ids=self.node_ids,
+ apply_all=self.apply_all,
+ location=self.location,
+ instance_id=self.instance_id,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+
+class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
+ """
+ Creates a Memcached instance based on the specified tier and memory size.
+
+ By default, the instance is accessible from the project's `default network
+ <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator`
+
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance_id: Required. The logical name of the Memcached instance in the customer project with the
+ following restrictions:
+
+ - Must contain only lowercase letters, numbers, and hyphens.
+ - Must start with a letter.
+ - Must be between 1-40 characters.
+ - Must end with a number or a letter.
+ - Must be unique within the customer project / location
+ :type instance_id: str
+ :param instance: Required. A Memcached [Instance] resource
+
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+ :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the GCP connection is used.
+ :type project_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ """
+
+ template_fields = (
+ "location",
+ "instance_id",
+ "instance",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ location: str,
+ instance_id: str,
+ instance: Union[Dict, cloud_memcache.Instance],
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.location = location
+ self.instance_id = instance_id
+ self.instance = instance
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id)
+ result = hook.create_instance(
+ location=self.location,
+ instance_id=self.instance_id,
+ instance=self.instance,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return hook.proto_message_to_dict(result)
+
+
+class CloudMemorystoreMemcachedDeleteInstanceOperator(BaseOperator):
+ """
+ Deletes a specific Memcached instance. Instance stops serving and data is deleted.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator`
+
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance: The logical name of the Memcached instance in the customer project.
+ :type instance: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the GCP connection is used.
+ :type project_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ """
+
+ template_fields = ("location", "instance", "project_id", "retry", "timeout", "metadata", "gcp_conn_id")
+
+ @apply_defaults
+ def __init__(
+ self,
+ location: str,
+ instance: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.location = location
+ self.instance = instance
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id)
+ hook.delete_instance(
+ location=self.location,
+ instance=self.instance,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+
+class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
+ """
+ Gets the details of a specific Memcached instance.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedGetInstanceOperator`
+
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance: The logical name of the Memcached instance in the customer project.
+ :type instance: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the Google Cloud connection is used.
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ "location",
+ "instance",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ "impersonation_chain",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ location: str,
+ instance: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.location = location
+ self.instance = instance
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ result = hook.get_instance(
+ location=self.location,
+ instance=self.instance,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return hook.proto_message_to_dict(result)
+
+
+class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
+ """
+ Lists all Memcached instances owned by a project in either the specified location (region) or all
+ locations.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedListInstancesOperator`
+
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ If it is specified as ``-`` (wildcard), then all regions available to the project are
+ queried, and the results are aggregated.
+ :type location: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the Google Cloud connection is used.
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ "location",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ "impersonation_chain",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ result = hook.list_instances(
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ instances = [hook.proto_message_to_dict(a) for a in result]
+ return instances
+
+
+class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
+ """
+ Updates the metadata and configuration of a specific Memcached instance.
+
+ :param update_mask: Required. Mask of fields to update. At least one path must be supplied in this field.
+ The elements of the repeated paths field may only include these fields from ``Instance``:
+
+ - ``displayName``
+
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator`
+
+ :type update_mask: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+ :param instance: Required. Update description. Only fields specified in update_mask are updated.
+
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+ :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance_id: The logical name of the Memcached instance in the customer project.
+ :type instance_id: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the Google Cloud connection is used.
+ :type project_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ "update_mask",
+ "instance",
+ "location",
+ "instance_id",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ "impersonation_chain",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+ instance: Union[Dict, cloud_memcache.Instance],
+ location: Optional[str] = None,
+ instance_id: Optional[str] = None,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.update_mask = update_mask
+ self.instance = instance
+ self.location = location
+ self.instance_id = instance_id
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ hook.update_instance(
+ update_mask=self.update_mask,
+ instance=self.instance,
+ location=self.location,
+ instance_id=self.instance_id,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+
+class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
+ """
+ Updates the defined Memcached Parameters for an existing Instance. This method only stages the
+ parameters; it must be followed by apply_parameters to apply the parameters to nodes of
+ the Memcached Instance.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator`
+
+ :param update_mask: Required. Mask of fields to update.
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+ :type update_mask:
+ Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+ :param parameters: The parameters to apply to the instance.
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters`
+ :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters]
+ :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+ :type location: str
+ :param instance_id: The logical name of the Memcached instance in the customer project.
+ :type instance_id: str
+ :param project_id: Project ID of the project that contains the instance. If set
+ to None or missing, the default project_id from the Google Cloud connection is used.
+ :type project_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+
+ template_fields = (
+ "update_mask",
+ "parameters",
+ "location",
+ "instance_id",
+ "project_id",
+ "retry",
+ "timeout",
+ "metadata",
+ "gcp_conn_id",
+ "impersonation_chain",
+ )
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+ parameters: Union[Dict, cloud_memcache.MemcacheParameters],
+ location: str,
+ instance_id: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.update_mask = update_mask
+ self.parameters = parameters
+ self.location = location
+ self.instance_id = instance_id
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = CloudMemorystoreMemcachedHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ hook.update_parameters(
+ update_mask=self.update_mask,
+ parameters=self.parameters,
+ location=self.location,
+ instance_id=self.instance_id,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
diff --git a/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst
new file mode 100644
index 0000000..6483c75
--- /dev/null
+++ b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst
@@ -0,0 +1,158 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Google Cloud Memorystore Memcached Operators
+============================================
+
+The `Cloud Memorystore for Memcached <https://cloud.google.com/memorystore/docs/memcached/>`__ is a fully managed
+Memcached service for Google Cloud. Applications running on Google Cloud can achieve extreme performance by
+leveraging the highly scalable, available, secure Memcached service without the burden of managing complex
+Memcached deployments.
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:CloudMemorystoreMemcachedInstance:
+
+Instance
+^^^^^^^^
+
+The operators use a :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` to represent an instance.
+The object can also be provided as a compatible dictionary.
+
+Here is an example of an instance:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :start-after: [START howto_operator_memcached_instance]
+ :end-before: [END howto_operator_memcached_instance]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator:
+
+Create instance
+^^^^^^^^^^^^^^^
+
+Creating an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedCreateInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_create_instance_memcached]
+ :end-before: [END howto_operator_create_instance_memcached]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator:
+
+Delete instance
+^^^^^^^^^^^^^^^
+
+Deleting an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedDeleteInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_delete_instance_memcached]
+ :end-before: [END howto_operator_delete_instance_memcached]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedGetInstanceOperator:
+
+Get instance
+^^^^^^^^^^^^
+
+Getting an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedGetInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_get_instance_memcached]
+ :end-before: [END howto_operator_get_instance_memcached]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedListInstancesOperator:
+
+List instances
+^^^^^^^^^^^^^^
+
+Listing instances is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedListInstancesOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_list_instances_memcached]
+ :end-before: [END howto_operator_list_instances_memcached]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator:
+
+Update instance
+^^^^^^^^^^^^^^^
+
+Updating an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_update_instance_memcached]
+ :end-before: [END howto_operator_update_instance_memcached]
+
+
+.. _howto/operator:CloudMemorystoreMemcachedApplyParametersOperator:
+
+Update and apply parameters to an instance
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To update and apply Memcached parameters to an instance use
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateParametersOperator`
+and
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedApplyParametersOperator`
+operators.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_update_and_apply_parameters_memcached]
+ :end-before: [END howto_operator_update_and_apply_parameters_memcached]
+
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Client Library Documentation <https://googleapis.dev/python/memcache/latest/index.html>`__
+* `Product Documentation <https://cloud.google.com/memorystore/docs/memcached/>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 9b17642..58296cd 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -845,12 +845,18 @@ These integrations allow you to perform various operations within the Google Clo
- :mod:`airflow.providers.google.cloud.operators.mlengine`
-
- * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__
+ * - `Cloud Memorystore Redis <https://cloud.google.com/memorystore/>`__
- :doc:`How to use <howto/operator/google/cloud/cloud_memorystore>`
- :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore`
- :mod:`airflow.providers.google.cloud.operators.cloud_memorystore`
-
+ * - `Cloud Memorystore Memcached <https://cloud.google.com/memorystore/>`__
+ - :doc:`How to use <howto/operator/google/cloud/cloud_memorystore_memcached>`
+ - :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore`
+ - :mod:`airflow.providers.google.cloud.operators.cloud_memorystore`
+ -
+
* - `Natural Language <https://cloud.google.com/natural-language/>`__
- :doc:`How to use <howto/operator/google/cloud/natural_language>`
- :mod:`airflow.providers.google.cloud.hooks.natural_language`
diff --git a/setup.py b/setup.py
index ef3ee97..c6952b5 100644
--- a/setup.py
+++ b/setup.py
@@ -268,6 +268,7 @@ google = [
'google-cloud-kms>=1.2.1,<2.0.0',
'google-cloud-language>=1.1.1,<2.0.0',
'google-cloud-logging>=1.14.0,<2.0.0',
+ 'google-cloud-memcache>=0.2.0',
'google-cloud-monitoring>=0.34.0,<2.0.0',
'google-cloud-pubsub>=1.0.0,<2.0.0',
'google-cloud-redis>=0.3.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
index fe3a1f8..8f92e79 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
@@ -20,12 +20,16 @@ from unittest import TestCase, mock
from google.api_core.retry import Retry
from google.cloud.exceptions import NotFound
+from google.cloud.memcache_v1beta2.types import cloud_memcache
from google.cloud.redis_v1.types import Instance
from mock import PropertyMock
from airflow import version
from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook
+from airflow.providers.google.cloud.hooks.cloud_memorystore import (
+ CloudMemorystoreHook,
+ CloudMemorystoreMemcachedHook,
+)
from tests.providers.google.cloud.utils.base_gcp_mock import (
GCP_PROJECT_ID_HOOK_UNIT_TEST,
mock_base_gcp_hook_default_project_id,
@@ -42,6 +46,7 @@ TEST_TIMEOUT = 10 # type: float
TEST_METADATA = [("KEY", "VALUE")] # type: Sequence[Tuple[str, str]]
TEST_PAGE_SIZE = 100 # type: int
TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]} # type: Dict
+TEST_UPDATE_MASK_MEMCACHED = {"displayName": "updated name"} # type: Dict
TEST_PARENT = "projects/test-project-id/locations/test-location" # type: str
TEST_NAME = "projects/test-project-id/locations/test-location/instances/test-instance-id" # type: str
TEST_PARENT_DEFAULT_PROJECT_ID = "projects/{}/locations/test-location".format(
@@ -429,3 +434,184 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
+
+
+class TestCloudMemorystoreMemcachedWithDefaultProjectIdHook(TestCase):
+    def setUp(self):
+        """Build the hook under test with its ``__init__`` replaced by the default-project-id mock."""
+        init_target = (
+            "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.__init__"
+        )
+        init_patcher = mock.patch(init_target, new=mock_base_gcp_hook_default_project_id)
+        # The patch only needs to be active while the hook object is being constructed.
+        with init_patcher:
+            self.hook = CloudMemorystoreMemcachedHook(gcp_conn_id="test")
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_create_instance_when_exists(self, mock_get_conn, mock_project_id):
+        """Expect one ``get_instance`` lookup under the default-project name and its result returned."""
+        client = mock_get_conn.return_value
+        client.get_instance.return_value = cloud_memcache.Instance(name=TEST_NAME)
+        request_options = {"retry": TEST_RETRY, "timeout": TEST_TIMEOUT, "metadata": TEST_METADATA}
+
+        returned = self.hook.create_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            **request_options
+        )
+
+        client.get_instance.assert_called_once_with(name=TEST_NAME_DEFAULT_PROJECT_ID, **request_options)
+        self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), returned)
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_create_instance_when_not_exists(self, mock_get_conn, mock_project_id):
+        """Check that a missing instance is created and then looked up again.
+
+        The first ``get_instance`` call is configured to raise ``NotFound``; the second returns the
+        instance that ``create_instance`` is expected to hand back.
+        """
+        mock_get_conn.return_value.get_instance.side_effect = [
+            NotFound("Instance not found"),
+            cloud_memcache.Instance(name=TEST_NAME),
+        ]
+        mock_get_conn.return_value.create_instance.return_value.result.return_value = cloud_memcache.Instance(
+            name=TEST_NAME
+        )
+        result = self.hook.create_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        # ``has_calls`` is not a Mock assertion: it silently auto-creates a child mock and checks
+        # nothing, so ``assert_has_calls`` is required to verify the recorded lookups.
+        # NOTE(review): both lookups are expected under the default-project instance name, as
+        # test_create_instance_when_exists asserts for the same hook call - confirm against the hook.
+        mock_get_conn.return_value.get_instance.assert_has_calls(
+            [
+                mock.call(
+                    name=TEST_NAME_DEFAULT_PROJECT_ID,
+                    retry=TEST_RETRY,
+                    timeout=TEST_TIMEOUT,
+                    metadata=TEST_METADATA,
+                ),
+                mock.call(
+                    name=TEST_NAME_DEFAULT_PROJECT_ID,
+                    retry=TEST_RETRY,
+                    timeout=TEST_TIMEOUT,
+                    metadata=TEST_METADATA,
+                ),
+            ]
+        )
+        # The resource sent for creation must carry the sanitised "airflow-version" label.
+        mock_get_conn.return_value.create_instance.assert_called_once_with(
+            resource=cloud_memcache.Instance(
+                name=TEST_NAME,
+                labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+            ),
+            instance_id=TEST_INSTANCE_ID,
+            metadata=TEST_METADATA,
+            parent=TEST_PARENT_DEFAULT_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+        )
+        self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), result)
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_delete_instance(self, mock_get_conn, mock_project_id):
+        """Expect the client ``delete_instance`` call to receive the default-project instance name."""
+        request_options = {"retry": TEST_RETRY, "timeout": TEST_TIMEOUT, "metadata": TEST_METADATA}
+
+        self.hook.delete_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION, instance=TEST_INSTANCE_ID, **request_options
+        )
+
+        client = mock_get_conn.return_value
+        client.delete_instance.assert_called_once_with(
+            name=TEST_NAME_DEFAULT_PROJECT_ID, **request_options
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_get_instance(self, mock_get_conn, mock_project_id):
+        """Expect the client ``get_instance`` call to receive the default-project instance name."""
+        request_options = {"retry": TEST_RETRY, "timeout": TEST_TIMEOUT, "metadata": TEST_METADATA}
+
+        self.hook.get_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION, instance=TEST_INSTANCE_ID, **request_options
+        )
+
+        client = mock_get_conn.return_value
+        client.get_instance.assert_called_once_with(name=TEST_NAME_DEFAULT_PROJECT_ID, **request_options)
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_list_instances(self, mock_get_conn, mock_project_id):
+        """Expect the client ``list_instances`` call to receive the default-project parent path."""
+        request_options = {"retry": TEST_RETRY, "timeout": TEST_TIMEOUT, "metadata": TEST_METADATA}
+
+        self.hook.list_instances(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION, **request_options
+        )
+
+        client = mock_get_conn.return_value
+        client.list_instances.assert_called_once_with(
+            parent=TEST_PARENT_DEFAULT_PROJECT_ID, **request_options
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_update_instance(self, mock_get_conn, mock_project_id):
+        """Expect the updated resource to be sent under the default-project instance name."""
+        request_options = {"retry": TEST_RETRY, "timeout": TEST_TIMEOUT, "metadata": TEST_METADATA}
+
+        self.hook.update_instance(  # pylint: disable=no-value-for-parameter
+            update_mask=TEST_UPDATE_MASK_MEMCACHED,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            **request_options
+        )
+
+        # The instance is passed in as TEST_NAME but must reach the client renamed.
+        expected_resource = cloud_memcache.Instance(name=TEST_NAME_DEFAULT_PROJECT_ID)
+        mock_get_conn.return_value.update_instance.assert_called_once_with(
+            update_mask=TEST_UPDATE_MASK_MEMCACHED, resource=expected_resource, **request_options
+        )
+
+    def test_proto_functions(self):
+        """Convert an ``Instance`` with ``proto_message_to_dict`` and compare fields under camelCase keys."""
+        source = {
+            'name': 'test_name',
+            'node_count': 1,
+            'node_config': {'cpu_count': 1, 'memory_size_mb': 1024},
+        }
+        converted = self.hook.proto_message_to_dict(cloud_memcache.Instance(source))
+
+        self.assertEqual(converted["name"], source["name"])
+        # Input keys are snake_case; the converted dict is read back with camelCase keys.
+        node_config_out = converted["nodeConfig"]
+        node_config_in = source["node_config"]
+        self.assertEqual(node_config_out["cpuCount"], node_config_in["cpu_count"])
+        self.assertEqual(node_config_out["memorySizeMb"], node_config_in["memory_size_mb"])
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
index 384a572..85b17e5 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
@@ -33,6 +33,11 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
CloudMemorystoreListInstancesOperator,
CloudMemorystoreScaleInstanceOperator,
CloudMemorystoreUpdateInstanceOperator,
+ CloudMemorystoreMemcachedCreateInstanceOperator,
+ CloudMemorystoreMemcachedDeleteInstanceOperator,
+ CloudMemorystoreMemcachedGetInstanceOperator,
+ CloudMemorystoreMemcachedListInstancesOperator,
+ CloudMemorystoreMemcachedUpdateInstanceOperator,
)
TEST_GCP_CONN_ID = "test-gcp-conn-id"
@@ -52,6 +57,7 @@ TEST_DATA_PROTECTION_MODE = FailoverInstanceRequest.DataProtectionMode.LIMITED_D
TEST_INPUT_CONFIG = {"gcs_source": {"uri": "gs://test-bucket/file.rdb"}} # type: Dict
TEST_PAGE_SIZE = 100 # type: int
TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]} # TODO: Fill missing value
+TEST_UPDATE_MASK_MEMCACHED = {"displayName": "memcached instance"}
TEST_PARENT = "test-parent"
TEST_NAME = "test-name"
@@ -375,3 +381,144 @@ class TestCloudMemorystoreCreateInstanceAndImportOperatorOperator(TestCase):
),
]
)
+
+
+class TestCloudMemorystoreMemcachedCreateInstanceOperator(TestCase):
+    """Tests for ``CloudMemorystoreMemcachedCreateInstanceOperator`` against a mocked hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        """Expect the hook to be built from the connection id and ``create_instance`` called once."""
+        hook_kwargs = {
+            "location": TEST_LOCATION,
+            "instance_id": TEST_INSTANCE_ID,
+            "instance": TEST_INSTANCE,
+            "project_id": TEST_PROJECT_ID,
+            "retry": TEST_RETRY,
+            "timeout": TEST_TIMEOUT,
+            "metadata": TEST_METADATA,
+        }
+        operator = CloudMemorystoreMemcachedCreateInstanceOperator(
+            task_id=TEST_TASK_ID, gcp_conn_id=TEST_GCP_CONN_ID, **hook_kwargs
+        )
+
+        operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
+        # Every operator argument except task_id and gcp_conn_id is forwarded to the hook unchanged.
+        mock_hook.return_value.create_instance.assert_called_once_with(**hook_kwargs)
+
+
+class TestCloudMemorystoreMemcachedDeleteInstanceOperator(TestCase):
+    """Tests for ``CloudMemorystoreMemcachedDeleteInstanceOperator`` against a mocked hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        """Expect the hook to be built from the connection id and ``delete_instance`` called once."""
+        hook_kwargs = {
+            "location": TEST_LOCATION,
+            "instance": TEST_INSTANCE_NAME,
+            "project_id": TEST_PROJECT_ID,
+            "retry": TEST_RETRY,
+            "timeout": TEST_TIMEOUT,
+            "metadata": TEST_METADATA,
+        }
+        operator = CloudMemorystoreMemcachedDeleteInstanceOperator(
+            task_id=TEST_TASK_ID, gcp_conn_id=TEST_GCP_CONN_ID, **hook_kwargs
+        )
+
+        operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
+        # Every operator argument except task_id and gcp_conn_id is forwarded to the hook unchanged.
+        mock_hook.return_value.delete_instance.assert_called_once_with(**hook_kwargs)
+
+
+class TestCloudMemorystoreMemcachedGetInstanceOperator(TestCase):
+    """Tests for ``CloudMemorystoreMemcachedGetInstanceOperator`` against a mocked hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        """Expect connection settings to reach the hook constructor and the rest ``get_instance``."""
+        connection_kwargs = {
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+        hook_kwargs = {
+            "location": TEST_LOCATION,
+            "instance": TEST_INSTANCE_NAME,
+            "project_id": TEST_PROJECT_ID,
+            "retry": TEST_RETRY,
+            "timeout": TEST_TIMEOUT,
+            "metadata": TEST_METADATA,
+        }
+        operator = CloudMemorystoreMemcachedGetInstanceOperator(
+            task_id=TEST_TASK_ID, **hook_kwargs, **connection_kwargs
+        )
+
+        operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(**connection_kwargs)
+        mock_hook.return_value.get_instance.assert_called_once_with(**hook_kwargs)
+
+
+class TestCloudMemorystoreMemcachedListInstancesOperator(TestCase):
+    """Tests for ``CloudMemorystoreMemcachedListInstancesOperator`` against a mocked hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        """Expect connection settings to reach the hook constructor and the rest ``list_instances``."""
+        connection_kwargs = {
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+        hook_kwargs = {
+            "location": TEST_LOCATION,
+            "project_id": TEST_PROJECT_ID,
+            "retry": TEST_RETRY,
+            "timeout": TEST_TIMEOUT,
+            "metadata": TEST_METADATA,
+        }
+        operator = CloudMemorystoreMemcachedListInstancesOperator(
+            task_id=TEST_TASK_ID, **hook_kwargs, **connection_kwargs
+        )
+
+        operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(**connection_kwargs)
+        mock_hook.return_value.list_instances.assert_called_once_with(**hook_kwargs)
+
+
+class TestCloudMemorystoreMemcachedUpdateInstanceOperator(TestCase):
+    """Tests for ``CloudMemorystoreMemcachedUpdateInstanceOperator`` against a mocked hook."""
+
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        """Expect connection settings to reach the hook constructor and the rest ``update_instance``."""
+        connection_kwargs = {
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "impersonation_chain": TEST_IMPERSONATION_CHAIN,
+        }
+        hook_kwargs = {
+            "update_mask": TEST_UPDATE_MASK_MEMCACHED,
+            "instance": TEST_INSTANCE,
+            "location": TEST_LOCATION,
+            "instance_id": TEST_INSTANCE_ID,
+            "project_id": TEST_PROJECT_ID,
+            "retry": TEST_RETRY,
+            "timeout": TEST_TIMEOUT,
+            "metadata": TEST_METADATA,
+        }
+        operator = CloudMemorystoreMemcachedUpdateInstanceOperator(
+            task_id=TEST_TASK_ID, **hook_kwargs, **connection_kwargs
+        )
+
+        operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(**connection_kwargs)
+        mock_hook.return_value.update_instance.assert_called_once_with(**hook_kwargs)
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
index 4b42143..6f56f18 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
@@ -45,8 +45,12 @@ class CloudMemorystoreSystemTest(GoogleSystemTest):
self.create_gcs_bucket(GCP_BUCKET_NAME, location="europe-north1")
@provide_gcp_context(GCP_MEMORYSTORE)
- def test_run_example_dag(self):
- self.run_dag('gcp_cloud_memorystore', CLOUD_DAG_FOLDER)
+ def test_run_example_dag_memorystore_redis(self):
+ """Run the ``gcp_cloud_memorystore_redis`` example DAG from ``CLOUD_DAG_FOLDER``."""
+ self.run_dag('gcp_cloud_memorystore_redis', CLOUD_DAG_FOLDER)
+
+ @provide_gcp_context(GCP_MEMORYSTORE)
+ def test_run_example_dag_memorystore_memcached(self):
+ """Run the ``gcp_cloud_memorystore_memcached`` example DAG from ``CLOUD_DAG_FOLDER``."""
+ self.run_dag('gcp_cloud_memorystore_memcached', CLOUD_DAG_FOLDER)
@provide_gcp_context(GCP_MEMORYSTORE)
def tearDown(self):