You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/12 13:31:55 UTC

[GitHub] [airflow] TobKed commented on a change in pull request #10121: Google Cloud Memorystore Memcached Operators

TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r503211300



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -503,3 +506,438 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(
+        self,
+    ):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore Memcached service.
+        """
+        if not self._client:
+            self._client = CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the instance. If set
+            to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),

Review comment:
       When I tried to run the example DAG, I got the following error:
   ```
     File "/usr/local/lib/python3.7/site-packages/google/cloud/memcache_v1beta2/services/cloud_memcache/client.py", line 385, in get_instance
       metadata = tuple(metadata) + (
   TypeError: 'NoneType' object is not iterable
   ```
   because the default `metadata` in the operator is `None`.
   IMHO, for optional parameters that take iterables, it is better to set the default value to `None` and handle it later. This is more intuitive, lets the user pass either `None` or `()`, and avoids situations like this one.
   This approach was used, e.g., here: https://github.com/apache/airflow/blob/2bf7b7cac7858f5a6a495f1a9eb4780ec84f95b4/airflow/providers/amazon/aws/operators/emr_add_steps.py#L69
   
   ```suggestion
           metadata: Optional[Sequence[Tuple[str, str]]] = None,
           
           ...
           
           metadata = metadata or ()
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org