You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/03 00:57:04 UTC

[GitHub] [airflow] tanjinP opened a new pull request #10121: Google Cloud Memorystore Memcached Operators

tanjinP opened a new pull request #10121:
URL: https://github.com/apache/airflow/pull/10121


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   closes: #8286 
   
   ### Summary
   Will consider this a draft just to get a check if this is the right direction, also an opportunity to ask some clarifying questions:
   
   ### Questions + points that require clarity
   1. created a new hook class with same name but added `Memcached` in it, is that okay? Also reusing the existing hook and operator modules (`cloud_memorystore.py`)
   1. created a new how to guide because it didn't feel as natural to put in the Memcached specific operators in the existing one, does that make sense?
   1. does it make sense to have a single `UpdateAndApply` operator that combines 2 REST API methods [specified here](https://cloud.google.com/memorystore/docs/memcached/reference/rest/v1beta2/projects.locations.instances#methods) - is that better as opposed to having them be independent operators? I would still have them be standalone methods in the Hook, the operator is what would call them one after another.
   
   ## What is next?
   After the initial review and clarification on approach, will: 
   - continue to make the other operators reflecting [REST API methods](https://cloud.google.com/memorystore/docs/memcached/reference/rest/v1beta2/projects.locations.instances#methods), including Memcache specific ones like IAM and update and apply parameters.
     - this includes more tests
   - add examples to the DAG and corresponding entries in the How to guide
   - run all operators locally and test with personal GCP account that they all work as intended
   
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r464284666



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -501,3 +504,275 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+        Hook for Google Cloud Memorystore for Memcached service APIs.
+
+        All the methods in the hook where project_id is used must be called with
+        keyword arguments rather than positional.
+
+        :param gcp_conn_id: The connection ID to use when fetching connection info.
+        :type gcp_conn_id: str
+        :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+            if any. For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: str
+        :param impersonation_chain: Optional service account to impersonate using short-term
+            credentials, or chained list of accounts required to get the access_token
+            of the last account in the list, which will be impersonated in the request.
+            If set as a string, the account must grant the originating account
+            the Service Account Token Creator IAM role.
+            If set as a sequence, the identities from the list must grant
+            Service Account Token Creator IAM role to the directly preceding identity, with first
+            account from the list granting this role to the originating account.
+        :type impersonation_chain: Union[str, Sequence[str]]
+        """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client = None  # type: Optional[CloudMemcacheClient]
+
+    def get_conn(self, ):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+

Review comment:
       ```suggestion
   ```

##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -501,3 +504,275 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+        Hook for Google Cloud Memorystore for Memcached service APIs.
+
+        All the methods in the hook where project_id is used must be called with
+        keyword arguments rather than positional.
+
+        :param gcp_conn_id: The connection ID to use when fetching connection info.
+        :type gcp_conn_id: str
+        :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+            if any. For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: str
+        :param impersonation_chain: Optional service account to impersonate using short-term
+            credentials, or chained list of accounts required to get the access_token
+            of the last account in the list, which will be impersonated in the request.
+            If set as a string, the account must grant the originating account
+            the Service Account Token Creator IAM role.
+            If set as a sequence, the identities from the list must grant
+            Service Account Token Creator IAM role to the directly preceding identity, with first
+            account from the list granting this role to the originating account.
+        :type impersonation_chain: Union[str, Sequence[str]]
+        """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client = None  # type: Optional[CloudMemcacheClient]

Review comment:
       ```suggestion
           self._client: Optional[CloudMemcacheClient] = None
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-669547983


   Hello @tanjinP.
   @mschickien is on vacation until 10.08.  Would you like us to process this change or would you rather wait until we get more feedback from a potential user?
   Best regards,
   Kamil Breguła


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tanjinP commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
tanjinP commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-669552389


   > Hello @tanjinP.
   > @mschickien is on vacation until 10.08.  Would you like us to process this change or would you rather wait until we get more feedback from a potential user?
   > Best regards,
   > Kamil Breguła
   
   Hey @mik-laj - I don't mind waiting for feedback, happy to get any suggestions to build out the feature.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-711955243


   PTAL @mik-laj @turbaszek @potiuk 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tanjinP commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
tanjinP commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-687983828


   > Thank you so much @tanjinP for your input and hard work! 🐈
   
   @mschickensoup ready for another look! Implemented the other methods.
   
   I am only missing [the IAM ones as pointed out here](https://cloud.google.com/memorystore/docs/memcached/reference/rest/v1beta2/projects.locations.instances) as I felt they are okay to omit as [the client library](https://pypi.org/project/google-cloud-memcache/) does not provide them and I didn't want to string together a whole lot of specific functions into these operators and hooks. Let me know what you think!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tanjinP commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
tanjinP commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-683768310


   > Hello my @tanjinP
   > 
   > I would like to thank you for this integration. This will make it much easier for everyone to use this service.
   > 
   > I've talked to some people at my team and there are a few comments:
   > 
   > * Have you thought to create the CloudMemorystoreMemcachedGetInstanceOperator operator? This would make it easier to link several operators together.
   > * In the file tests/providers/google/cloud/hooks/test_cloud_memorystore.py I can see the code commented out. Is it expected?
   > * Can you add this integration to refernece? https://airflow.readthedocs.io/en/latest/operators-and-hooks-ref.html#google
   > 
   > Kind regards 🐈
   
   Hey @mschickensoup - I've addressed some of the feedback that you pointed out. Thanks.
   
   As for the `TODO` on the tests - I  was waiting for the initial feedback before I implement some more operators and hooks. Those are yet to be done and I'll get on it later tonight ([specifically for the instance resource](https://cloud.google.com/memorystore/docs/memcached/reference/rest/v1beta2/projects.locations.instances)). Will @ you once that is complete. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-707250832


   @tanjinP @mik-laj I created proposal of some fixes to this PR here: https://github.com/tanjinP/airflow/pull/1
   I made separate example DAG for easier running system tests thanks to which I discovered more problems with protobuf. PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tanjinP commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
tanjinP commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-707096564


   > @tanjinP Do you need help? TobKed is now your reviewer and you can ask him anything when you have a problem.
   
   Thanks @mik-laj - Tobiasz and I are messaging each other directly on the Slack channel for a last bit of feedback because I felt the PR discussion thread would not capture my concerns/issues appropriately. Appreciate you following up.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-713455233


   I'll take a look at it in a moment.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-667919869


   @mschickensoup Can I ask for a review? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r509831495



##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +251,80 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+with models.DAG(
+    "gcp_cloud_memorystore_memcached",
+    schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
+    tags=['example'],
+) as dag:

Review comment:
       ```suggestion
   ) as dag_memcache:
   ```
   ```suggestion
   ) as dag:
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-714733840


   @mschickensoup I am waiting for feedback from the production environment ;-) 
   
   @tanjinP Thank you for this change. What's next?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r494949121



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -499,3 +502,434 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self,):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory size.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = ParseDict(instance, cloud_memcache.Instance())
+        elif not isinstance(instance, cloud_memcache.Instance):
+            raise AirflowException("instance is not instance of Instance type or python dict")
+
+        self._append_label(instance, "airflow-version", "v" + version.version)
+
+        result = client.create_instance(
+            parent=parent,
+            instance_id=instance_id,
+            resource=instance,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        result.result()
+        self.log.info("Instance created.")
+        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a specific Memcached instance.  Instance stops serving and data is deleted.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        self.log.info("Fetching Instance: %s", name)
+        instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+
+        if not instance:
+            return
+
+        self.log.info("Deleting Instance: %s", name)
+        result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        result.result()
+        self.log.info("Instance deleted: %s", name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Gets the details of a specific Memcached instance.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)

Review comment:
       ```suggestion
           result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata or ())
   ```
   metadata is required parameter by `get_instance` method and cannot be of NoneType (must be iterable)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r494925177



##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask={"displayName": "New Name"},
+        instance={"memory_size_gb": 2},
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask={"displayName": "New Name"},
+        instance={"memory_size_gb": 2},
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask="protocol,hash_algorithm",
+        parameters={"protocol": "ascii", "hash_algorithm": "jenkins"},
+    )
+
+    apply_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+        task_id="apply-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -499,3 +502,434 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self,):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory size.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = ParseDict(instance, cloud_memcache.Instance())
+        elif not isinstance(instance, cloud_memcache.Instance):
+            raise AirflowException("instance is not instance of Instance type or python dict")
+
+        self._append_label(instance, "airflow-version", "v" + version.version)
+
+        result = client.create_instance(
+            parent=parent,
+            instance_id=instance_id,
+            resource=instance,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        result.result()
+        self.log.info("Instance created.")
+        return client.get_instance(name=instance_name, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a specific Memcached instance.  Instance stops serving and data is deleted.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        self.log.info("Fetching Instance: %s", name)
+        instance = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+
+        if not instance:
+            return
+
+        self.log.info("Deleting Instance: %s", name)
+        result = client.delete_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        result.result()
+        self.log.info("Instance deleted: %s", name)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_instance(
+        self,
+        location: str,
+        instance: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Gets the details of a specific Memcached instance.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance: The logical name of the Memcached instance in the customer project.
+        :type instance: str
+        :param project_id:  Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        name = CloudMemcacheClient.instance_path(project_id, location, instance)
+        result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata)

Review comment:
       ```suggestion
           result = client.get_instance(name=name, retry=retry, timeout=timeout, metadata=metadata or ())
   ```
   metadata is required parameter by `get_instance` method and cannot be of NoneType (must be iterable)

##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -499,3 +502,434 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self,):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory size.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = ParseDict(instance, cloud_memcache.Instance())

Review comment:
       When I was trying to run it i got the following error:
   (line number is different because I rebased on the latest master locally)
   
   
   ```
   [2020-09-25 12:32:12,847] {cloud_memorystore.py:680} INFO - Instance not exists.
   [2020-09-25 12:32:12,849] {taskinstance.py:1337} ERROR - 'DESCRIPTOR'
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/proto/message.py", line 520, in __getattr__
       pb_type = self._meta.fields[key].pb_type
   KeyError: 'DESCRIPTOR'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/taskinstance.py", line 1076, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/opt/airflow/airflow/models/taskinstance.py", line 1198, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/opt/airflow/airflow/models/taskinstance.py", line 1243, in _execute_task
       result = task_copy.execute(context=context)
     File "/opt/airflow/airflow/providers/google/cloud/operators/cloud_memorystore.py", line 1291, in execute
       metadata=self.metadata,
     File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 373, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/providers/google/cloud/hooks/cloud_memorystore.py", line 683, in create_instance
       instance = ParseDict(instance, cloud_memcache.Instance())
     File "/usr/local/lib/python3.7/site-packages/google/protobuf/json_format.py", line 452, in ParseDict
       parser.ConvertMessage(js_dict, message)
     File "/usr/local/lib/python3.7/site-packages/google/protobuf/json_format.py", line 476, in ConvertMessage
       message_descriptor = message.DESCRIPTOR
     File "/usr/local/lib/python3.7/site-packages/proto/message.py", line 525, in __getattr__
       raise AttributeError(str(ex))
   AttributeError: 'DESCRIPTOR'
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r503211300



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -503,3 +506,438 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(
+        self,
+    ):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),

Review comment:
       When I tried to run example DAG i got error:
   ```
     File "/usr/local/lib/python3.7/site-packages/google/cloud/memcache_v1beta2/services/cloud_memcache/client.py", line 385, in get_instance
       metadata = tuple(metadata) + (
   TypeError: 'NoneType' object is not iterable
   ```
   because default metadata in operator is `None`.
   IMHO for optional parameters with iterables it is good to have default value set to None and handle it later. It is more intuitive and allows to pass None or () by the user and avoid such situations. 
   This approach was useg e.g here: https://github.com/apache/airflow/blob/2bf7b7cac7858f5a6a495f1a9eb4780ec84f95b4/airflow/providers/amazon/aws/operators/emr_add_steps.py#L69
   
   ```suggestion
           metadata: Optional[Sequence[Tuple[str, str]]] = None,
           
           ...
           
           metadata = metadata or ()
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r494925177



##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask={"displayName": "New Name"},
+        instance={"memory_size_gb": 2},
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?

##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +241,67 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+    # [START howto_operator_create_instance_memcached]
+    create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
+        task_id="create-instance-3",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME_4,
+        instance=MEMCACHED_INSTANCE,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_create_instance_memcached]
+
+    # [START howto_operator_delete_instance_memcached]
+    delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
+        task_id="delete-instance-3",
+        location="europe-north1",
+        instance=INSTANCE_NAME_4,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_delete_instance_memcached]
+
+    # [START howto_operator_get_instance_memcached]
+    get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
+        task_id="get-instance-2", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID,
+    )
+    # [END howto_operator_get_instance_memcached]
+
+    # [START howto_operator_list_instances_memcached]
+    list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
+        task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
+    )
+    # [END howto_operator_list_instances_memcached]
+
+    # [START howto_operator_update_instance_memcached]
+    update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
+        task_id="update-instance-2",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask={"displayName": "New Name"},
+        instance={"memory_size_gb": 2},
+    )
+    # [END howto_operator_update_instance_memcached]
+
+    # [START howto_operator_update_and_apply_parameters_memcached]
+    update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
+        task_id="update-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,
+        project_id=GCP_PROJECT_ID,
+        update_mask="protocol,hash_algorithm",
+        parameters={"protocol": "ascii", "hash_algorithm": "jenkins"},
+    )
+
+    apply_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
+        task_id="apply-parameters",
+        location="europe-north1",
+        instance_id=INSTANCE_NAME,

Review comment:
       Should not be `INSTANCE_NAME_4` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r509831495



##########
File path: airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py
##########
@@ -229,3 +251,80 @@
     failover_instance >> delete_instance_2
 
     export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
+
+with models.DAG(
+    "gcp_cloud_memorystore_memcached",
+    schedule_interval=None,  # Override to match your needs
+    start_date=dates.days_ago(1),
+    tags=['example'],
+) as dag:

Review comment:
       ```suggestion
   ) as dag_memcache:
   ```
   This change is required for system tests for Redis to work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mschickensoup commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mschickensoup commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-683782330


   Thank you so much @tanjinP for your input and hard work! 🐈


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r503299338



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -499,3 +502,434 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self,):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory size.
+
+        By default, the instance is accessible from the project's `default network
+        <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf message
+            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the GCP connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = ParseDict(instance, cloud_memcache.Instance())

Review comment:
       I created issue for it: https://github.com/googleapis/python-memcache/issues/19
   `ParseDict` and `MessageToDict` have problems with parsing `google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj merged pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #10121:
URL: https://github.com/apache/airflow/pull/10121


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mschickensoup commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mschickensoup commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-680011466


   Hello my @tanjinP 
   
   I would like to thank you for this integration. This will make it much easier for everyone to use this service.
   
   I've talked to some people at my team and there are a few comments:
   
   - Have you thought to create the CloudMemorystoreMemcachedGetInstanceOperator operator? This would make it easier to link several operators together.
   - In the file tests/providers/google/cloud/hooks/test_cloud_memorystore.py I can see the code commented out. Is it expected?
   - Can you add this integration to refernece? https://airflow.readthedocs.io/en/latest/operators-and-hooks-ref.html#google
   
   Kind regards 🐈


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #10121: Google Cloud Memorystore Memcached Operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#issuecomment-714170148


   Only one small problem. Besides, everything is beautiful and works.
   <img width="1439" alt="Screenshot 2020-10-22 at 03 47 23" src="https://user-images.githubusercontent.com/12058428/96808717-a599f380-1419-11eb-86af-b39628e670c6.png">
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org