Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/22 02:00:46 UTC

[airflow] branch master updated: Add Google Cloud Memorystore Memcached Operators (#10121)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9150330  Add Google Cloud Memorystore Memcached Operators (#10121)
9150330 is described below

commit 91503308c723b186ce6f4026f2a3e2c21030f6e5
Author: Tanjin Panna <ta...@gmail.com>
AuthorDate: Wed Oct 21 21:50:40 2020 -0400

    Add Google Cloud Memorystore Memcached Operators (#10121)
    
    Co-authored-by: Tobiasz Kędzierski <to...@polidea.com>
    Co-authored-by: Kamil Breguła <mi...@users.noreply.github.com>
---
 .../example_dags/example_cloud_memorystore.py      | 131 ++++-
 .../google/cloud/hooks/cloud_memorystore.py        | 451 +++++++++++++++
 .../google/cloud/operators/cloud_memorystore.py    | 616 ++++++++++++++++++++-
 .../google/cloud/cloud_memorystore_memcached.rst   | 158 ++++++
 docs/operators-and-hooks-ref.rst                   |   8 +-
 setup.py                                           |   1 +
 .../google/cloud/hooks/test_cloud_memorystore.py   | 188 ++++++-
 .../cloud/operators/test_cloud_memorystore.py      | 147 +++++
 .../operators/test_cloud_memorystore_system.py     |   8 +-
 9 files changed, 1687 insertions(+), 21 deletions(-)
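
For reference, here is a minimal sketch of how the new Memcached operators can be wired into
a DAG. It is distilled from the example DAG added below; the project id, location and instance
id are illustrative placeholders, not values required by the operators.

from airflow import models
from airflow.providers.google.cloud.operators.cloud_memorystore import (
    CloudMemorystoreMemcachedCreateInstanceOperator,
    CloudMemorystoreMemcachedDeleteInstanceOperator,
)
from airflow.utils import dates

# Instance body; mirrors the MEMCACHED_INSTANCE constant defined in the example DAG below.
MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}

with models.DAG(
    "gcp_cloud_memorystore_memcached_sketch",  # hypothetical DAG id for this sketch
    schedule_interval=None,
    start_date=dates.days_ago(1),
) as dag:
    create_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
        task_id="create-instance",
        location="europe-north1",
        instance_id="test-memorystore-memcached-1",
        instance=MEMCACHED_INSTANCE,
        project_id="example-project",
    )
    delete_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
        task_id="delete-instance",
        location="europe-north1",
        instance="test-memorystore-memcached-1",
        project_id="example-project",
    )
    create_instance >> delete_instance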

diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
index a3d8c2b..704f02c 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+++ b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
@@ -22,6 +22,7 @@ import os
 from urllib.parse import urlparse
 
 from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
+from google.cloud.memcache_v1beta2.types import cloud_memcache
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -37,15 +38,29 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
     CloudMemorystoreListInstancesOperator,
     CloudMemorystoreScaleInstanceOperator,
     CloudMemorystoreUpdateInstanceOperator,
+    CloudMemorystoreMemcachedApplyParametersOperator,
+    CloudMemorystoreMemcachedCreateInstanceOperator,
+    CloudMemorystoreMemcachedDeleteInstanceOperator,
+    CloudMemorystoreMemcachedGetInstanceOperator,
+    CloudMemorystoreMemcachedListInstancesOperator,
+    CloudMemorystoreMemcachedUpdateInstanceOperator,
+    CloudMemorystoreMemcachedUpdateParametersOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
 from airflow.utils import dates
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 
-INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore")
-INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2")
-INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3")
+MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore-redis-1")
+MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2"
+)
+MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3"
+)
+MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
+    "GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1"
+)
 
 EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb")
 EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL)
@@ -57,9 +72,13 @@ FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
 
 SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
 
+# [START howto_operator_memcached_instance]
+MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
+# [END howto_operator_memcached_instance]
+
 
 with models.DAG(
-    "gcp_cloud_memorystore",
+    "gcp_cloud_memorystore_redis",
     schedule_interval=None,  # Override to match your needs
     start_date=dates.days_ago(1),
     tags=['example'],
@@ -68,7 +87,7 @@ with models.DAG(
     create_instance = CloudMemorystoreCreateInstanceOperator(
         task_id="create-instance",
         location="europe-north1",
-        instance_id=INSTANCE_NAME,
+        instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
         instance=FIRST_INSTANCE,
         project_id=GCP_PROJECT_ID,
     )
@@ -84,7 +103,7 @@ with models.DAG(
     create_instance_2 = CloudMemorystoreCreateInstanceOperator(
         task_id="create-instance-2",
         location="europe-north1",
-        instance_id=INSTANCE_NAME_2,
+        instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         instance=SECOND_INSTANCE,
         project_id=GCP_PROJECT_ID,
     )
@@ -93,7 +112,7 @@ with models.DAG(
     get_instance = CloudMemorystoreGetInstanceOperator(
         task_id="get-instance",
         location="europe-north1",
-        instance=INSTANCE_NAME,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
         project_id=GCP_PROJECT_ID,
         do_xcom_push=True,
     )
@@ -109,7 +128,7 @@ with models.DAG(
     failover_instance = CloudMemorystoreFailoverInstanceOperator(
         task_id="failover-instance",
         location="europe-north1",
-        instance=INSTANCE_NAME_2,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS,
         project_id=GCP_PROJECT_ID,
     )
@@ -131,7 +150,7 @@ with models.DAG(
     update_instance = CloudMemorystoreUpdateInstanceOperator(
         task_id="update-instance",
         location="europe-north1",
-        instance_id=INSTANCE_NAME,
+        instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
         project_id=GCP_PROJECT_ID,
         update_mask={"paths": ["memory_size_gb"]},
         instance={"memory_size_gb": 2},
@@ -152,7 +171,7 @@ with models.DAG(
     export_instance = CloudMemorystoreExportInstanceOperator(
         task_id="export-instance",
         location="europe-north1",
-        instance=INSTANCE_NAME,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
         output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
         project_id=GCP_PROJECT_ID,
     )
@@ -162,7 +181,7 @@ with models.DAG(
     import_instance = CloudMemorystoreImportOperator(
         task_id="import-instance",
         location="europe-north1",
-        instance=INSTANCE_NAME_2,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
         project_id=GCP_PROJECT_ID,
     )
@@ -170,14 +189,17 @@ with models.DAG(
 
     # [START howto_operator_delete_instance]
     delete_instance = CloudMemorystoreDeleteInstanceOperator(
-        task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID
+        task_id="delete-instance",
+        location="europe-north1",
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
     )
     # [END howto_operator_delete_instance]
 
     delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
         task_id="delete-instance-2",
         location="europe-north1",
-        instance=INSTANCE_NAME_2,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
         project_id=GCP_PROJECT_ID,
     )
 
@@ -185,7 +207,7 @@ with models.DAG(
     create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
         task_id="create-instance-and-import",
         location="europe-north1",
-        instance_id=INSTANCE_NAME_3,
+        instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
         instance=FIRST_INSTANCE,
         input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
         project_id=GCP_PROJECT_ID,
@@ -196,7 +218,7 @@ with models.DAG(
     scale_instance = CloudMemorystoreScaleInstanceOperator(
         task_id="scale-instance",
         location="europe-north1",
-        instance_id=INSTANCE_NAME_3,
+        instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
         project_id=GCP_PROJECT_ID,
         memory_size_gb=3,
     )
@@ -206,7 +228,7 @@ with models.DAG(
     export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
         task_id="export-and-delete-instance",
         location="europe-north1",
-        instance=INSTANCE_NAME_3,
+        instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
         output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
         project_id=GCP_PROJECT_ID,
     )
@@ -229,3 +251,80 @@ with models.DAG(
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+with models.DAG(
+    "gcp_cloud_memorystore_memcached",
+    schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
+    tags=['example'],
+) as dag_memcache:
+    # [START howto_operator_create_instance_memcached]
+    create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance",
+        location="europe-north1",
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance",
+        location="europe-north1",
+        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance",
+        location="europe-north1",
+        instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance",
+        location="europe-north1",
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
+        instance={"node_count": 2},
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location="europe-north1",
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask={"paths": ["params"]},
+        parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
+    )
+
+    apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+        task_id="apply-parameters",
+        location="europe-north1",
+        instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        node_ids=["node-a-1"],
+        apply_all=False,
+    )
+
+    # update_memcached_parameters >> apply_memcached_parameters
+    # [END howto_operator_update_and_apply_parameters_memcached]
+
+    create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
+    create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
+    update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index 995db95..a28cce1 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -17,13 +17,18 @@
 # under the License.
 """Hooks for Cloud Memorystore service"""
 from typing import Dict, Optional, Sequence, Tuple, Union
+import json
 
 from google.api_core.exceptions import NotFound
+from google.api_core import path_template
 from google.api_core.retry import Retry
+from google.cloud.memcache_v1beta2 import CloudMemcacheClient
+from google.cloud.memcache_v1beta2.types import cloud_memcache
 from google.cloud.redis_v1 import CloudRedisClient
 from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
 from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
 from google.protobuf.json_format import ParseDict
+import proto
 
 from airflow import version
 from airflow.exceptions import AirflowException
@@ -496,3 +501,449 @@ class CloudMemorystoreHook(GoogleBaseHook):
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(
+        self,
+    ):
+        """
+        Retrieves the client library object that allows access to the Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to the provided Instance proto.
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (the current
+        airflow version string follows the semantic versioning spec: x.y.z).
+
+        :param instance: The proto to which the resource_label airflow
+            version is appended
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The label key
+        :type key: str
+        :param val: The label value
+        :type val: str
+        :return: The instance proto updated with the new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @staticmethod
+    def proto_message_to_dict(message: proto.Message) -> dict:
+        """Helper method to parse protobuf message to dictionary."""
+        return json.loads(message.__class__.to_json(message))
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Applies the instance-level parameter group to the specified nodes of the Memcached instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory size.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = cloud_memcache.Instance(instance)
+        elif not isinstance(instance, cloud_memcache.Instance):
+            raise AirflowException("instance is not an instance of Instance type or a python dict")
+
+        self._append_label(instance, "airflow-version", "v" + version.version)
+
+        result = client.create_instance(
+            parent=parent,
+            instance_id=instance_id,
+            resource=instance,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        result.result()
+        self.log.info("Instance created.")
+        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a specific Memcached instance.  Instance stops serving and data is deleted.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        self.log.info("Fetching Instance: %s", name)
+        instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+
+        if not instance:
+            return
+
+        self.log.info("Deleting Instance: %s", name)
+        result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        result.result()
+        self.log.info("Instance deleted: %s", name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Gets the details of a specific Memcached instance.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        self.log.info("Fetched Instance: %s", name)
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_instances(
+        self,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Lists all Memcached instances owned by a project in either the specified location (region) or all
+        locations.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+
+                If it is specified as ``-`` (wildcard), then all regions available to the project are
+                queried, and the results are aggregated.
+        :type location: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        result = client.list_instances(parent=parent, retry=retry, timeout=timeout, metadata=metadata)
+        self.log.info("Fetched instances")
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_instance(
+        self,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        location: Optional[str] = None,
+        instance_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Updates the metadata and configuration of a specific Memcached instance.
+
+        :param update_mask: Required. Mask of fields to update. At least one path must be supplied in this
+            field. The elements of the repeated paths field may only include these fields from ``Instance``:
+
+            -  ``displayName``
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+        :type update_mask:
+            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+        :param instance: Required. Update description. Only fields specified in ``update_mask`` are updated.
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+
+        if isinstance(instance, dict):
+            instance = cloud_memcache.Instance(instance)
+        elif not isinstance(instance, cloud_memcache.Instance):
+            raise AirflowException("instance is not an instance of Instance type or a python dict")
+
+        if location and instance_id:
+            name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+            instance.name = name
+
+        self.log.info("Updating instances: %s", instance.name)
+        result = client.update_instance(
+            update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance.name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_parameters(
+        self,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        parameters: Union[Dict, cloud_memcache.MemcacheParameters],
+        project_id: str,
+        location: str,
+        instance_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Updates the defined Memcached parameters for an existing instance. This method only stages
+        the parameters; it must be followed by ``apply_parameters`` to apply the parameters to the
+        nodes of the Memcached instance.
+
+        :param update_mask: Required. Mask of fields to update.
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+        :type update_mask:
+            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+        :param parameters: The parameters to apply to the instance.
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters`
+        :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters]
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        metadata = metadata or ()
+
+        if isinstance(parameters, dict):
+            parameters = cloud_memcache.MemcacheParameters(parameters)
+        elif not isinstance(parameters, cloud_memcache.MemcacheParameters):
+            raise AirflowException("parameters is not an instance of MemcacheParameters type or a python dict")
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        self.log.info("Staging update to instance: %s", instance_id)
+        result = client.update_parameters(
+            name=name,
+            update_mask=update_mask,
+            parameters=parameters,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        result.result()
+        self.log.info("Update staged for instance: %s", instance_id)
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index 0600914..b4ec705 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -19,12 +19,16 @@
 from typing import Dict, Optional, Sequence, Tuple, Union
 
 from google.api_core.retry import Retry
+from google.cloud.memcache_v1beta2.types import cloud_memcache
 from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
 from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
 from google.protobuf.json_format import MessageToDict
 
 from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook
+from airflow.providers.google.cloud.hooks.cloud_memorystore import (
+    CloudMemorystoreHook,
+    CloudMemorystoreMemcachedHook,
+)
 from airflow.utils.decorators import apply_defaults
 
 
@@ -1108,3 +1112,613 @@ class CloudMemorystoreExportAndDeleteInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+
+class CloudMemorystoreMemcachedApplyParametersOperator(BaseOperator):
+    """
+    Applies the instance-level parameter group to the specified nodes of the Memcached instance.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator`
+
+    :param node_ids: Nodes to which we should apply the instance-level parameter group.
+    :type node_ids: Sequence[str]
+    :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+        will explicitly restrict users from specifying any nodes, and apply parameter group updates
+        to all nodes within the instance.
+    :type apply_all: bool
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance_id: The logical name of the Memcached instance in the customer project.
+    :type instance_id: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = (
+        "node_ids",
+        "apply_all",
+        "location",
+        "instance_id",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        location: str,
+        instance_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.node_ids = node_ids
+        self.apply_all = apply_all
+        self.location = location
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        hook.apply_parameters(
+            node_ids=self.node_ids,
+            apply_all=self.apply_all,
+            location=self.location,
+            instance_id=self.instance_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+
+class CloudMemorystoreMemcachedCreateInstanceOperator(BaseOperator):
+    """
+    Creates a Memcached instance based on the specified tier and memory size.
+
+    By default, the instance is accessible from the project's `default network
+    <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator`
+
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance_id: Required. The logical name of the Memcached instance in the customer project with the
+        following restrictions:
+
+        -  Must contain only lowercase letters, numbers, and hyphens.
+        -  Must start with a letter.
+        -  Must be between 1-40 characters.
+        -  Must end with a number or a letter.
+        -  Must be unique within the customer project / location
+    :type instance_id: str
+    :param instance: Required. A Memcached [Instance] resource
+
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+    :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = (
+        "location",
+        "instance_id",
+        "instance",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.location = location
+        self.instance_id = instance_id
+        self.instance = instance
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id)
+        result = hook.create_instance(
+            location=self.location,
+            instance_id=self.instance_id,
+            instance=self.instance,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return hook.proto_message_to_dict(result)
+
+
+class CloudMemorystoreMemcachedDeleteInstanceOperator(BaseOperator):
+    """
+    Deletes a specific Memcached instance. Instance stops serving and data is deleted.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator`
+
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance: The logical name of the Memcached instance in the customer project.
+    :type instance: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the GCP connection is used.
+    :type project_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ("location", "instance", "project_id", "retry", "timeout", "metadata", "gcp_conn_id")
+
+    @apply_defaults
+    def __init__(
+        self,
+        location: str,
+        instance: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.location = location
+        self.instance = instance
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(gcp_conn_id=self.gcp_conn_id)
+        hook.delete_instance(
+            location=self.location,
+            instance=self.instance,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+
+class CloudMemorystoreMemcachedGetInstanceOperator(BaseOperator):
+    """
+    Gets the details of a specific Memcached instance.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedGetInstanceOperator`
+
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance: The logical name of the Memcached instance in the customer project.
+    :type instance: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the Google Cloud connection is used.
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        "location",
+        "instance",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        location: str,
+        instance: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.instance = instance
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        result = hook.get_instance(
+            location=self.location,
+            instance=self.instance,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return hook.proto_message_to_dict(result)
+
+
+class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
+    """
+    Lists all Memcached instances owned by a project in either the specified location (region) or all
+    locations.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedListInstancesOperator`
+
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        If it is specified as ``-`` (wildcard), then all regions available to the project are
+        queried, and the results are aggregated.
+    :type location: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the Google Cloud connection is used.
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        "location",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        result = hook.list_instances(
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        instances = [hook.proto_message_to_dict(a) for a in result]
+        return instances
+
+
+class CloudMemorystoreMemcachedUpdateInstanceOperator(BaseOperator):
+    """
+    Updates the metadata and configuration of a specific Memcached instance.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator`
+
+    :param update_mask: Required. Mask of fields to update. At least one path must be supplied in this field.
+        The elements of the repeated paths field may only include these fields from ``Instance``:
+
+        -  ``displayName``
+
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+
+    :type update_mask: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+    :param instance: Required. Update description. Only fields specified in update_mask are updated.
+
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+    :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance_id: The logical name of the Memcached instance in the customer project.
+    :type instance_id: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        "update_mask",
+        "instance",
+        "location",
+        "instance_id",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        instance: Union[Dict, cloud_memcache.Instance],
+        location: Optional[str] = None,
+        instance_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.update_mask = update_mask
+        self.instance = instance
+        self.location = location
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        hook.update_instance(
+            update_mask=self.update_mask,
+            instance=self.instance,
+            location=self.location,
+            instance_id=self.instance_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+
+class CloudMemorystoreMemcachedUpdateParametersOperator(BaseOperator):
+    """
+    Updates the defined Memcached parameters for an existing instance. This method only stages the
+    parameters; it must be followed by ``apply_parameters`` to apply them to the nodes of the
+    Memcached instance.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudMemorystoreMemcachedApplyParametersOperator`
+
+    :param update_mask: Required. Mask of fields to update.
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+    :type update_mask:
+        Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+    :param parameters: The parameters to apply to the instance.
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters`
+    :type parameters: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters]
+    :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+    :type location: str
+    :param instance_id: The logical name of the Memcached instance in the customer project.
+    :type instance_id: str
+    :param project_id: Project ID of the project that contains the instance. If set
+        to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = (
+        "update_mask",
+        "parameters",
+        "location",
+        "instance_id",
+        "project_id",
+        "retry",
+        "timeout",
+        "metadata",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        parameters: Union[Dict, cloud_memcache.MemcacheParameters],
+        location: str,
+        instance_id: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.update_mask = update_mask
+        self.parameters = parameters
+        self.location = location
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = CloudMemorystoreMemcachedHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        hook.update_parameters(
+            update_mask=self.update_mask,
+            parameters=self.parameters,
+            location=self.location,
+            instance_id=self.instance_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
diff --git a/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst
new file mode 100644
index 0000000..6483c75
--- /dev/null
+++ b/docs/howto/operator/google/cloud/cloud_memorystore_memcached.rst
@@ -0,0 +1,158 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Google Cloud Memorystore Memcached Operators
+============================================
+
+`Cloud Memorystore for Memcached <https://cloud.google.com/memorystore/docs/memcached/>`__ is a fully managed
+Memcached service for Google Cloud. Applications running on Google Cloud can achieve high performance by
+leveraging this highly scalable, available, and secure Memcached service without the burden of managing
+complex Memcached deployments.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:CloudMemorystoreMemcachedInstance:
+
+Instance
+^^^^^^^^
+
+The operators use a :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance` object to represent an instance.
+The object can also be provided as a compatible dictionary.
+
+Here is an example of an instance definition:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :start-after: [START howto_operator_memcached_instance]
+    :end-before: [END howto_operator_memcached_instance]
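+
+For quick reference, a minimal sketch of the dictionary form is shown below. The field values are
+illustrative placeholders only (they mirror the shape used in the provider unit tests), not recommended
+production settings.
+
+.. code-block:: python
+
+    from google.cloud.memcache_v1beta2.types import cloud_memcache
+
+    # Dictionary form, compatible with the protobuf message.
+    MEMCACHED_INSTANCE = {
+        "node_count": 1,
+        "node_config": {"cpu_count": 1, "memory_size_mb": 1024},
+    }
+
+    # Equivalent object form.
+    memcached_instance = cloud_memcache.Instance(MEMCACHED_INSTANCE)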
+
+
+.. _howto/operator:CloudMemorystoreMemcachedCreateInstanceOperator:
+
+Create instance
+^^^^^^^^^^^^^^^
+
+Creating an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedCreateInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_instance_memcached]
+    :end-before: [END howto_operator_create_instance_memcached]
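+
+A minimal, self-contained sketch of such a task is shown below; the DAG id, project, location and
+instance values are illustrative placeholders rather than the values used in the example DAG above.
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedCreateInstanceOperator,
+    )
+    from airflow.utils import dates
+
+    MEMCACHED_INSTANCE = {"node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
+
+    with models.DAG(
+        "example_memcached_create", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
+            task_id="create-instance",
+            location="europe-north1",
+            instance_id="my-memcached-instance",
+            instance=MEMCACHED_INSTANCE,
+            project_id="my-project",
+        )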
+
+
+.. _howto/operator:CloudMemorystoreMemcachedDeleteInstanceOperator:
+
+Delete instance
+^^^^^^^^^^^^^^^
+
+Deleting an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedDeleteInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_delete_instance_memcached]
+    :end-before: [END howto_operator_delete_instance_memcached]
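+
+A minimal sketch, with placeholder project, location and instance values:
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedDeleteInstanceOperator,
+    )
+    from airflow.utils import dates
+
+    with models.DAG(
+        "example_memcached_delete", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
+            task_id="delete-instance",
+            location="europe-north1",
+            instance="my-memcached-instance",
+            project_id="my-project",
+        )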
+
+
+.. _howto/operator:CloudMemorystoreMemcachedGetInstanceOperator:
+
+Get instance
+^^^^^^^^^^^^
+
+Getting an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedGetInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_get_instance_memcached]
+    :end-before: [END howto_operator_get_instance_memcached]
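+
+A minimal sketch, with placeholder project, location and instance values:
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedGetInstanceOperator,
+    )
+    from airflow.utils import dates
+
+    with models.DAG(
+        "example_memcached_get", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
+            task_id="get-instance",
+            location="europe-north1",
+            instance="my-memcached-instance",
+            project_id="my-project",
+        )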
+
+
+.. _howto/operator:CloudMemorystoreMemcachedListInstancesOperator:
+
+List instances
+^^^^^^^^^^^^^^
+
+Listing instances is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedListInstancesOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_list_instances_memcached]
+    :end-before: [END howto_operator_list_instances_memcached]
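+
+A minimal sketch, with placeholder project and location values:
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedListInstancesOperator,
+    )
+    from airflow.utils import dates
+
+    with models.DAG(
+        "example_memcached_list", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
+            task_id="list-instances",
+            location="europe-north1",
+            project_id="my-project",
+        )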
+
+
+.. _howto/operator:CloudMemorystoreMemcachedUpdateInstanceOperator:
+
+Update instance
+^^^^^^^^^^^^^^^
+
+Updating an instance is performed with the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateInstanceOperator`
+operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_update_instance_memcached]
+    :end-before: [END howto_operator_update_instance_memcached]
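+
+A minimal sketch is shown below; the update mask path and new display name are illustrative
+assumptions (``displayName`` is the only field listed as updatable in the operator documentation),
+and the remaining values are placeholders.
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedUpdateInstanceOperator,
+    )
+    from airflow.utils import dates
+
+    with models.DAG(
+        "example_memcached_update", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
+            task_id="update-instance",
+            update_mask={"paths": ["display_name"]},
+            instance={"display_name": "Updated Memcached instance"},
+            location="europe-north1",
+            instance_id="my-memcached-instance",
+            project_id="my-project",
+        )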
+
+
+.. _howto/operator:CloudMemorystoreMemcachedApplyParametersOperator:
+
+Update and apply parameters to an instance
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To update and apply Memcached parameters to an instance, use the
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedUpdateParametersOperator`
+and
+:class:`~airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedApplyParametersOperator`
+operators.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_update_and_apply_parameters_memcached]
+    :end-before: [END howto_operator_update_and_apply_parameters_memcached]
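+
+A minimal sketch of chaining the two operators is shown below. The parameter name and value
+(``protocol``), the update mask path, and the ``node_ids``/``apply_all`` arguments of the apply
+operator are illustrative assumptions based on the Memcached ``applyParameters`` API; adjust them
+to your instance.
+
+.. code-block:: python
+
+    from airflow import models
+    from airflow.providers.google.cloud.operators.cloud_memorystore import (
+        CloudMemorystoreMemcachedApplyParametersOperator,
+        CloudMemorystoreMemcachedUpdateParametersOperator,
+    )
+    from airflow.utils import dates
+
+    with models.DAG(
+        "example_memcached_parameters", start_date=dates.days_ago(1), schedule_interval=None
+    ) as dag:
+        update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+            task_id="update-parameters",
+            update_mask={"paths": ["params"]},
+            parameters={"params": {"protocol": "ascii"}},
+            location="europe-north1",
+            instance_id="my-memcached-instance",
+            project_id="my-project",
+        )
+        apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+            task_id="apply-parameters",
+            node_ids=["node-a-1"],
+            apply_all=False,
+            location="europe-north1",
+            instance_id="my-memcached-instance",
+            project_id="my-project",
+        )
+        update_memcached_parameters >> apply_memcached_parameters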
+
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Client Library Documentation <https://googleapis.dev/python/memcache/latest/index.html>`__
+* `Product Documentation <https://cloud.google.com/memorystore/docs/memcached/>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 9b17642..58296cd 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -845,12 +845,18 @@ These integrations allow you to perform various operations within the Google Clo
      - :mod:`airflow.providers.google.cloud.operators.mlengine`
      -
 
-   * - `Cloud Memorystore <https://cloud.google.com/memorystore/>`__
+   * - `Cloud Memorystore Redis <https://cloud.google.com/memorystore/>`__
      - :doc:`How to use <howto/operator/google/cloud/cloud_memorystore>`
      - :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore`
      - :mod:`airflow.providers.google.cloud.operators.cloud_memorystore`
      -
 
+   * - `Cloud Memorystore Memcached <https://cloud.google.com/memorystore/>`__
+     - :doc:`How to use <howto/operator/google/cloud/cloud_memorystore_memcached>`
+     - :mod:`airflow.providers.google.cloud.hooks.cloud_memorystore`
+     - :mod:`airflow.providers.google.cloud.operators.cloud_memorystore`
+     -
+
    * - `Natural Language <https://cloud.google.com/natural-language/>`__
      - :doc:`How to use <howto/operator/google/cloud/natural_language>`
      - :mod:`airflow.providers.google.cloud.hooks.natural_language`
diff --git a/setup.py b/setup.py
index ef3ee97..c6952b5 100644
--- a/setup.py
+++ b/setup.py
@@ -268,6 +268,7 @@ google = [
     'google-cloud-kms>=1.2.1,<2.0.0',
     'google-cloud-language>=1.1.1,<2.0.0',
     'google-cloud-logging>=1.14.0,<2.0.0',
+    'google-cloud-memcache>=0.2.0',
     'google-cloud-monitoring>=0.34.0,<2.0.0',
     'google-cloud-pubsub>=1.0.0,<2.0.0',
     'google-cloud-redis>=0.3.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
index fe3a1f8..8f92e79 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
@@ -20,12 +20,16 @@ from unittest import TestCase, mock
 
 from google.api_core.retry import Retry
 from google.cloud.exceptions import NotFound
+from google.cloud.memcache_v1beta2.types import cloud_memcache
 from google.cloud.redis_v1.types import Instance
 from mock import PropertyMock
 
 from airflow import version
 from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.cloud_memorystore import CloudMemorystoreHook
+from airflow.providers.google.cloud.hooks.cloud_memorystore import (
+    CloudMemorystoreHook,
+    CloudMemorystoreMemcachedHook,
+)
 from tests.providers.google.cloud.utils.base_gcp_mock import (
     GCP_PROJECT_ID_HOOK_UNIT_TEST,
     mock_base_gcp_hook_default_project_id,
@@ -42,6 +46,7 @@ TEST_TIMEOUT = 10  # type: float
 TEST_METADATA = [("KEY", "VALUE")]  # type:  Sequence[Tuple[str, str]]
 TEST_PAGE_SIZE = 100  # type: int
 TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]}  # type: Dict
+TEST_UPDATE_MASK_MEMCACHED = {"displayName": "updated name"}  # type: Dict
 TEST_PARENT = "projects/test-project-id/locations/test-location"  # type: str
 TEST_NAME = "projects/test-project-id/locations/test-location/instances/test-instance-id"  # type: str
 TEST_PARENT_DEFAULT_PROJECT_ID = "projects/{}/locations/test-location".format(
@@ -429,3 +434,184 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
                 timeout=TEST_TIMEOUT,
                 metadata=TEST_METADATA,
             )
+
+
+class TestCloudMemorystoreMemcachedWithDefaultProjectIdHook(TestCase):
+    def setUp(
+        self,
+    ):
+        with mock.patch(
+            "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.__init__",
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = CloudMemorystoreMemcachedHook(gcp_conn_id="test")
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_create_instance_when_exists(self, mock_get_conn, mock_project_id):
+        mock_get_conn.return_value.get_instance.return_value = cloud_memcache.Instance(name=TEST_NAME)
+        result = self.hook.create_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.get_instance.assert_called_once_with(
+            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+        )
+        self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), result)
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_create_instance_when_not_exists(self, mock_get_conn, mock_project_id):
+        mock_get_conn.return_value.get_instance.side_effect = [
+            NotFound("Instance not found"),
+            cloud_memcache.Instance(name=TEST_NAME),
+        ]
+        mock_get_conn.return_value.create_instance.return_value.result.return_value = cloud_memcache.Instance(
+            name=TEST_NAME
+        )
+        result = self.hook.create_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.get_instance.assert_has_calls(
+            [
+                mock.call(
+                    name=TEST_NAME_DEFAULT_PROJECT_ID,
+                    retry=TEST_RETRY,
+                    timeout=TEST_TIMEOUT,
+                    metadata=TEST_METADATA,
+                ),
+                mock.call(
+                    name=TEST_NAME_DEFAULT_PROJECT_ID,
+                    retry=TEST_RETRY,
+                    timeout=TEST_TIMEOUT,
+                    metadata=TEST_METADATA,
+                ),
+            ]
+        )
+        mock_get_conn.return_value.create_instance.assert_called_once_with(
+            resource=cloud_memcache.Instance(
+                name=TEST_NAME,
+                labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+            ),
+            instance_id=TEST_INSTANCE_ID,
+            metadata=TEST_METADATA,
+            parent=TEST_PARENT_DEFAULT_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+        )
+        self.assertEqual(cloud_memcache.Instance(name=TEST_NAME), result)
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_delete_instance(self, mock_get_conn, mock_project_id):
+        self.hook.delete_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.delete_instance.assert_called_once_with(
+            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_get_instance(self, mock_get_conn, mock_project_id):
+        self.hook.get_instance(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.get_instance.assert_called_once_with(
+            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_list_instances(self, mock_get_conn, mock_project_id):
+        self.hook.list_instances(  # pylint: disable=no-value-for-parameter
+            location=TEST_LOCATION,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.list_instances.assert_called_once_with(
+            parent=TEST_PARENT_DEFAULT_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.hooks.cloud_memorystore.CloudMemorystoreMemcachedHook.get_conn"
+    )
+    def test_update_instance(self, mock_get_conn, mock_project_id):
+        self.hook.update_instance(  # pylint: disable=no-value-for-parameter
+            update_mask=TEST_UPDATE_MASK_MEMCACHED,
+            instance=cloud_memcache.Instance(name=TEST_NAME),
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_get_conn.return_value.update_instance.assert_called_once_with(
+            update_mask=TEST_UPDATE_MASK_MEMCACHED,
+            resource=cloud_memcache.Instance(name=TEST_NAME_DEFAULT_PROJECT_ID),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+    def test_proto_functions(self):
+        instance_dict = {
+            'name': 'test_name',
+            'node_count': 1,
+            'node_config': {'cpu_count': 1, 'memory_size_mb': 1024},
+        }
+        instance = cloud_memcache.Instance(instance_dict)
+        instance_dict_result = self.hook.proto_message_to_dict(instance)
+        self.assertEqual(instance_dict_result["name"], instance_dict["name"])
+        self.assertEqual(
+            instance_dict_result["nodeConfig"]["cpuCount"], instance_dict["node_config"]["cpu_count"]
+        )
+        self.assertEqual(
+            instance_dict_result["nodeConfig"]["memorySizeMb"], instance_dict["node_config"]["memory_size_mb"]
+        )
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
index 384a572..85b17e5 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
@@ -33,6 +33,11 @@ from airflow.providers.google.cloud.operators.cloud_memorystore import (
     CloudMemorystoreListInstancesOperator,
     CloudMemorystoreScaleInstanceOperator,
     CloudMemorystoreUpdateInstanceOperator,
+    CloudMemorystoreMemcachedCreateInstanceOperator,
+    CloudMemorystoreMemcachedDeleteInstanceOperator,
+    CloudMemorystoreMemcachedGetInstanceOperator,
+    CloudMemorystoreMemcachedListInstancesOperator,
+    CloudMemorystoreMemcachedUpdateInstanceOperator,
 )
 
 TEST_GCP_CONN_ID = "test-gcp-conn-id"
@@ -52,6 +57,7 @@ TEST_DATA_PROTECTION_MODE = FailoverInstanceRequest.DataProtectionMode.LIMITED_D
 TEST_INPUT_CONFIG = {"gcs_source": {"uri": "gs://test-bucket/file.rdb"}}  # type: Dict
 TEST_PAGE_SIZE = 100  # type: int
 TEST_UPDATE_MASK = {"paths": ["memory_size_gb"]}  # TODO: Fill missing value
+TEST_UPDATE_MASK_MEMCACHED = {"displayName": "memcached instance"}
 TEST_PARENT = "test-parent"
 TEST_NAME = "test-name"
 
@@ -375,3 +381,144 @@ class TestCloudMemorystoreCreateInstanceAndImportOperatorOperator(TestCase):
                 ),
             ]
         )
+
+
+class TestCloudMemorystoreMemcachedCreateInstanceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        task = CloudMemorystoreMemcachedCreateInstanceOperator(
+            task_id=TEST_TASK_ID,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=TEST_INSTANCE,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+        task.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
+        mock_hook.return_value.create_instance.assert_called_once_with(
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            instance=TEST_INSTANCE,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestCloudMemorystoreMemcachedDeleteInstanceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        task = CloudMemorystoreMemcachedDeleteInstanceOperator(
+            task_id=TEST_TASK_ID,
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_NAME,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+        )
+        task.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
+        mock_hook.return_value.delete_instance.assert_called_once_with(
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_NAME,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestCloudMemorystoreMemcachedGetInstanceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        task = CloudMemorystoreMemcachedGetInstanceOperator(
+            task_id=TEST_TASK_ID,
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_NAME,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        task.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.get_instance.assert_called_once_with(
+            location=TEST_LOCATION,
+            instance=TEST_INSTANCE_NAME,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestCloudMemorystoreMemcachedListInstancesOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        task = CloudMemorystoreMemcachedListInstancesOperator(
+            task_id=TEST_TASK_ID,
+            location=TEST_LOCATION,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        task.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.list_instances.assert_called_once_with(
+            location=TEST_LOCATION,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestCloudMemorystoreMemcachedUpdateInstanceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreMemcachedHook")
+    def test_assert_valid_hook_call(self, mock_hook):
+        task = CloudMemorystoreMemcachedUpdateInstanceOperator(
+            task_id=TEST_TASK_ID,
+            update_mask=TEST_UPDATE_MASK_MEMCACHED,
+            instance=TEST_INSTANCE,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        task.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.update_instance.assert_called_once_with(
+            update_mask=TEST_UPDATE_MASK_MEMCACHED,
+            instance=TEST_INSTANCE,
+            location=TEST_LOCATION,
+            instance_id=TEST_INSTANCE_ID,
+            project_id=TEST_PROJECT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
index 4b42143..6f56f18 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore_system.py
@@ -45,8 +45,12 @@ class CloudMemorystoreSystemTest(GoogleSystemTest):
         self.create_gcs_bucket(GCP_BUCKET_NAME, location="europe-north1")
 
     @provide_gcp_context(GCP_MEMORYSTORE)
-    def test_run_example_dag(self):
-        self.run_dag('gcp_cloud_memorystore', CLOUD_DAG_FOLDER)
+    def test_run_example_dag_memorystore_redis(self):
+        self.run_dag('gcp_cloud_memorystore_redis', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_MEMORYSTORE)
+    def test_run_example_dag_memorystore_memcached(self):
+        self.run_dag('gcp_cloud_memorystore_memcached', CLOUD_DAG_FOLDER)
 
     @provide_gcp_context(GCP_MEMORYSTORE)
     def tearDown(self):