You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/11 23:36:20 UTC

[GitHub] mik-laj commented on a change in pull request #4286: [AIRFLOW-3310] Google Cloud Spanner deploy / delete operators

mik-laj commented on a change in pull request #4286: [AIRFLOW-3310] Google Cloud Spanner deploy / delete operators
URL: https://github.com/apache/incubator-airflow/pull/4286#discussion_r240833136
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_spanner_hook.py
 ##########
 @@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from google.longrunning.operations_grpc_pb2 import Operation  # noqa: F401
+from typing import Optional, Callable  # noqa: F401
+
+from google.api_core.exceptions import GoogleAPICallError
+from google.cloud.spanner_v1.client import Client
+from google.cloud.spanner_v1.instance import Instance  # noqa: F401
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+# noinspection PyAbstractClass
+class CloudSpannerHook(GoogleCloudBaseHook):
+    """
+    Hook for Google Cloud Spanner APIs.
+    """
+    _client = None
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(CloudSpannerHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_client(self, project_id):
+        # type: (str) -> Client
+        """
+        Provides a client for interacting with Cloud Spanner API.
+
+        :param project_id: The ID of the project which owns the instances, tables and data.
+        :type project_id: str
+        :return: Client for interacting with Cloud Spanner API. See:
+            https://googleapis.github.io/google-cloud-python/latest/spanner/client-api.html#google.cloud.spanner_v1.client.Client
+        :rtype: object
+        """
+        if not self._client:
+            self._client = Client(project=project_id, credentials=self._get_credentials())
+        return self._client
+
+    def get_instance(self, project_id, instance_id):
+        # type: (str, str) -> Optional[Instance]
+        """
+        Gets information about a particular instance.
+
+        :param project_id: The ID of the project which owns the instances, tables and data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :return: Representation of a Cloud Spanner Instance. See:
+            https://googleapis.github.io/google-cloud-python/latest/spanner/instance-api.html#google.cloud.spanner_v1.instance.Instance
+        :rtype: object
+        """
+        client = self.get_client(project_id)
+        instance = client.instance(instance_id)
+        if not instance.exists():
+            return None
+        return instance
+
+    def create_instance(self, project_id, instance_id, configuration_name, node_count,
+                        display_name):
+        # type: (str, str, str, int, str) -> bool
+        """
+        Creates a new Cloud Spanner instance.
+
+        :param project_id: The ID of the project which owns the instances, tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining how the
+            instance will be created. Required for instances which do not yet exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :return: True if the operation succeeded, raises an exception otherwise.
+        :rtype: bool
+        """
+        return self._apply_to_instance(project_id, instance_id, configuration_name,
+                                       node_count, display_name, lambda x: x.create())
+
+    def update_instance(self, project_id, instance_id, configuration_name, node_count,
+                        display_name):
+        # type: (str, str, str, int, str) -> bool
+        """
+        Updates an existing Cloud Spanner instance.
+
+        :param project_id: The ID of the project which owns the instances, tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining how the
+            instance will be created. Required for instances which do not yet exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :return: True if the operation succeeded, raises an exception otherwise.
+        :rtype: bool
+        """
+        return self._apply_to_instance(project_id, instance_id, configuration_name,
+                                       node_count, display_name, lambda x: x.update())
+
+    def _apply_to_instance(self, project_id, instance_id, configuration_name, node_count,
+                           display_name, func):
+        # type: (str, str, str, int, str, Callable[[Instance], Operation]) -> bool
+        """
+        Invokes a method on a given instance by applying a specified Callable.
+
+        :param project_id: The ID of the project which owns the instances, tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining how the
+            instance will be created. Required for instances which do not yet exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :param func: Method of the instance to be called.
+        :type func: Callable
+        """
+        client = self.get_client(project_id)
+        instance = client.instance(instance_id,
+                                   configuration_name=configuration_name,
+                                   node_count=node_count,
+                                   display_name=display_name)
+        try:
+            operation = func(instance)  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: {}. Aborting.'.format(e.message))
 
 Review comment:
   Formatting the message this way always creates a string object, even when it is never used because the message's level is below the configured logging level. It can also make logs harder to analyze with automated tools such as Sentry: the software cannot recognize that entries differing only in their parameters are the same message.
   
   Formatting parameters should instead be passed as separate arguments to the logging method (`info` in the example below; the same applies to `error`).
   Example:
   
   ````python
   self.log.info("The Table '%s' does not exists already.", self.table_id)
   ````

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services