You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/08/03 10:39:37 UTC

[airflow] branch master updated: Add additional Cloud Datastore operators (#10032)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6efa1b9  Add additional Cloud Datastore operators (#10032)
6efa1b9 is described below

commit 6efa1b9cb763ae0bdbc884a54d24dbdc39d9e3a6
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Mon Aug 3 12:39:05 2020 +0200

    Add additional Cloud Datastore operators (#10032)
    
    This PR adds more operators for Google Cloud Datastore
    service. It also adds missing tests and how-to guides.
---
 .../google/cloud/example_dags/example_datastore.py | 105 ++++++-
 airflow/providers/google/cloud/hooks/datastore.py  |   6 +-
 .../providers/google/cloud/operators/datastore.py  | 338 ++++++++++++++++++++-
 docs/build                                         |   1 -
 docs/howto/operator/google/cloud/datastore.rst     | 173 +++++++++++
 docs/operators-and-hooks-ref.rst                   |   2 +-
 .../providers/google/cloud/hooks/test_datastore.py |   9 +-
 .../google/cloud/operators/test_datastore.py       | 206 +++++++++++++
 .../cloud/operators/test_datastore_system.py       |   4 +
 tests/test_project_structure.py                    |   1 -
 10 files changed, 835 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
index 43102e1..4129b53 100644
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ b/airflow/providers/google/cloud/example_dags/example_datastore.py
@@ -23,10 +23,13 @@ This example requires that your project contains Datastore instance.
 """
 
 import os
+from typing import Any, Dict
 
 from airflow import models
 from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreAllocateIdsOperator, CloudDatastoreBeginTransactionOperator, CloudDatastoreCommitOperator,
     CloudDatastoreExportEntitiesOperator, CloudDatastoreImportEntitiesOperator,
+    CloudDatastoreRollbackOperator, CloudDatastoreRunQueryOperator,
 )
 from airflow.utils import dates
 
@@ -37,20 +40,118 @@ with models.DAG(
     "example_gcp_datastore",
     schedule_interval=None,  # Override to match your needs
     start_date=dates.days_ago(1),
-    tags=['example'],
+    tags=["example"],
 ) as dag:
+    # [START how_to_export_task]
     export_task = CloudDatastoreExportEntitiesOperator(
         task_id="export_task",
         bucket=BUCKET,
         project_id=GCP_PROJECT_ID,
         overwrite_existing=True,
     )
+    # [END how_to_export_task]
 
+    # [START how_to_import_task]
     import_task = CloudDatastoreImportEntitiesOperator(
         task_id="import_task",
         bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
         file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
-        project_id=GCP_PROJECT_ID
+        project_id=GCP_PROJECT_ID,
     )
+    # [END how_to_import_task]
 
     export_task >> import_task
+
+# [START how_to_keys_def]
+KEYS = [
+    {
+        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+        "path": {"kind": "airflow"},
+    }
+]
+# [END how_to_keys_def]
+
+# [START how_to_transaction_def]
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+# [END how_to_transaction_def]
+
+# [START how_to_commit_def]
+COMMIT_BODY = {
+    "mode": "TRANSACTIONAL",
+    "mutations": [
+        {
+            "insert": {
+                "key": KEYS[0],
+                "properties": {"string": {"stringValue": "airflow is awesome!"}},
+            }
+        }
+    ],
+    "transaction": "{{ task_instance.xcom_pull('begin_transaction_commit') }}",
+}
+# [END how_to_commit_def]
+
+# [START how_to_query_def]
+QUERY = {
+    "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+    "readOptions": {
+        "transaction": "{{ task_instance.xcom_pull('begin_transaction_query') }}"
+    },
+    "query": {},
+}
+# [END how_to_query_def]
+
+with models.DAG(
+    "example_gcp_datastore_operations",
+    start_date=dates.days_ago(1),
+    schedule_interval=None,  # Override to match your needs
+    tags=["example"],
+) as dag2:
+    # [START how_to_allocate_ids]
+    allocate_ids = CloudDatastoreAllocateIdsOperator(
+        task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_allocate_ids]
+
+    # [START how_to_begin_transaction]
+    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_commit",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END how_to_begin_transaction]
+
+    # [START how_to_commit_task]
+    commit_task = CloudDatastoreCommitOperator(
+        task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_commit_task]
+
+    allocate_ids >> begin_transaction_commit >> commit_task
+
+    begin_transaction_query = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_query",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_run_query]
+    run_query = CloudDatastoreRunQueryOperator(
+        task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_run_query]
+
+    allocate_ids >> begin_transaction_query >> run_query
+
+    begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_to_rollback",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_rollback_transaction]
+    rollback_transaction = CloudDatastoreRollbackOperator(
+        task_id="rollback_transaction", project_id=GCP_PROJECT_ID,
+        transaction="{{ task_instance.xcom_pull('begin_transaction_to_rollback') }}",
+    )
+    begin_transaction_to_rollback >> rollback_transaction
+    # [END how_to_rollback_transaction]
diff --git a/airflow/providers/google/cloud/hooks/datastore.py b/airflow/providers/google/cloud/hooks/datastore.py
index 0dcf7f3..92de4bd 100644
--- a/airflow/providers/google/cloud/hooks/datastore.py
+++ b/airflow/providers/google/cloud/hooks/datastore.py
@@ -100,7 +100,7 @@ class DatastoreHook(GoogleBaseHook):
         return resp['keys']
 
     @GoogleBaseHook.fallback_to_default_project_id
-    def begin_transaction(self, project_id: str) -> str:
+    def begin_transaction(self, project_id: str, transaction_options: Dict[str, Any]) -> str:
         """
         Begins a new transaction.
 
@@ -109,6 +109,8 @@ class DatastoreHook(GoogleBaseHook):
 
         :param project_id: Google Cloud Platform project ID against which to make the request.
         :type project_id: str
+        :param transaction_options: Options for a new transaction.
+        :type transaction_options: Dict[str, Any]
         :return: a transaction handle.
         :rtype: str
         """
@@ -116,7 +118,7 @@ class DatastoreHook(GoogleBaseHook):
 
         resp = (conn  # pylint: disable=no-member
                 .projects()
-                .beginTransaction(projectId=project_id, body={})
+                .beginTransaction(projectId=project_id, body={"transactionOptions": transaction_options})
                 .execute(num_retries=self.num_retries))
 
         return resp['transaction']
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index b99e884..cd38392 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -19,7 +19,7 @@
 """
 This module contains Google Datastore operators.
 """
-from typing import Optional
+from typing import Any, Dict, List, Optional
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -32,6 +32,10 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
     """
     Export entities from Google Cloud Datastore to Cloud Storage
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreExportEntitiesOperator`
+
     :param bucket: name of the cloud storage bucket to backup data
     :type bucket: str
     :param namespace: optional namespace path in the specified Cloud Storage bucket
@@ -63,6 +67,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,  # pylint: disable=too-many-arguments
+                 *,
                  bucket: str,
                  namespace: Optional[str] = None,
                  datastore_conn_id: str = 'google_cloud_default',
@@ -118,6 +123,10 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
     """
     Import entities from Cloud Storage to Google Cloud Datastore
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreImportEntitiesOperator`
+
     :param bucket: container in Cloud Storage to store data
     :type bucket: str
     :param file: path of the backup metadata file in the specified Cloud Storage bucket.
@@ -147,6 +156,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
 
     @apply_defaults
     def __init__(self,
+                 *,
                  bucket: str,
                  file: str,
                  namespace: Optional[str] = None,
@@ -189,3 +199,329 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
             raise AirflowException('Operation failed: result={}'.format(result))
 
         return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Returns a list of keys.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        partial_keys: List,
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.partial_keys = partial_keys
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        keys = hook.allocate_ids(
+            partial_keys=self.partial_keys,
+            project_id=self.project_id,
+        )
+        return keys
+
+
+class CloudDatastoreBeginTransactionOperator(BaseOperator):
+    """
+    Begins a new transaction. Returns a transaction handle.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreBeginTransactionOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction
+
+    :param transaction_options: Options for a new transaction.
+    :type transaction_options: Dict[str, Any]
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("transaction_options",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        transaction_options: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.transaction_options = transaction_options
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        handle = hook.begin_transaction(
+            transaction_options=self.transaction_options,
+            project_id=self.project_id,
+        )
+        return handle
+
+
+class CloudDatastoreCommitOperator(BaseOperator):
+    """
+    Commit a transaction, optionally creating, deleting or modifying some entities.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreCommitOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit
+
+    :param body: the body of the commit request.
+    :type body: dict
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("body",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        body: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.body = body
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        response = hook.commit(
+            body=self.body,
+            project_id=self.project_id,
+        )
+        return response
+
+
+class CloudDatastoreRollbackOperator(BaseOperator):
+    """
+    Roll back a transaction.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreRollbackOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback
+
+    :param transaction: the transaction to roll back.
+    :type transaction: str
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("transaction",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        transaction: str,
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.transaction = transaction
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook.rollback(
+            transaction=self.transaction,
+            project_id=self.project_id,
+        )
+
+
+class CloudDatastoreRunQueryOperator(BaseOperator):
+    """
+    Run a query for entities. Returns the batch of query results.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreRunQueryOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery
+
+    :param body: the body of the query request.
+    :type body: dict
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("body",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        body: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.body = body
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        response = hook.run_query(
+            body=self.body,
+            project_id=self.project_id,
+        )
+        return response
+
+
+class CloudDatastoreGetOperationOperator(BaseOperator):
+    """
+    Gets the latest state of a long-running operation.
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects.operations/get
+
+    :param name: the name of the operation resource.
+    :type name: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("name",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        name: str,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.name = name
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        op = hook.get_operation(name=self.name)
+        return op
+
+
+class CloudDatastoreDeleteOperationOperator(BaseOperator):
+    """
+    Deletes the long-running operation.
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects.operations/delete
+
+    :param name: the name of the operation resource.
+    :type name: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("name",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        name: str,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.name = name
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook.delete_operation(name=self.name)
diff --git a/docs/build b/docs/build
index d8a93da..257c767 100755
--- a/docs/build
+++ b/docs/build
@@ -355,7 +355,6 @@ MISSING_GOOGLLE_DOC_GUIDES = {
     'bigquery_to_mysql',
     'cassandra_to_gcs',
     'dataflow',
-    'datastore',
     'dlp',
     'gcs_to_bigquery',
     'mssql_to_gcs',
diff --git a/docs/howto/operator/google/cloud/datastore.rst b/docs/howto/operator/google/cloud/datastore.rst
new file mode 100644
index 0000000..a73f426
--- /dev/null
+++ b/docs/howto/operator/google/cloud/datastore.rst
@@ -0,0 +1,173 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Datastore Operators
+================================
+
+Firestore in Datastore mode is a NoSQL document database built for automatic scaling,
+high performance, and ease of application development.
+
+For more information about the service visit
+`Datastore product documentation <https://cloud.google.com/datastore/docs>`__
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:CloudDatastoreExportEntitiesOperator:
+
+Export Entities
+---------------
+
+To export entities from Google Cloud Datastore to Cloud Storage use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_export_task]
+    :end-before: [END how_to_export_task]
+
+.. _howto/operator:CloudDatastoreImportEntitiesOperator:
+
+Import Entities
+---------------
+
+To import entities from Cloud Storage to Google Cloud Datastore use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_import_task]
+    :end-before: [END how_to_import_task]
+
+.. _howto/operator:CloudDatastoreAllocateIdsOperator:
+
+Allocate IDs
+------------
+
+To allocate IDs for incomplete keys use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreAllocateIdsOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_allocate_ids]
+    :end-before: [END how_to_allocate_ids]
+
+An example of the partial keys required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_keys_def]
+    :end-before: [END how_to_keys_def]
+
+.. _howto/operator:CloudDatastoreBeginTransactionOperator:
+
+Begin transaction
+-----------------
+
+To begin a new transaction use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_begin_transaction]
+    :end-before: [END how_to_begin_transaction]
+
+An example of the transaction options required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_transaction_def]
+    :end-before: [END how_to_transaction_def]
+
+.. _howto/operator:CloudDatastoreCommitOperator:
+
+Commit transaction
+------------------
+
+To commit a transaction, optionally creating, deleting or modifying some entities
+use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCommitOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_commit_task]
+    :end-before: [END how_to_commit_task]
+
+An example of the commit request body required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_commit_def]
+    :end-before: [END how_to_commit_def]
+
+.. _howto/operator:CloudDatastoreRunQueryOperator:
+
+Run query
+---------
+
+To run a query for entities use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_run_query]
+    :end-before: [END how_to_run_query]
+
+An example of a query required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_query_def]
+    :end-before: [END how_to_query_def]
+
+.. _howto/operator:CloudDatastoreRollbackOperator:
+
+Roll back transaction
+---------------------
+
+To roll back a transaction
+use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_rollback_transaction]
+    :end-before: [END how_to_rollback_transaction]
+
+
+References
+^^^^^^^^^^
+For further information, take a look at:
+
+* `Datastore API documentation <https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects>`__
+* `Product documentation <https://cloud.google.com/datastore/docs>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index a6ecb53..14bee04 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -726,7 +726,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Datastore <https://cloud.google.com/datastore/>`__
-     -
+     - :doc:`How to use <howto/operator/google/cloud/datastore>`
      - :mod:`airflow.providers.google.cloud.hooks.datastore`
      - :mod:`airflow.providers.google.cloud.operators.datastore`
      -
diff --git a/tests/providers/google/cloud/hooks/test_datastore.py b/tests/providers/google/cloud/hooks/test_datastore.py
index 93dd663..51591a8 100644
--- a/tests/providers/google/cloud/hooks/test_datastore.py
+++ b/tests/providers/google/cloud/hooks/test_datastore.py
@@ -86,12 +86,17 @@ class TestDatastoreHook(unittest.TestCase):
     def test_begin_transaction(self, mock_get_conn):
         self.datastore_hook.connection = mock_get_conn.return_value
 
-        transaction = self.datastore_hook.begin_transaction(project_id=GCP_PROJECT_ID)
+        transaction = self.datastore_hook.begin_transaction(
+            project_id=GCP_PROJECT_ID,
+            transaction_options={},
+        )
 
         projects = self.datastore_hook.connection.projects
         projects.assert_called_once_with()
         begin_transaction = projects.return_value.beginTransaction
-        begin_transaction.assert_called_once_with(projectId=GCP_PROJECT_ID, body={})
+        begin_transaction.assert_called_once_with(
+            projectId=GCP_PROJECT_ID, body={'transactionOptions': {}}
+        )
         execute = begin_transaction.return_value.execute
         execute.assert_called_once_with(num_retries=mock.ANY)
         self.assertEqual(transaction, execute.return_value['transaction'])
diff --git a/tests/providers/google/cloud/operators/test_datastore.py b/tests/providers/google/cloud/operators/test_datastore.py
new file mode 100644
index 0000000..c097e58
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_datastore.py
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreAllocateIdsOperator, CloudDatastoreBeginTransactionOperator, CloudDatastoreCommitOperator,
+    CloudDatastoreDeleteOperationOperator, CloudDatastoreExportEntitiesOperator,
+    CloudDatastoreGetOperationOperator, CloudDatastoreImportEntitiesOperator, CloudDatastoreRollbackOperator,
+    CloudDatastoreRunQueryOperator,
+)
+
+HOOK_PATH = "airflow.providers.google.cloud.operators.datastore.DatastoreHook"
+PROJECT_ID = "test-project"
+CONN_ID = "test-gcp-conn-id"
+BODY = {"key": "value"}  # dict-shaped request body / options fixture (not a set)
+TRANSACTION = "transaction-name"
+BUCKET = "gs://test-bucket"
+FILE = "filename"
+OPERATION_ID = "1234"
+
+
+class TestCloudDatastoreExportEntitiesOperator:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """Export starts a bucket export, then polls the returned operation."""
+        hook = mock_hook.return_value
+        hook.export_to_storage_bucket.return_value = {"name": OPERATION_ID}
+        hook.poll_operation_until_done.return_value = {
+            "metadata": {"common": {"state": "SUCCESSFUL"}}
+        }
+
+        export_op = CloudDatastoreExportEntitiesOperator(
+            task_id="test_task",
+            datastore_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+        )
+        export_op.execute({})
+
+        # The hook is built from the connection id, with None as the second positional arg.
+        mock_hook.assert_called_once_with(CONN_ID, None)
+        hook.export_to_storage_bucket.assert_called_once_with(
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            entity_filter=None,
+            labels=None,
+            namespace=None,
+        )
+
+        # No polling interval was passed to the operator, so 10 is its default.
+        hook.poll_operation_until_done.assert_called_once_with(OPERATION_ID, 10)
+
+
+class TestCloudDatastoreImportEntitiesOperator:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """Import starts a bucket import, then polls the returned operation."""
+        mock_hook.return_value.import_from_storage_bucket.return_value = {
+            "name": OPERATION_ID
+        }
+        mock_hook.return_value.poll_operation_until_done.return_value = {
+            "metadata": {"common": {"state": "SUCCESSFUL"}}
+        }
+
+        op = CloudDatastoreImportEntitiesOperator(
+            task_id="test_task",
+            datastore_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            file=FILE,
+        )
+        op.execute({})
+
+        mock_hook.assert_called_once_with(CONN_ID, None)
+        mock_hook.return_value.import_from_storage_bucket.assert_called_once_with(
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            file=FILE,
+            entity_filter=None,
+            labels=None,
+            namespace=None,
+        )
+        # Assert on the real poll method: a misspelled attribute on a MagicMock is
+        # auto-created, so calling it would "pass" without checking anything.
+        mock_hook.return_value.poll_operation_until_done.assert_called_once_with(OPERATION_ID, 10)
+
+
+class TestCloudDatastoreAllocateIds:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The operator forwards its partial keys to ``DatastoreHook.allocate_ids``."""
+        keys = [1, 2, 3]
+        CloudDatastoreAllocateIdsOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            partial_keys=keys,
+        ).execute({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        mock_hook.return_value.allocate_ids.assert_called_once_with(
+            project_id=PROJECT_ID, partial_keys=keys
+        )
+
+
+class TestCloudDatastoreBeginTransaction:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """Transaction options are handed to ``DatastoreHook.begin_transaction``."""
+        begin_op = CloudDatastoreBeginTransactionOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            transaction_options=BODY,
+        )
+        begin_op.execute({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        begin = mock_hook.return_value.begin_transaction
+        begin.assert_called_once_with(project_id=PROJECT_ID, transaction_options=BODY)
+
+
+class TestCloudDatastoreCommit:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The commit body is passed through to ``DatastoreHook.commit``."""
+        CloudDatastoreCommitOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
+        ).execute({})
+
+        # Exactly one hook, built from the configured connection id.
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        hook_instance = mock_hook.return_value
+        hook_instance.commit.assert_called_once_with(project_id=PROJECT_ID, body=BODY)
+
+
+class TestCloudDatastoreDeleteOperation:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The operation name is passed through to ``DatastoreHook.delete_operation``."""
+        # ``name`` identifies an operation, so use the operation fixture, not TRANSACTION.
+        op = CloudDatastoreDeleteOperationOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, name=OPERATION_ID
+        )
+        op.execute({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        mock_hook.return_value.delete_operation.assert_called_once_with(name=OPERATION_ID)
+
+
+class TestCloudDatastoreGetOperation:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The operation name is passed through to ``DatastoreHook.get_operation``."""
+        # ``name`` identifies an operation, so use the operation fixture, not TRANSACTION.
+        CloudDatastoreGetOperationOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, name=OPERATION_ID
+        ).execute({})
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        mock_hook.return_value.get_operation.assert_called_once_with(name=OPERATION_ID)
+
+
+class TestCloudDatastoreRollback:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The transaction identifier is passed through to ``DatastoreHook.rollback``."""
+        rollback_op = CloudDatastoreRollbackOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            transaction=TRANSACTION,
+        )
+        rollback_op.execute({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        rollback = mock_hook.return_value.rollback
+        rollback.assert_called_once_with(project_id=PROJECT_ID, transaction=TRANSACTION)
+
+
+class TestCloudDatastoreRunQuery:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        """The query body is passed through to ``DatastoreHook.run_query``."""
+        query_op = CloudDatastoreRunQueryOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
+        )
+        query_op.execute({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID)
+        run_query = mock_hook.return_value.run_query
+        run_query.assert_called_once_with(project_id=PROJECT_ID, body=BODY)
diff --git a/tests/providers/google/cloud/operators/test_datastore_system.py b/tests/providers/google/cloud/operators/test_datastore_system.py
index 961ba88..7cbeaf1 100644
--- a/tests/providers/google/cloud/operators/test_datastore_system.py
+++ b/tests/providers/google/cloud/operators/test_datastore_system.py
@@ -42,3 +42,7 @@ class GcpDatastoreSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATASTORE_KEY)
     def test_run_example_dag(self):
         self.run_dag('example_gcp_datastore', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATASTORE_KEY)  # same credentials context as test_run_example_dag
+    def test_run_example_dag_operations(self):
+        self.run_dag('example_gcp_datastore_operations', CLOUD_DAG_FOLDER)
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index 120a018..e30cca5 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -29,7 +29,6 @@ ROOT_FOLDER = os.path.realpath(
 
 MISSING_TEST_FILES = {
     'tests/providers/google/cloud/log/test_gcs_task_handler.py',
-    'tests/providers/google/cloud/operators/test_datastore.py',
     'tests/providers/microsoft/azure/sensors/test_azure_cosmos.py',
     'tests/providers/microsoft/azure/log/test_wasb_task_handler.py',
 }