You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:32:46 UTC

[airflow] 22/41: Support google-cloud-redis>=2.0.0 (#13117)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ce6c631ec0e4b283f949574e8e00478ec7450b8a
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Dec 22 16:25:04 2020 +0100

    Support google-cloud-redis>=2.0.0 (#13117)
    
    (cherry picked from commit 0b626c8042b304a52d6c481fa6eb689d655f33d3)
---
 airflow/providers/google/ADDITIONAL_INFO.md        |  64 +++++++++
 .../example_dags/example_cloud_memorystore.py      |   4 +-
 .../google/cloud/hooks/cloud_memorystore.py        | 144 ++++++++++++++-------
 .../google/cloud/operators/cloud_memorystore.py    |  11 +-
 setup.py                                           |   2 +-
 .../google/cloud/hooks/test_cloud_memorystore.py   |  57 ++++----
 .../cloud/operators/test_cloud_memorystore.py      |   4 +-
 7 files changed, 208 insertions(+), 78 deletions(-)

diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
new file mode 100644
index 0000000..b54b240
--- /dev/null
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -0,0 +1,64 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Migration Guide
+
+## 2.0.0
+
+### Update ``google-cloud-*`` libraries
+
+This release of the provider package contains third-party library updates, which may require updating your DAG files or custom hooks and operators, if you were using objects from those libraries. Updating of these libraries is necessary to be able to use new features made available by new versions of the libraries and to obtain bug fixes that are only available for new versions of the library.
+
+Details are covered in the UPGRADING.md files for each library, but there are some details that you should pay attention to.
+
+| Library name | Previous constraints | Current constraints | Upgrade guide |
+| --- | --- | --- | --- |
+| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=1.0.0,<2.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
+| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
+| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
+| [``google-cloud-kms``](https://pypi.org/project/google-cloud-kms/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+
+
+### The field names use the snake_case convention
+
+If your DAG uses an object from the above mentioned libraries passed by XCom, it is necessary to update the naming convention of the fields that are read. Previously, the fields used the camelCase convention; now the snake_case convention is used.
+
+**Before:**
+
+```python
+set_acl_permission = GCSBucketCreateAclEntryOperator(
+    task_id="gcs-set-acl-permission",
+    bucket=BUCKET_NAME,
+    entity="user-{{ task_instance.xcom_pull('get-instance')['persistenceIamIdentity']"
+    ".split(':', 2)[1] }}",
+    role="OWNER",
+)
+```
+
+**After:**
+
+```python
+set_acl_permission = GCSBucketCreateAclEntryOperator(
+    task_id="gcs-set-acl-permission",
+    bucket=BUCKET_NAME,
+    entity="user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"
+    ".split(':', 2)[1] }}",
+    role="OWNER",
+)
+```
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
index 441c165..acb50b4 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
+++ b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
@@ -22,7 +22,7 @@ import os
 from urllib.parse import urlparse
 
 from google.cloud.memcache_v1beta2.types import cloud_memcache
-from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
+from google.cloud.redis_v1 import FailoverInstanceRequest, Instance
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -161,7 +161,7 @@ with models.DAG(
     set_acl_permission = GCSBucketCreateAclEntryOperator(
         task_id="gcs-set-acl-permission",
         bucket=BUCKET_NAME,
-        entity="user-{{ task_instance.xcom_pull('get-instance')['persistenceIamIdentity']"
+        entity="user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"
         ".split(':', 2)[1] }}",
         role="OWNER",
     )
diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index bfc01f9..caf1cd6 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -23,10 +23,14 @@ from google.api_core.exceptions import NotFound
 from google.api_core.retry import Retry
 from google.cloud.memcache_v1beta2 import CloudMemcacheClient
 from google.cloud.memcache_v1beta2.types import cloud_memcache
-from google.cloud.redis_v1 import CloudRedisClient
-from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
-from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
-from google.protobuf.json_format import ParseDict
+from google.cloud.redis_v1 import (
+    CloudRedisClient,
+    FailoverInstanceRequest,
+    InputConfig,
+    Instance,
+    OutputConfig,
+)
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow import version
 from airflow.exceptions import AirflowException
@@ -70,7 +74,7 @@ class CloudMemorystoreHook(GoogleBaseHook):
         )
         self._client: Optional[CloudRedisClient] = None
 
-    def get_conn(self):
+    def get_conn(self) -> CloudRedisClient:
         """Retrieves client library object that allow access to Cloud Memorystore service."""
         if not self._client:
             self._client = CloudRedisClient(credentials=self._get_credentials())
@@ -143,35 +147,36 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        parent = CloudRedisClient.location_path(project_id, location)
-        instance_name = CloudRedisClient.instance_path(project_id, location, instance_id)
+        if isinstance(instance, dict):
+            instance = Instance(**instance)
+        elif not isinstance(instance, Instance):
+            raise AirflowException("instance is not instance of Instance type or python dict")
+
+        parent = f"projects/{project_id}/locations/{location}"
+        instance_name = f"projects/{project_id}/locations/{location}/instances/{instance_id}"
         try:
+            self.log.info("Fetching instance: %s", instance_name)
             instance = client.get_instance(
-                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+                request={'name': instance_name}, retry=retry, timeout=timeout, metadata=metadata or ()
             )
             self.log.info("Instance exists. Skipping creation.")
             return instance
         except NotFound:
             self.log.info("Instance not exists.")
 
-        if isinstance(instance, dict):
-            instance = ParseDict(instance, Instance())
-        elif not isinstance(instance, Instance):
-            raise AirflowException("instance is not instance of Instance type or python dict")
-
         self._append_label(instance, "airflow-version", "v" + version.version)
 
         result = client.create_instance(
-            parent=parent,
-            instance_id=instance_id,
-            instance=instance,
+            request={'parent': parent, 'instance_id': instance_id, 'instance': instance},
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance created.")
-        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_instance(
+            request={'name': instance_name}, retry=retry, timeout=timeout, metadata=metadata or ()
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_instance(
@@ -203,15 +208,25 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        name = CloudRedisClient.instance_path(project_id, location, instance)
+        name = f"projects/{project_id}/locations/{location}/instances/{instance}"
         self.log.info("Fetching Instance: %s", name)
-        instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        instance = client.get_instance(
+            request={'name': name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
 
         if not instance:
             return
 
         self.log.info("Deleting Instance: %s", name)
-        result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        result = client.delete_instance(
+            request={'name': name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
         result.result()
         self.log.info("Instance deleted: %s", name)
 
@@ -253,10 +268,13 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        name = CloudRedisClient.instance_path(project_id, location, instance)
+        name = f"projects/{project_id}/locations/{location}/instances/{instance}"
         self.log.info("Exporting Instance: %s", name)
         result = client.export_instance(
-            name=name, output_config=output_config, retry=retry, timeout=timeout, metadata=metadata
+            request={'name': name, 'output_config': output_config},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance exported: %s", name)
@@ -297,15 +315,14 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        name = CloudRedisClient.instance_path(project_id, location, instance)
+        name = f"projects/{project_id}/locations/{location}/instances/{instance}"
         self.log.info("Failovering Instance: %s", name)
 
         result = client.failover_instance(
-            name=name,
-            data_protection_mode=data_protection_mode,
+            request={'name': name, 'data_protection_mode': data_protection_mode},
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance failovered: %s", name)
@@ -340,8 +357,13 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        name = CloudRedisClient.instance_path(project_id, location, instance)
-        result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        name = f"projects/{project_id}/locations/{location}/instances/{instance}"
+        result = client.get_instance(
+            request={'name': name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
         self.log.info("Fetched Instance: %s", name)
         return result
 
@@ -384,10 +406,13 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        name = CloudRedisClient.instance_path(project_id, location, instance)
+        name = f"projects/{project_id}/locations/{location}/instances/{instance}"
         self.log.info("Importing Instance: %s", name)
         result = client.import_instance(
-            name=name, input_config=input_config, retry=retry, timeout=timeout, metadata=metadata
+            request={'name': name, 'input_config': input_config},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance imported: %s", name)
@@ -428,9 +453,12 @@ class CloudMemorystoreHook(GoogleBaseHook):
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_conn()
-        parent = CloudRedisClient.location_path(project_id, location)
+        parent = f"projects/{project_id}/locations/{location}"
         result = client.list_instances(
-            parent=parent, page_size=page_size, retry=retry, timeout=timeout, metadata=metadata
+            request={'parent': parent, 'page_size': page_size},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
         )
         self.log.info("Fetched instances")
         return result
@@ -485,17 +513,20 @@ class CloudMemorystoreHook(GoogleBaseHook):
         client = self.get_conn()
 
         if isinstance(instance, dict):
-            instance = ParseDict(instance, Instance())
+            instance = Instance(**instance)
         elif not isinstance(instance, Instance):
             raise AirflowException("instance is not instance of Instance type or python dict")
 
         if location and instance_id:
-            name = CloudRedisClient.instance_path(project_id, location, instance_id)
+            name = f"projects/{project_id}/locations/{location}/instances/{instance_id}"
             instance.name = name
 
         self.log.info("Updating instances: %s", instance.name)
         result = client.update_instance(
-            update_mask=update_mask, instance=instance, retry=retry, timeout=timeout, metadata=metadata
+            request={'update_mask': update_mask, 'instance': instance},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
@@ -610,7 +641,12 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
 
         self.log.info("Applying update to instance: %s", instance_id)
         result = client.apply_parameters(
-            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+            name=name,
+            node_ids=node_ids,
+            apply_all=apply_all,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance updated: %s", instance_id)
@@ -688,11 +724,16 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
             resource=instance,
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Instance created.")
-        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_instance(
+            name=instance_name,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_instance(
@@ -727,13 +768,23 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
         metadata = metadata or ()
         name = CloudMemcacheClient.instance_path(project_id, location, instance)
         self.log.info("Fetching Instance: %s", name)
-        instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        instance = client.get_instance(
+            name=name,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
 
         if not instance:
             return
 
         self.log.info("Deleting Instance: %s", name)
-        result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        result = client.delete_instance(
+            name=name,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
         result.result()
         self.log.info("Instance deleted: %s", name)
 
@@ -808,7 +859,12 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
         parent = path_template.expand(
             "projects/{project}/locations/{location}", project=project_id, location=location
         )
-        result = client.list_instances(parent=parent, retry=retry, timeout=timeout, metadata=metadata)
+        result = client.list_instances(
+            parent=parent,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata or (),
+        )
         self.log.info("Fetched instances")
         return result
 
@@ -871,7 +927,7 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
 
         self.log.info("Updating instances: %s", instance.name)
         result = client.update_instance(
-            update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata
+            update_mask=update_mask, resource=instance, retry=retry, timeout=timeout, metadata=metadata or ()
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
@@ -934,7 +990,7 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
             parameters=parameters,
             retry=retry,
             timeout=timeout,
-            metadata=metadata,
+            metadata=metadata or (),
         )
         result.result()
         self.log.info("Update staged for instance: %s", instance_id)
diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py
index 0ac2640..64a6251 100644
--- a/airflow/providers/google/cloud/operators/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py
@@ -20,9 +20,8 @@ from typing import Dict, Optional, Sequence, Tuple, Union
 
 from google.api_core.retry import Retry
 from google.cloud.memcache_v1beta2.types import cloud_memcache
-from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
-from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
-from google.protobuf.json_format import MessageToDict
+from google.cloud.redis_v1 import FailoverInstanceRequest, InputConfig, Instance, OutputConfig
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_memorystore import (
@@ -134,7 +133,7 @@ class CloudMemorystoreCreateInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(result)
+        return Instance.to_dict(result)
 
 
 class CloudMemorystoreDeleteInstanceOperator(BaseOperator):
@@ -492,7 +491,7 @@ class CloudMemorystoreGetInstanceOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(result)
+        return Instance.to_dict(result)
 
 
 class CloudMemorystoreImportOperator(BaseOperator):
@@ -677,7 +676,7 @@ class CloudMemorystoreListInstancesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        instances = [MessageToDict(a) for a in result]
+        instances = [Instance.to_dict(a) for a in result]
         return instances
 
 
diff --git a/setup.py b/setup.py
index ff9fd71..ae18e57 100644
--- a/setup.py
+++ b/setup.py
@@ -297,7 +297,7 @@ google = [
     'google-cloud-monitoring>=0.34.0,<2.0.0',
     'google-cloud-os-login>=2.0.0,<3.0.0',
     'google-cloud-pubsub>=2.0.0,<3.0.0',
-    'google-cloud-redis>=0.3.0,<2.0.0',
+    'google-cloud-redis>=2.0.0,<3.0.0',
     'google-cloud-secret-manager>=0.2.0,<2.0.0',
     'google-cloud-spanner>=1.10.0,<2.0.0',
     'google-cloud-speech>=0.36.3,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
index 40de3b8..9e6f442 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py
@@ -85,7 +85,10 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.get_instance.assert_called_once_with(
-            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+            request=dict(name=TEST_NAME_DEFAULT_PROJECT_ID),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
         )
         assert Instance(name=TEST_NAME) == result
 
@@ -116,13 +119,15 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             ]
         )
         mock_get_conn.return_value.create_instance.assert_called_once_with(
-            instance=Instance(
-                name=TEST_NAME,
-                labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+            request=dict(
+                parent=TEST_PARENT_DEFAULT_PROJECT_ID,
+                instance=Instance(
+                    name=TEST_NAME,
+                    labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+                ),
+                instance_id=TEST_INSTANCE_ID,
             ),
-            instance_id=TEST_INSTANCE_ID,
             metadata=TEST_METADATA,
-            parent=TEST_PARENT_DEFAULT_PROJECT_ID,
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
         )
@@ -143,7 +148,10 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.delete_instance.assert_called_once_with(
-            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+            request=dict(name=TEST_NAME_DEFAULT_PROJECT_ID),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
         )
 
     @mock.patch(
@@ -161,7 +169,10 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.get_instance.assert_called_once_with(
-            name=TEST_NAME_DEFAULT_PROJECT_ID, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+            request=dict(name=TEST_NAME_DEFAULT_PROJECT_ID),
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
         )
 
     @mock.patch(
@@ -179,8 +190,7 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.list_instances.assert_called_once_with(
-            parent=TEST_PARENT_DEFAULT_PROJECT_ID,
-            page_size=TEST_PAGE_SIZE,
+            request=dict(parent=TEST_PARENT_DEFAULT_PROJECT_ID, page_size=TEST_PAGE_SIZE),
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
@@ -203,8 +213,7 @@ class TestCloudMemorystoreWithDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.update_instance.assert_called_once_with(
-            update_mask=TEST_UPDATE_MASK,
-            instance=Instance(name=TEST_NAME_DEFAULT_PROJECT_ID),
+            request=dict(update_mask=TEST_UPDATE_MASK, instance=Instance(name=TEST_NAME_DEFAULT_PROJECT_ID)),
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
@@ -234,7 +243,7 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.get_instance.assert_called_once_with(
-            name="projects/test-project-id/locations/test-location/instances/test-instance-id",
+            request=dict(name="projects/test-project-id/locations/test-location/instances/test-instance-id"),
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
@@ -275,13 +284,15 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
         )
 
         mock_get_conn.return_value.create_instance.assert_called_once_with(
-            instance=Instance(
-                name=TEST_NAME,
-                labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+            request=dict(
+                parent=TEST_PARENT,
+                instance=Instance(
+                    name=TEST_NAME,
+                    labels={"airflow-version": "v" + version.version.replace(".", "-").replace("+", "-")},
+                ),
+                instance_id=TEST_INSTANCE_ID,
             ),
-            instance_id=TEST_INSTANCE_ID,
             metadata=TEST_METADATA,
-            parent=TEST_PARENT,
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
         )
@@ -316,7 +327,7 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.delete_instance.assert_called_once_with(
-            name=TEST_NAME, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+            request=dict(name=TEST_NAME), retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
         )
 
     @mock.patch(
@@ -347,7 +358,7 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.get_instance.assert_called_once_with(
-            name=TEST_NAME, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
+            request=dict(name=TEST_NAME), retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
         )
 
     @mock.patch(
@@ -378,8 +389,7 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
             metadata=TEST_METADATA,
         )
         mock_get_conn.return_value.list_instances.assert_called_once_with(
-            parent=TEST_PARENT,
-            page_size=TEST_PAGE_SIZE,
+            request=dict(parent=TEST_PARENT, page_size=TEST_PAGE_SIZE),
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
@@ -413,8 +423,7 @@ class TestCloudMemorystoreWithoutDefaultProjectIdHook(TestCase):
             project_id=TEST_PROJECT_ID,
         )
         mock_get_conn.return_value.update_instance.assert_called_once_with(
-            update_mask=TEST_UPDATE_MASK,
-            instance=Instance(name=TEST_NAME),
+            request=dict(update_mask={'paths': ['memory_size_gb']}, instance=Instance(name=TEST_NAME)),
             retry=TEST_RETRY,
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
diff --git a/tests/providers/google/cloud/operators/test_cloud_memorystore.py b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
index 8ef60bd..6db8a3a 100644
--- a/tests/providers/google/cloud/operators/test_cloud_memorystore.py
+++ b/tests/providers/google/cloud/operators/test_cloud_memorystore.py
@@ -20,7 +20,7 @@ from unittest import TestCase, mock
 
 from google.api_core.retry import Retry
 from google.cloud.memcache_v1beta2.types import cloud_memcache
-from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
+from google.cloud.redis_v1 import FailoverInstanceRequest
 from google.cloud.redis_v1.types import Instance
 
 from airflow.providers.google.cloud.operators.cloud_memorystore import (
@@ -78,6 +78,7 @@ class TestCloudMemorystoreCreateInstanceOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
+        mock_hook.return_value.create_instance.return_value = Instance(name=TEST_NAME)
         task.execute(mock.MagicMock())
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
@@ -199,6 +200,7 @@ class TestCloudMemorystoreGetInstanceOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
+        mock_hook.return_value.get_instance.return_value = Instance(name=TEST_NAME)
         task.execute(mock.MagicMock())
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,