You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/05 12:06:58 UTC

[GitHub] kaxil closed pull request #4353: [AIRFLOW-3480] Add Database Deploy/Update/Delete operators

kaxil closed pull request #4353: [AIRFLOW-3480] Add Database Deploy/Update/Delete operators
URL: https://github.com/apache/airflow/pull/4353
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_spanner.py b/airflow/contrib/example_dags/example_gcp_spanner.py
index cec3dcb855..0aeb1e63a0 100644
--- a/airflow/contrib/example_dags/example_gcp_spanner.py
+++ b/airflow/contrib/example_dags/example_gcp_spanner.py
@@ -21,15 +21,16 @@
 Example Airflow DAG that creates, updates, queries and deletes a Cloud Spanner instance.
 
 This DAG relies on the following environment variables
-* SPANNER_PROJECT_ID - Google Cloud Platform project for the Cloud Spanner instance.
-* SPANNER_INSTANCE_ID - Cloud Spanner instance ID.
-* SPANNER_CONFIG_NAME - The name of the instance's configuration. Values are of the form
-    projects/<project>/instanceConfigs/<configuration>.
+* GCP_PROJECT_ID - Google Cloud Platform project for the Cloud Spanner instance.
+* GCP_SPANNER_INSTANCE_ID - Cloud Spanner instance ID.
+* GCP_SPANNER_DATABASE_ID - Cloud Spanner database ID.
+* GCP_SPANNER_CONFIG_NAME - The name of the instance's configuration. Values are of the
+    form projects/<gcp_project>/instanceConfigs/<configuration>.
     See also:
         https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
         https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
-* SPANNER_NODE_COUNT - Number of nodes allocated to the instance.
-* SPANNER_DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
+* GCP_SPANNER_NODE_COUNT - Number of nodes allocated to the instance.
+* GCP_SPANNER_DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
     Must be unique per project and between 4 and 30 characters in length.
 """
 
@@ -39,16 +40,21 @@
 from airflow import models
 from airflow.contrib.operators.gcp_spanner_operator import \
     CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDatabaseQueryOperator, \
-    CloudSpannerInstanceDeleteOperator
+    CloudSpannerInstanceDeleteOperator, \
+    CloudSpannerInstanceDatabaseDeployOperator, \
+    CloudSpannerInstanceDatabaseUpdateOperator, \
+    CloudSpannerInstanceDatabaseDeleteOperator
 
 # [START howto_operator_spanner_arguments]
-PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID', 'example-project')
-INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID', 'testinstance')
-DB_ID = os.environ.get('SPANNER_DB_ID', 'db1')
-CONFIG_NAME = os.environ.get('SPANNER_CONFIG_NAME',
-                             'projects/example-project/instanceConfigs/eur3')
-NODE_COUNT = os.environ.get('SPANNER_NODE_COUNT', '1')
-DISPLAY_NAME = os.environ.get('SPANNER_DISPLAY_NAME', 'Test Instance')
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
+GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
+GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
+                                         'projects/example-project/instanceConfigs/eur3')
+GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
+GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
+# OPERATION_ID should be unique per operation
+OPERATION_ID = 'unique_operation_id'
 # [END howto_operator_spanner_arguments]
 
 default_args = {
@@ -63,51 +69,115 @@
     # Create
     # [START howto_operator_spanner_deploy]
     spanner_instance_create_task = CloudSpannerInstanceDeployOperator(
-        project_id=PROJECT_ID,
-        instance_id=INSTANCE_ID,
-        configuration_name=CONFIG_NAME,
-        node_count=int(NODE_COUNT),
-        display_name=DISPLAY_NAME,
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        configuration_name=GCP_SPANNER_CONFIG_NAME,
+        node_count=int(GCP_SPANNER_NODE_COUNT),
+        display_name=GCP_SPANNER_DISPLAY_NAME,
         task_id='spanner_instance_create_task'
     )
     # [END howto_operator_spanner_deploy]
 
     # Update
     spanner_instance_update_task = CloudSpannerInstanceDeployOperator(
-        project_id=PROJECT_ID,
-        instance_id=INSTANCE_ID,
-        configuration_name=CONFIG_NAME,
-        node_count=int(NODE_COUNT) + 1,
-        display_name=DISPLAY_NAME + '_updated',
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        configuration_name=GCP_SPANNER_CONFIG_NAME,
+        node_count=int(GCP_SPANNER_NODE_COUNT) + 1,
+        display_name=GCP_SPANNER_DISPLAY_NAME + '_updated',
         task_id='spanner_instance_update_task'
     )
 
+    # [START howto_operator_spanner_database_deploy]
+    spanner_database_deploy_task = CloudSpannerInstanceDatabaseDeployOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        database_id=GCP_SPANNER_DATABASE_ID,
+        ddl_statements=[
+            "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
+            "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
+        ],
+        task_id='spanner_database_deploy_task'
+    )
+    # [END howto_operator_spanner_database_deploy]
+
+    # [START howto_operator_spanner_database_update]
+    spanner_database_update_task = CloudSpannerInstanceDatabaseUpdateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        database_id=GCP_SPANNER_DATABASE_ID,
+        ddl_statements=[
+            "CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
+        ],
+        task_id='spanner_database_update_task'
+    )
+    # [END howto_operator_spanner_database_update]
+
+    # [START howto_operator_spanner_database_update_idempotent]
+    spanner_database_update_idempotent1_task = CloudSpannerInstanceDatabaseUpdateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        database_id=GCP_SPANNER_DATABASE_ID,
+        operation_id=OPERATION_ID,
+        ddl_statements=[
+            "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
+        ],
+        task_id='spanner_database_update_idempotent1_task'
+    )
+    spanner_database_update_idempotent2_task = CloudSpannerInstanceDatabaseUpdateOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        database_id=GCP_SPANNER_DATABASE_ID,
+        operation_id=OPERATION_ID,
+        ddl_statements=[
+            "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
+        ],
+        task_id='spanner_database_update_idempotent2_task'
+    )
+    # [END howto_operator_spanner_database_update_idempotent]
+
     # [START howto_operator_spanner_query]
-    spanner_instance_query = CloudSpannerInstanceDatabaseQueryOperator(
-        project_id=PROJECT_ID,
-        instance_id=INSTANCE_ID,
+    spanner_instance_query_task = CloudSpannerInstanceDatabaseQueryOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id='db1',
-        query="DELETE FROM my_table2 WHERE true",
+        query=["DELETE FROM my_table2 WHERE true"],
         task_id='spanner_instance_query'
     )
     # [END howto_operator_spanner_query]
 
-    spanner_instance_query2 = CloudSpannerInstanceDatabaseQueryOperator(
-        project_id=PROJECT_ID,
-        instance_id=INSTANCE_ID,
+    spanner_instance_query2_task = CloudSpannerInstanceDatabaseQueryOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
         database_id='db1',
         query="example_gcp_spanner.sql",
         task_id='spanner_instance_query2'
     )
 
+    # [START howto_operator_spanner_database_delete]
+    spanner_database_delete_task = CloudSpannerInstanceDatabaseDeleteOperator(
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
+        database_id=GCP_SPANNER_DATABASE_ID,
+        task_id='spanner_database_delete_task'
+    )
+    # [END howto_operator_spanner_database_delete]
+
     # [START howto_operator_spanner_delete]
     spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator(
-        project_id=PROJECT_ID,
-        instance_id=INSTANCE_ID,
+        project_id=GCP_PROJECT_ID,
+        instance_id=GCP_SPANNER_INSTANCE_ID,
         task_id='spanner_instance_delete_task'
     )
     # [END howto_operator_spanner_delete]
 
-    spanner_instance_create_task >> spanner_instance_update_task \
-        >> spanner_instance_query >> spanner_instance_query2 \
+    spanner_instance_create_task \
+        >> spanner_instance_update_task \
+        >> spanner_database_deploy_task \
+        >> spanner_database_update_task \
+        >> spanner_database_update_idempotent1_task \
+        >> spanner_database_update_idempotent2_task \
+        >> spanner_instance_query_task \
+        >> spanner_instance_query2_task \
+        >> spanner_database_delete_task \
         >> spanner_instance_delete_task
diff --git a/airflow/contrib/hooks/gcp_spanner_hook.py b/airflow/contrib/hooks/gcp_spanner_hook.py
index 96e8bcb71c..23f3a3c86d 100644
--- a/airflow/contrib/hooks/gcp_spanner_hook.py
+++ b/airflow/contrib/hooks/gcp_spanner_hook.py
@@ -16,13 +16,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from google.api_core.exceptions import GoogleAPICallError
+from google.api_core.exceptions import GoogleAPICallError, AlreadyExists
 from google.cloud.spanner_v1.client import Client
 from google.cloud.spanner_v1.database import Database
 from google.cloud.spanner_v1.instance import Instance  # noqa: F401
 from google.longrunning.operations_grpc_pb2 import Operation  # noqa: F401
 from typing import Optional, Callable  # noqa: F401
 
+from airflow import AirflowException
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
@@ -41,11 +42,12 @@ def __init__(self,
     def get_client(self, project_id):
         # type: (str) -> Client
         """
-        Provides a client for interacting with Cloud Spanner API.
+        Provides a client for interacting with the Cloud Spanner API.
 
-        :param project_id: The ID of the project which owns the instances, tables and data.
+        :param project_id: The ID of the  GCP project that owns the Cloud Spanner
+            database.
         :type project_id: str
-        :return: Client for interacting with Cloud Spanner API. See:
+        :return: Client for interacting with the Cloud Spanner API. See:
             https://googleapis.github.io/google-cloud-python/latest/spanner/client-api.html#google.cloud.spanner_v1.client.Client
         :rtype: object
         """
@@ -58,16 +60,15 @@ def get_instance(self, project_id, instance_id):
         """
         Gets information about a particular instance.
 
-        :param project_id: The ID of the project which owns the instances, tables and data.
+        :param project_id: The ID of the project which owns the Cloud Spanner Database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id: The ID of the Cloud Spanner instance.
         :type instance_id: str
         :return: Representation of a Cloud Spanner Instance. See:
             https://googleapis.github.io/google-cloud-python/latest/spanner/instance-api.html#google.cloud.spanner_v1.instance.Instance
         :rtype: object
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id)
+        instance = self.get_client(project_id).instance(instance_id)
         if not instance.exists():
             return None
         return instance
@@ -78,21 +79,22 @@ def create_instance(self, project_id, instance_id, configuration_name, node_coun
         """
         Creates a new Cloud Spanner instance.
 
-        :param project_id: The ID of the project which owns the instances, tables and
-            data.
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id: The ID of the Cloud Spanner instance.
         :type instance_id: str
-        :param configuration_name: Name of the instance configuration defining how the
-            instance will be created. Required for instances which do not yet exist.
+        :param configuration_name: The name of the instance configuration defining how the
+            instance will be created. Possible configuration values can be retrieved via
+            https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list
         :type configuration_name: str
-        :param node_count: (Optional) Number of nodes allocated to the instance.
+        :param node_count: (Optional) The number of nodes allocated to the Cloud Spanner
+            instance.
         :type node_count: int
-        :param display_name: (Optional) The display name for the instance in the Cloud
-            Console UI. (Must be between 4 and 30 characters.) If this value is not set
-            in the constructor, will fall back to the instance ID.
+        :param display_name: (Optional) The display name for the instance in the GCP
+            Console. Must be between 4 and 30 characters.  If this value is not set in
+            the constructor, the name falls back to the instance ID.
         :type display_name: str
-        :return: True if the operation succeeded, raises an exception otherwise.
+        :return: True if the operation succeeds. Otherwise,raises an exception.
         :rtype: bool
         """
         return self._apply_to_instance(project_id, instance_id, configuration_name,
@@ -104,21 +106,22 @@ def update_instance(self, project_id, instance_id, configuration_name, node_coun
         """
         Updates an existing Cloud Spanner instance.
 
-        :param project_id: The ID of the project which owns the instances, tables and
-            data.
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id: The ID of the Cloud Spanner instance.
         :type instance_id: str
-        :param configuration_name: Name of the instance configuration defining how the
-            instance will be created. Required for instances which do not yet exist.
+        :param configuration_name: The name of the instance configuration defining how the
+            instance will be created. Possible configuration values can be retrieved via
+            https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list
         :type configuration_name: str
-        :param node_count: (Optional) Number of nodes allocated to the instance.
+        :param node_count: (Optional) The number of nodes allocated to the Cloud Spanner
+            instance.
         :type node_count: int
-        :param display_name: (Optional) The display name for the instance in the Cloud
-            Console UI. (Must be between 4 and 30 characters.) If this value is not set
-            in the constructor, will fall back to the instance ID.
+        :param display_name: (Optional) The display name for the instance in the GCP
+            Console. Must be between 4 and 30 characters. If this value is not set in
+            the constructor, the name falls back to the instance ID.
         :type display_name: str
-        :return: True if the operation succeeded, raises an exception otherwise.
+        :return: True if the operation succeeded. Otherwise, raises an exception.
         :rtype: bool
         """
         return self._apply_to_instance(project_id, instance_id, configuration_name,
@@ -130,8 +133,7 @@ def _apply_to_instance(self, project_id, instance_id, configuration_name, node_c
         """
         Invokes a method on a given instance by applying a specified Callable.
 
-        :param project_id: The ID of the project which owns the instances, tables and
-            data.
+        :param project_id: The ID of the project which owns the Cloud Spanner Database.
         :type project_id: str
         :param instance_id: The ID of the instance.
         :type instance_id: str
@@ -147,15 +149,13 @@ def _apply_to_instance(self, project_id, instance_id, configuration_name, node_c
         :param func: Method of the instance to be called.
         :type func: Callable
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id,
-                                   configuration_name=configuration_name,
-                                   node_count=node_count,
-                                   display_name=display_name)
+        instance = self.get_client(project_id).instance(
+            instance_id, configuration_name=configuration_name,
+            node_count=node_count, display_name=display_name)
         try:
             operation = func(instance)  # type: Operation
         except GoogleAPICallError as e:
-            self.log.error('An error occurred: %s. Aborting.', e.message)
+            self.log.error('An error occurred: %s. Exiting.', e.message)
             raise e
 
         if operation:
@@ -168,39 +168,175 @@ def delete_instance(self, project_id, instance_id):
         """
         Deletes an existing Cloud Spanner instance.
 
-        :param project_id: The ID of the project which owns the instances, tables and data.
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id:  The ID of the Cloud Spanner instance.
         :type instance_id: str
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id)
+        instance = self.get_client(project_id).instance(instance_id)
         try:
             instance.delete()
             return True
         except GoogleAPICallError as e:
-            self.log.error('An error occurred: %s. Aborting.', e.message)
+            self.log.error('An error occurred: %s. Exiting.', e.message)
+            raise e
+
+    def get_database(self, project_id, instance_id, database_id):
+        # type: (str, str, str) -> Optional[Database]
+        """
+        Retrieves a database in Cloud Spanner. If the database does not exist
+        in the specified instance, it returns None.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :return: Database object or None if database does not exist
+        :rtype: Union[Database, None]
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        if not database.exists():
+            return None
+        else:
+            return database
+
+    def create_database(self, project_id, instance_id, database_id, ddl_statements):
+        # type: (str, str, str, [str]) -> bool
+        """
+        Creates a new database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database to create in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new database.
+        :type ddl_statements: list[str]
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id,
+                                     ddl_statements=ddl_statements)
+        try:
+            operation = database.create()  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Exiting.', e.message)
             raise e
 
+        if operation:
+            result = operation.result()
+            self.log.info(result)
+        return True
+
+    def update_database(self, project_id, instance_id, database_id, ddl_statements,
+                        operation_id=None):
+        # type: (str, str, str, [str], str) -> bool
+        """
+        Updates DDL of a database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new database.
+        :type ddl_statements: list[str]
+        :param operation_id: (Optional) The unique per database operation ID that can be
+            specified to implement idempotency check.
+        :type operation_id: str
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        try:
+            operation = database.update_ddl(
+                ddl_statements, operation_id=operation_id)
+            if operation:
+                result = operation.result()
+                self.log.info(result)
+            return True
+        except AlreadyExists as e:
+            if e.code == 409 and operation_id in e.message:
+                self.log.info("Replayed update_ddl message - the operation id %s "
+                              "was already done before.", operation_id)
+                return True
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Exiting.', e.message)
+            raise e
+
+    def delete_database(self, project_id, instance_id, database_id):
+        # type: (str, str, str) -> bool
+        """
+        Drops a database in Cloud Spanner.
+
+        :param project_id:  The ID of the GCP project that owns the Cloud Spanner
+            database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).\
+            instance(instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        try:
+            operation = database.drop()  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Exiting.', e.message)
+            raise e
+
+        if operation:
+            result = operation.result()
+            self.log.info(result)
+        return True
+
     def execute_dml(self, project_id, instance_id, database_id, queries):
         # type: (str, str, str, str) -> None
         """
         Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
 
-        :param project_id: The ID of the project which owns the instances, tables and data.
+        :param project_id: The ID of the GCP project that owns the Cloud Spanner
+            database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id: The ID of the Cloud Spanner instance.
         :type instance_id: str
-        :param database_id: The ID of the database.
+        :param database_id: The ID of the database in Cloud Spanner.
         :type database_id: str
-        :param queries: The queries to be executed.
+        :param queries: The queries to execute.
         :type queries: str
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id)
-        database = Database(database_id, instance)
-        database.run_in_transaction(lambda transaction:
-                                    self._execute_sql_in_transaction(transaction, queries))
+        instance = self.get_client(project_id).instance(instance_id)
+        Database(database_id, instance).run_in_transaction(
+            lambda transaction: self._execute_sql_in_transaction(transaction, queries))
 
     @staticmethod
     def _execute_sql_in_transaction(transaction, queries):
diff --git a/airflow/contrib/operators/gcp_spanner_operator.py b/airflow/contrib/operators/gcp_spanner_operator.py
index b803fcc30a..8ea08ea0cf 100644
--- a/airflow/contrib/operators/gcp_spanner_operator.py
+++ b/airflow/contrib/operators/gcp_spanner_operator.py
@@ -26,21 +26,23 @@
 
 class CloudSpannerInstanceDeployOperator(BaseOperator):
     """
-    Creates a new Cloud Spanner instance or, if an instance with the same instance_id
-    exists in the specified project, updates it.
+    Creates a new Cloud Spanner instance, or if an instance with the same instance_id
+    exists in the specified project, updates the Cloud Spanner instance.
 
-    :param project_id: The ID of the project which owns the instances, tables and data.
+    :param project_id: The ID of the project which owns the Cloud Spanner Database.
     :type project_id: str
     :param instance_id: Cloud Spanner instance ID.
     :type instance_id: str
-    :param configuration_name: Name of the instance configuration defining
-        how the instance will be created. Required for instances which do not yet exist.
+    :param configuration_name:  The name of the Cloud Spanner instance configuration
+      defining how the instance will be created. Required for
+      instances that do not yet exist.
     :type configuration_name: str
-    :param node_count: (Optional) Number of nodes allocated to the instance.
+    :param node_count: (Optional) The number of nodes allocated to the Cloud Spanner
+      instance.
     :type node_count: int
-    :param display_name: (Optional) The display name for the instance in the
-        Cloud Console UI. (Must be between 4 and 30 characters.) If this value is not
-        set in the constructor, will fall back to the instance ID.
+    :param display_name: (Optional) The display name for the Cloud Spanner  instance in
+      the GCP Console. (Must be between 4 and 30 characters.) If this value is not set
+      in the constructor, the name is the same as the instance ID.
     :type display_name: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
@@ -91,12 +93,12 @@ def execute(self, context):
 
 class CloudSpannerInstanceDeleteOperator(BaseOperator):
     """
-    Deletes a Cloud Spanner instance.
-    If an instance does not exist, no action will be taken and the operator will succeed.
+    Deletes a Cloud Spanner instance. If an instance does not exist,
+    no action is taken and the operator succeeds.
 
-    :param project_id: The ID of the project which owns the instances, tables and data.
+    :param project_id: The ID of the project that owns the Cloud Spanner Database.
     :type project_id: str
-    :param instance_id: Cloud Spanner instance ID.
+    :param instance_id: The Cloud Spanner instance ID.
     :type instance_id: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
@@ -138,13 +140,14 @@ class CloudSpannerInstanceDatabaseQueryOperator(BaseOperator):
     """
     Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
 
-    :param project_id: The ID of the project which owns the instances, tables and data.
+    :param project_id: The ID of the project that owns the Cloud Spanner Database.
     :type project_id: str
-    :param instance_id: The ID of the instance.
+    :param instance_id: The Cloud Spanner instance ID.
     :type instance_id: str
-    :param database_id: The ID of the database.
+    :param database_id: The Cloud Spanner database ID.
     :type database_id: str
-    :param query: The query or list of queries to be executed. Can be a path to a SQL file.
+    :param query: The query or list of queries to be executed. Can be a path to a SQL
+       file.
     :type query: str or list
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
@@ -197,3 +200,195 @@ def execute(self, context):
     def sanitize_queries(queries):
         if len(queries) and queries[-1] == '':
             del queries[-1]
+
+
+class CloudSpannerInstanceDatabaseDeployOperator(BaseOperator):
+    """
+    Creates a new Cloud Spanner database, or if database exists,
+    the operator does nothing.
+
+    :param project_id: The ID of the project that owns the Cloud Spanner Database.
+    :type project_id: str
+    :param instance_id: The Cloud Spanner instance ID.
+    :type instance_id: str
+    :param database_id: The Cloud Spanner database ID.
+    :type database_id: str
+    :param ddl_statements: The string list containing DDL for the new database.
+    :type ddl_statements: list of str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_database_deploy_template_fields]
+    template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
+                       'gcp_conn_id')
+    template_ext = ('.sql', )
+    # [END gcp_spanner_database_deploy_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 database_id,
+                 ddl_statements,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.database_id = database_id
+        self.ddl_statements = ddl_statements
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDatabaseDeployOperator, self).__init__(*args, **kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is empty")
+        if not self.database_id:
+            raise AirflowException("The required parameter 'database_id' is empty")
+        if not self.ddl_statements:
+            raise AirflowException("The required parameter 'ddl_statements' is empty")
+
+    def execute(self, context):
+        if not self._hook.get_database(self.project_id,
+                                       self.instance_id,
+                                       self.database_id):
+            self.log.info("Creating Cloud Spanner database "
+                          "'%s' in project '%s' and instance '%s'",
+                          self.database_id, self.project_id, self.instance_id)
+            return self._hook.create_database(project_id=self.project_id,
+                                              instance_id=self.instance_id,
+                                              database_id=self.database_id,
+                                              ddl_statements=self.ddl_statements)
+        else:
+            self.log.info("The database '%s' in project '%s' and instance '%s'"
+                          " already exists. Nothing to do. Exiting.",
+                          self.database_id, self.project_id, self.instance_id)
+        return True
+
+
+class CloudSpannerInstanceDatabaseUpdateOperator(BaseOperator):
+    """
+    Updates a Cloud Spanner database with the specified DDL statement.
+
+    :param project_id: The ID of the project that owns the the Cloud Spanner Database.
+    :type project_id: str
+    :param instance_id: The Cloud Spanner instance ID.
+    :type instance_id: str
+    :param database_id: The Cloud Spanner database ID.
+    :type database_id: str
+    :param ddl_statements: The string list containing DDL to apply to the database.
+    :type ddl_statements: list[str]
+    :param operation_id: (Optional) Unique per database operation id that can
+           be specified to implement idempotency check.
+    :type operation_id: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_database_update_template_fields]
+    template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
+                       'gcp_conn_id')
+    template_ext = ('.sql', )
+    # [END gcp_spanner_database_update_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 database_id,
+                 ddl_statements,
+                 operation_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.database_id = database_id
+        self.ddl_statements = ddl_statements
+        self.operation_id = operation_id
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDatabaseUpdateOperator, self).__init__(*args, **kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is empty")
+        if not self.database_id:
+            raise AirflowException("The required parameter 'database_id' is empty")
+        if not self.ddl_statements:
+            raise AirflowException("The required parameter 'ddl_statements' is empty")
+
+    def execute(self, context):
+        if not self._hook.get_database(self.project_id,
+                                       self.instance_id,
+                                       self.database_id):
+            raise AirflowException("The Cloud Spanner database "
+                                   "'%s' in project '%s' and instance '%s' is missing."
+                                   " Create the database first before you can update it.",
+                                   self.database_id, self.project_id, self.instance_id)
+        else:
+            return self._hook.update_database(project_id=self.project_id,
+                                              instance_id=self.instance_id,
+                                              database_id=self.database_id,
+                                              ddl_statements=self.ddl_statements,
+                                              operation_id=self.operation_id)
+
+
+class CloudSpannerInstanceDatabaseDeleteOperator(BaseOperator):
+    """
+    Deletes a Cloud Spanner database.
+
+    :param project_id: The ID of the project that owns the Cloud Spanner Database.
+    :type project_id: str
+    :param instance_id: Cloud Spanner instance ID.
+    :type instance_id: str
+    :param database_id: Cloud Spanner database ID.
+    :type database_id: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_database_delete_template_fields]
+    template_fields = ('project_id', 'instance_id', 'database_id',
+                       'gcp_conn_id')
+    # [END gcp_spanner_database_delete_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 database_id,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.database_id = database_id
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDatabaseDeleteOperator, self).__init__(*args, **kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is empty")
+        if not self.database_id:
+            raise AirflowException("The required parameter 'database_id' is empty")
+
+    def execute(self, context):
+        db = self._hook.get_database(self.project_id,
+                                     self.instance_id,
+                                     self.database_id)
+        if not db:
+            self.log.info("The Cloud Spanner database was missing: "
+                          "'%s' in project '%s' and instance '%s'. Assuming success.",
+                          self.database_id, self.project_id, self.instance_id)
+            return True
+        else:
+            return self._hook.delete_database(project_id=self.project_id,
+                                              instance_id=self.instance_id,
+                                              database_id=self.database_id)
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 3a88d66339..bcb9f9b9b2 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -671,21 +671,117 @@ More information
 See `Google Cloud Functions API documentation
 <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/create>`_.
 
-Google Cloud Sql Operators
---------------------------
+Google Cloud Spanner Operators
+------------------------------
 
-CloudSpannerInstanceDatabaseQueryOperator
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+CloudSpannerInstanceDatabaseDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
+Deletes a database from the specified Cloud Spanner instance. If the database does not
+exist, no action is taken, and the operator succeeds.
+
+For parameter definition, take a look at
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeleteOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :start-after: [START howto_operator_spanner_arguments]
+    :end-before: [END howto_operator_spanner_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_spanner_database_delete]
+    :end-before: [END howto_operator_spanner_database_delete]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_spanner_delete_template_fields]
+    :end-before: [END gcp_spanner_delete_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud Spanner API documentation for database drop call
+<https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/dropDatabase>`_.
+
+
+CloudSpannerInstanceDatabaseDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Creates a new Cloud Spanner database in the specified instance, or if the
+desired database exists, assumes success with no changes applied to database
+configuration. No structure of the database is verified - it's enough if the database exists
+with the same name.
+
+For parameter definition, take a look at
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeployOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :start-after: [START howto_operator_spanner_arguments]
+    :end-before: [END howto_operator_spanner_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_spanner_database_deploy]
+    :end-before: [END howto_operator_spanner_database_deploy]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_spanner_database_deploy_template_fields]
+    :end-before: [END gcp_spanner_database_deploy_template_fields]
+
+More information
+""""""""""""""""
+
+See Google Cloud Spanner API documentation for `database create
+<https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/create>`_
+
+CloudSpannerInstanceDatabaseUpdateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Runs a DDL query in a Cloud Spanner database and allows you to modify the structure of an
+existing database.
+
+You can optionally specify an operation_id parameter which simplifies determining whether
+the statements were executed in case the update_database call is replayed
+(idempotency check). The operation_id should be unique within the database, and must be
+a valid identifier: `[a-z][a-z0-9_]*`. More information can be found in
+`the documentation of updateDdl API <https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/updateDdl>`_
 
 For parameter definition take a look at
-:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseQueryOperator`.
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseUpdateOperator`.
 
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from environment variables:
+Some arguments in the example DAG are taken from environment variables.
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
     :language: python
@@ -698,37 +794,36 @@ Using the operator
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_spanner_query]
-    :end-before: [END howto_operator_spanner_query]
+    :start-after: [START howto_operator_spanner_database_update]
+    :end-before: [END howto_operator_spanner_database_update]
 
 Templating
 """"""""""
 
 .. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
-  :language: python
-  :dedent: 4
-  :start-after: [START gcp_spanner_query_template_fields]
-  :end-before: [END gcp_spanner_query_template_fields]
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_spanner_database_update_template_fields]
+    :end-before: [END gcp_spanner_database_update_template_fields]
 
 More information
 """"""""""""""""
 
-See Google Cloud Spanner API documentation for `the DML syntax
-<https://cloud.google.com/spanner/docs/dml-syntax>`_.
+See Google Cloud Spanner API documentation for `database update_ddl
+<https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/updateDdl>`_.
 
-CloudSpannerInstanceDeployOperator
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+CloudSpannerInstanceDatabaseQueryOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Creates a new Cloud Spanner instance or, if an instance with the same name exists,
-updates it.
+Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
 
 For parameter definition take a look at
-:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator`.
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseQueryOperator`.
 
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from environment variables:
+Some arguments in the example DAG are taken from environment variables.
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
     :language: python
@@ -741,31 +836,29 @@ Using the operator
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_spanner_deploy]
-    :end-before: [END howto_operator_spanner_deploy]
+    :start-after: [START howto_operator_spanner_query]
+    :end-before: [END howto_operator_spanner_query]
 
 Templating
 """"""""""
 
 .. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
-  :language: python
-  :dedent: 4
-  :start-after: [START gcp_spanner_deploy_template_fields]
-  :end-before: [END gcp_spanner_deploy_template_fields]
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_spanner_query_template_fields]
+    :end-before: [END gcp_spanner_query_template_fields]
 
 More information
 """"""""""""""""
 
-See Google Cloud Spanner API documentation for instance `create
-<https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.CreateInstance>`_
-and `update
-<https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.UpdateInstance>`_.
+See Google Cloud Spanner API documentation for `the DML syntax
+<https://cloud.google.com/spanner/docs/dml-syntax>`_.
 
 CloudSpannerInstanceDeleteOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Deletes a Cloud Spanner instance.
-If an instance does not exist, no action will be taken and the operator will succeed.
+Deletes a Cloud Spanner instance. If an instance does not exist, no action is taken,
+and the operator succeeds.
 
 For parameter definition take a look at
 :class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator`.
@@ -804,6 +897,9 @@ More information
 See `Google Cloud Spanner API documentation for instance delete
 <https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances/delete>`_.
 
+Google Cloud Sql Operators
+--------------------------
+
 CloudSqlInstanceDatabaseCreateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/docs/integration.rst b/docs/integration.rst
index dd104cf790..db19525954 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -688,11 +688,38 @@ Cloud Spanner
 Cloud Spanner Operators
 """""""""""""""""""""""
 
+- :ref:`CloudSpannerInstanceDatabaseDeleteOperator` : deletes an existing database from
+  a Google Cloud Spanner instance or returns success if the database is missing.
+- :ref:`CloudSpannerInstanceDatabaseDeployOperator` : creates a new database in a Google
+  Cloud instance or returns success if the database already exists.
+- :ref:`CloudSpannerInstanceDatabaseUpdateOperator` : updates the structure of a
+  Google Cloud Spanner database.
 - :ref:`CloudSpannerInstanceDatabaseQueryOperator` : executes an arbitrary DML query
   (INSERT, UPDATE, DELETE).
-- :ref:`CloudSpannerInstanceDeployOperator` : creates a new Cloud Spanner instance or,
-  if an instance with the same name exists, updates it.
-- :ref:`CloudSpannerInstanceDeleteOperator` : deletes a Cloud Spanner instance.
+- :ref:`CloudSpannerInstanceDeployOperator` : creates a new Google Cloud Spanner instance,
+  or if an instance with the same name exists, updates the instance.
+- :ref:`CloudSpannerInstanceDeleteOperator` : deletes a Google Cloud Spanner instance.
+
+.. _CloudSpannerInstanceDatabaseDeleteOperator:
+
+CloudSpannerInstanceDatabaseDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeleteOperator
+
+.. _CloudSpannerInstanceDatabaseDeployOperator:
+
+CloudSpannerInstanceDatabaseDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeployOperator
+
+.. _CloudSpannerInstanceDatabaseUpdateOperator:
+
+CloudSpannerInstanceDatabaseUpdateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseUpdateOperator
 
 .. _CloudSpannerInstanceDatabaseQueryOperator:
 
@@ -715,6 +742,14 @@ CloudSpannerInstanceDeleteOperator
 
 .. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator
 
+
+CloudSpannerHook
+""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook
+    :members:
+
+
 Cloud SQL
 '''''''''
 
diff --git a/setup.py b/setup.py
index 410502c302..79936ffba3 100644
--- a/setup.py
+++ b/setup.py
@@ -191,7 +191,7 @@ def write_version(filename=os.path.join(*['airflow',
     'google-auth-httplib2>=0.0.1',
     'google-cloud-container>=0.1.1',
     'google-cloud-bigtable==0.31.0',
-    'google-cloud-spanner>=1.6.0',
+    'google-cloud-spanner>=1.7.1',
     'grpcio-gcp>=0.2.2',
     'PyOpenSSL',
     'pandas-gbq'
diff --git a/tests/contrib/operators/test_gcp_spanner_operator.py b/tests/contrib/operators/test_gcp_spanner_operator.py
index 38ae985f26..525f796471 100644
--- a/tests/contrib/operators/test_gcp_spanner_operator.py
+++ b/tests/contrib/operators/test_gcp_spanner_operator.py
@@ -23,7 +23,8 @@
 from airflow import AirflowException
 from airflow.contrib.operators.gcp_spanner_operator import \
     CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator, \
-    CloudSpannerInstanceDatabaseQueryOperator
+    CloudSpannerInstanceDatabaseQueryOperator, CloudSpannerInstanceDatabaseDeployOperator, \
+    CloudSpannerInstanceDatabaseDeleteOperator, CloudSpannerInstanceDatabaseUpdateOperator
 from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
     SKIP_TEST_WARNING, GCP_SPANNER_KEY
 
@@ -46,7 +47,7 @@
 INSERT_QUERY_2 = "INSERT my_table2 (id, name) VALUES (1, 'One')"
 CREATE_QUERY = "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"
 CREATE_QUERY_2 = "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"
-QUERY_TYPE = "DML"
+DDL_STATEMENTS = [CREATE_QUERY, CREATE_QUERY_2]
 
 
 class CloudSpannerTest(unittest.TestCase):
@@ -240,6 +241,171 @@ def test_instance_query_dml_list(self, mock_hook):
             PROJECT_ID, INSTANCE_ID, DB_ID, [INSERT_QUERY, INSERT_QUERY_2]
         )
 
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_create(self, mock_hook):
+        mock_hook.return_value.get_database.return_value = None
+        op = CloudSpannerInstanceDatabaseDeployOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            database_id=DB_ID,
+            ddl_statements=DDL_STATEMENTS,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_database.assert_called_once_with(
+            project_id=PROJECT_ID, instance_id=INSTANCE_ID, database_id=DB_ID,
+            ddl_statements=DDL_STATEMENTS
+        )
+        mock_hook.return_value.update_database.assert_not_called()
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_create_with_pre_existing_db(self, mock_hook):
+        mock_hook.return_value.get_database.return_value = {"name": DB_ID}
+        op = CloudSpannerInstanceDatabaseDeployOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            database_id=DB_ID,
+            ddl_statements=DDL_STATEMENTS,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_database.assert_not_called()
+        mock_hook.return_value.update_database.assert_not_called()
+        self.assertTrue(result)
+
+    @parameterized.expand([
+        ("", INSTANCE_ID, DB_ID, DDL_STATEMENTS, 'project_id'),
+        (PROJECT_ID, "", DB_ID, DDL_STATEMENTS, 'instance_id'),
+        (PROJECT_ID, INSTANCE_ID, "", DDL_STATEMENTS, 'database_id'),
+    ])
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_create_ex_if_param_missing(self,
+                                                 project_id, instance_id,
+                                                 database_id, ddl_statements,
+                                                 exp_msg, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            CloudSpannerInstanceDatabaseDeployOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                database_id=database_id,
+                ddl_statements=ddl_statements,
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err))
+        mock_hook.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_update(self, mock_hook):
+        mock_hook.return_value.get_database.return_value = {"name": DB_ID}
+        op = CloudSpannerInstanceDatabaseUpdateOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            database_id=DB_ID,
+            ddl_statements=DDL_STATEMENTS,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.update_database.assert_called_once_with(
+            project_id=PROJECT_ID, instance_id=INSTANCE_ID, database_id=DB_ID,
+            ddl_statements=DDL_STATEMENTS, operation_id=None
+        )
+        self.assertTrue(result)
+
+    @parameterized.expand([
+        ("", INSTANCE_ID, DB_ID, DDL_STATEMENTS, 'project_id'),
+        (PROJECT_ID, "", DB_ID, DDL_STATEMENTS, 'instance_id'),
+        (PROJECT_ID, INSTANCE_ID, "", DDL_STATEMENTS, 'database_id'),
+    ])
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_update_ex_if_param_missing(self, project_id, instance_id,
+                                                 database_id, ddl_statements,
+                                                 exp_msg, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            CloudSpannerInstanceDatabaseUpdateOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                database_id=database_id,
+                ddl_statements=ddl_statements,
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err))
+        mock_hook.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_update_ex_if_database_not_exist(self, mock_hook):
+        mock_hook.return_value.get_database.return_value = None
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSpannerInstanceDatabaseUpdateOperator(
+                project_id=PROJECT_ID,
+                instance_id=INSTANCE_ID,
+                database_id=DB_ID,
+                ddl_statements=DDL_STATEMENTS,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The Cloud Spanner database 'db1' in project 'project-id' and "
+                      "instance 'instance-id' is missing", str(err))
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_delete(self, mock_hook):
+        mock_hook.return_value.get_database.return_value = {"name": DB_ID}
+        op = CloudSpannerInstanceDatabaseDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            database_id=DB_ID,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_database.assert_called_once_with(
+            project_id=PROJECT_ID, instance_id=INSTANCE_ID, database_id=DB_ID
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_delete_exits_and_succeeds_if_database_does_not_exist(self,
+                                                                           mock_hook):
+        mock_hook.return_value.get_database.return_value = None
+        op = CloudSpannerInstanceDatabaseDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            database_id=DB_ID,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_database.assert_not_called()
+        self.assertTrue(result)
+
+    @parameterized.expand([
+        ("", INSTANCE_ID, DB_ID, DDL_STATEMENTS, 'project_id'),
+        (PROJECT_ID, "", DB_ID, DDL_STATEMENTS, 'instance_id'),
+        (PROJECT_ID, INSTANCE_ID, "", DDL_STATEMENTS, 'database_id'),
+    ])
+    @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_database_delete_ex_if_param_missing(self, project_id, instance_id,
+                                                 database_id, ddl_statements,
+                                                 exp_msg, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            CloudSpannerInstanceDatabaseDeleteOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                database_id=database_id,
+                ddl_statements=ddl_statements,
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err))
+        mock_hook.assert_not_called()
+
 
 @unittest.skipIf(
     BaseGcpIntegrationTestCase.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services