Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/03 16:49:56 UTC

[GitHub] potiuk commented on a change in pull request #4353: [AIRFLOW-3480] Add Database Deploy/Update/Delete operators

URL: https://github.com/apache/incubator-airflow/pull/4353#discussion_r245062137
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_spanner_hook.py
 ##########
 @@ -168,39 +168,175 @@ def delete_instance(self, project_id, instance_id):
         """
         Deletes an existing Cloud Spanner instance.
 
-        :param project_id: The ID of the project which owns the instances, tables and data.
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id:  The ID of the Cloud Spanner instance.
         :type instance_id: str
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id)
+        instance = self.get_client(project_id).instance(instance_id)
         try:
             instance.delete()
             return True
         except GoogleAPICallError as e:
-            self.log.error('An error occurred: %s. Aborting.', e.message)
+            self.log.error('An error occurred: %s. Exiting.', e.message)
+            raise e
+
+    def get_database(self, project_id, instance_id, database_id):
+        # type: (str, str, str) -> Optional[Database]
+        """
+        Retrieves a database in Cloud Spanner. If the database does not exist
+        in the specified instance, it returns None.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :return: Database object or None if database does not exist
+        :rtype: Union[Database, None]
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        if not database.exists():
+            return None
+        else:
+            return database
+
+    def create_database(self, project_id, instance_id, database_id, ddl_statements):
+        # type: (str, str, str, [str]) -> bool
+        """
+        Creates a new database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database to create in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new database.
+        :type ddl_statements: [str]
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id,
+                                     ddl_statements=ddl_statements)
+        try:
+            operation = database.create()  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Exiting.', e.message)
             raise e
 
+        if operation:
+            result = operation.result()
+            self.log.info(result)
+        return True
+
+    def update_database(self, project_id, instance_id, database_id, ddl_statements,
+                        operation_id=None):
+        # type: (str, str, str, [str], str) -> bool
+        """
+        Updates DDL of a database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new database.
+        :type ddl_statements: [str]
 
 Review comment:
   I actually tried to find a standard for that and could not. In fact, this is not parsed.
   According to this link, there is no real standard in PyCharm, and I see different variants used across Airflow's code:
   https://www.jetbrains.com/help/pycharm/type-syntax-for-docstrings.html
   
   Note that in Python 2 type hint comments (as opposed to docstrings), [str] works perfectly fine (and is recognized by PyCharm as a sequence of strings). So there is a bit of a mess in general.
   
   I have now changed it to list[str] in the Spanner operators (following JetBrains' suggestions), and with the next "consistency" change I will make sure the other GCP operators are consistent as well.
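
To make the two places concrete, here is a minimal sketch of how the annotations could sit side by side. The function is a hypothetical stand-in, not the hook method from the diff, and the `List[str]` spelling in the type comment is the PEP 484 form, used here as an assumption rather than the `[str]` shorthand discussed above. Only the docstring uses `list[str]`.

```python
from typing import List


def update_database(database_id, ddl_statements):
    # type: (str, List[str]) -> bool
    """
    Hypothetical stand-in that only illustrates the annotation styles.

    :param database_id: The ID of the database in Cloud Spanner.
    :type database_id: str
    :param ddl_statements: The string list containing DDL statements.
    :type ddl_statements: list[str]
    :return: True if at least one DDL statement was passed
    :rtype: bool
    """
    # No Spanner call is made here; the body exists only so the sketch runs.
    return bool(ddl_statements)


# The type comment is a plain comment, so the interpreter records nothing
# from it; only IDEs and static checkers read it.
print(update_database.__annotations__)
```

The docstring `:type:` fields are likewise just text at runtime, which is why the spelling is a matter of convention for tools and readers.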
