You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/08/03 10:39:37 UTC
[airflow] branch master updated: Add additional Cloud Datastore
operators (#10032)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 6efa1b9 Add additional Cloud Datastore operators (#10032)
6efa1b9 is described below
commit 6efa1b9cb763ae0bdbc884a54d24dbdc39d9e3a6
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Mon Aug 3 12:39:05 2020 +0200
Add additional Cloud Datastore operators (#10032)
This PR adds more operators for the Google Cloud Datastore
service. It also adds the missing tests and how-to guides.
---
.../google/cloud/example_dags/example_datastore.py | 105 ++++++-
airflow/providers/google/cloud/hooks/datastore.py | 6 +-
.../providers/google/cloud/operators/datastore.py | 338 ++++++++++++++++++++-
docs/build | 1 -
docs/howto/operator/google/cloud/datastore.rst | 173 +++++++++++
docs/operators-and-hooks-ref.rst | 2 +-
.../providers/google/cloud/hooks/test_datastore.py | 9 +-
.../google/cloud/operators/test_datastore.py | 206 +++++++++++++
.../cloud/operators/test_datastore_system.py | 4 +
tests/test_project_structure.py | 1 -
10 files changed, 835 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
index 43102e1..4129b53 100644
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ b/airflow/providers/google/cloud/example_dags/example_datastore.py
@@ -23,10 +23,13 @@ This example requires that your project contains Datastore instance.
"""
import os
+from typing import Any, Dict
from airflow import models
from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreAllocateIdsOperator, CloudDatastoreBeginTransactionOperator, CloudDatastoreCommitOperator,
CloudDatastoreExportEntitiesOperator, CloudDatastoreImportEntitiesOperator,
+ CloudDatastoreRollbackOperator, CloudDatastoreRunQueryOperator,
)
from airflow.utils import dates
@@ -37,20 +40,118 @@ with models.DAG(
"example_gcp_datastore",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
- tags=['example'],
+ tags=["example"],
) as dag:
+ # [START how_to_export_task]
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET,
project_id=GCP_PROJECT_ID,
overwrite_existing=True,
)
+ # [END how_to_export_task]
+ # [START how_to_import_task]
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
- project_id=GCP_PROJECT_ID
+ project_id=GCP_PROJECT_ID,
)
+ # [END how_to_import_task]
export_task >> import_task
+
+# [START how_to_keys_def]
+KEYS = [
+    {
+        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+        "path": {"kind": "airflow"},
+    }
+]
+# [END how_to_keys_def]
+
+# [START how_to_transaction_def]
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+# [END how_to_transaction_def]
+
+# [START how_to_commit_def]
+COMMIT_BODY = {
+    "mode": "TRANSACTIONAL",
+    "mutations": [
+        {
+            "insert": {
+                "key": KEYS[0],
+                "properties": {"string": {"stringValue": "airflow is awesome!"}},
+            }
+        }
+    ],
+    "transaction": "{{ task_instance.xcom_pull('begin_transaction_commit') }}",
+}
+# [END how_to_commit_def]
+
+# [START how_to_query_def]
+QUERY = {
+    "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+    "readOptions": {
+        "transaction": "{{ task_instance.xcom_pull('begin_transaction_query') }}"
+    },
+    "query": {},
+}
+# [END how_to_query_def]
+
+# Each begin-transaction task returns its transaction handle, which is pushed to XCom;
+# the commit, query and rollback tasks pull it back using that task's task_id.
+with models.DAG(
+    "example_gcp_datastore_operations",
+    start_date=dates.days_ago(1),
+    schedule_interval=None,  # Override to match your needs
+    tags=["example"],
+) as dag2:
+    # [START how_to_allocate_ids]
+    allocate_ids = CloudDatastoreAllocateIdsOperator(
+        task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_allocate_ids]
+
+    # [START how_to_begin_transaction]
+    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_commit",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END how_to_begin_transaction]
+
+    # [START how_to_commit_task]
+    commit_task = CloudDatastoreCommitOperator(
+        task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_commit_task]
+
+    allocate_ids >> begin_transaction_commit >> commit_task
+
+    begin_transaction_query = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_query",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_run_query]
+    run_query = CloudDatastoreRunQueryOperator(
+        task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_run_query]
+
+    allocate_ids >> begin_transaction_query >> run_query
+
+    begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_to_rollback",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_rollback_transaction]
+    rollback_transaction = CloudDatastoreRollbackOperator(
+        task_id="rollback_transaction",
+        transaction="{{ task_instance.xcom_pull('begin_transaction_to_rollback') }}",
+        project_id=GCP_PROJECT_ID,
+    )
+    begin_transaction_to_rollback >> rollback_transaction
+    # [END how_to_rollback_transaction]
diff --git a/airflow/providers/google/cloud/hooks/datastore.py b/airflow/providers/google/cloud/hooks/datastore.py
index 0dcf7f3..92de4bd 100644
--- a/airflow/providers/google/cloud/hooks/datastore.py
+++ b/airflow/providers/google/cloud/hooks/datastore.py
@@ -100,7 +100,7 @@ class DatastoreHook(GoogleBaseHook):
return resp['keys']
@GoogleBaseHook.fallback_to_default_project_id
- def begin_transaction(self, project_id: str) -> str:
+ def begin_transaction(self, project_id: str, transaction_options: Dict[str, Any]) -> str:
"""
Begins a new transaction.
@@ -109,6 +109,8 @@ class DatastoreHook(GoogleBaseHook):
:param project_id: Google Cloud Platform project ID against which to make the request.
:type project_id: str
+ :param transaction_options: Options for a new transaction.
+ :type transaction_options: Dict[str, Any]
:return: a transaction handle.
:rtype: str
"""
@@ -116,7 +118,7 @@ class DatastoreHook(GoogleBaseHook):
resp = (conn # pylint: disable=no-member
.projects()
- .beginTransaction(projectId=project_id, body={})
+ .beginTransaction(projectId=project_id, body={"transactionOptions": transaction_options})
.execute(num_retries=self.num_retries))
return resp['transaction']
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index b99e884..cd38392 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -19,7 +19,7 @@
"""
This module contains Google Datastore operators.
"""
-from typing import Optional
+from typing import Any, Dict, List, Optional
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -32,6 +32,10 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
"""
Export entities from Google Cloud Datastore to Cloud Storage
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudDatastoreExportEntitiesOperator`
+
:param bucket: name of the cloud storage bucket to backup data
:type bucket: str
:param namespace: optional namespace path in the specified Cloud Storage bucket
@@ -63,6 +67,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments
+ *,
bucket: str,
namespace: Optional[str] = None,
datastore_conn_id: str = 'google_cloud_default',
@@ -118,6 +123,10 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
"""
Import entities from Cloud Storage to Google Cloud Datastore
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:CloudDatastoreImportEntitiesOperator`
+
:param bucket: container in Cloud Storage to store data
:type bucket: str
:param file: path of the backup metadata file in the specified Cloud Storage bucket.
@@ -147,6 +156,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
@apply_defaults
def __init__(self,
+ *,
bucket: str,
file: str,
namespace: Optional[str] = None,
@@ -189,3 +199,329 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
raise AirflowException('Operation failed: result={}'.format(result))
return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Returns the list of keys from the response.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        partial_keys: List,
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.partial_keys = partial_keys
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        keys = hook.allocate_ids(
+            partial_keys=self.partial_keys,
+            project_id=self.project_id,
+        )
+        return keys
+
+
+class CloudDatastoreBeginTransactionOperator(BaseOperator):
+    """
+    Begins a new transaction. Returns a transaction handle.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreBeginTransactionOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction
+
+    :param transaction_options: Options for a new transaction.
+    :type transaction_options: Dict[str, Any]
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("transaction_options",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        transaction_options: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.transaction_options = transaction_options
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        handle = hook.begin_transaction(
+            transaction_options=self.transaction_options,
+            project_id=self.project_id,
+        )
+        return handle
+
+
+class CloudDatastoreCommitOperator(BaseOperator):
+    """
+    Commit a transaction, optionally creating, deleting or modifying some entities.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreCommitOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit
+
+    :param body: the body of the commit request.
+    :type body: dict
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("body",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        body: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.body = body
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        response = hook.commit(
+            body=self.body,
+            project_id=self.project_id,
+        )
+        return response
+
+
+class CloudDatastoreRollbackOperator(BaseOperator):
+    """
+    Roll back a transaction.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreRollbackOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback
+
+    :param transaction: the transaction to roll back.
+    :type transaction: str
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("transaction",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        transaction: str,
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.transaction = transaction
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook.rollback(
+            transaction=self.transaction,
+            project_id=self.project_id,
+        )
+
+
+class CloudDatastoreRunQueryOperator(BaseOperator):
+    """
+    Run a query for entities. Returns the batch of query results.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudDatastoreRunQueryOperator`
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery
+
+    :param body: the body of the query request.
+    :type body: dict
+    :param project_id: Google Cloud Platform project ID against which to make the request.
+    :type project_id: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("body",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        body: Dict[str, Any],
+        project_id: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.body = body
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        response = hook.run_query(
+            body=self.body,
+            project_id=self.project_id,
+        )
+        return response
+
+
+class CloudDatastoreGetOperationOperator(BaseOperator):
+    """
+    Gets the latest state of a long-running operation.
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects.operations/get
+
+    :param name: the name of the operation resource.
+    :type name: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("name",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        name: str,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.name = name
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        op = hook.get_operation(name=self.name)
+        return op
+
+
+class CloudDatastoreDeleteOperationOperator(BaseOperator):
+    """
+    Deletes the long-running operation.
+
+    .. seealso::
+        https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects.operations/delete
+
+    :param name: the name of the operation resource.
+    :type name: str
+    :param delegate_to: The account to impersonate, if any; forwarded to the hook.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("name",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        name: str,
+        delegate_to: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.name = name
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook.delete_operation(name=self.name)
diff --git a/docs/build b/docs/build
index d8a93da..257c767 100755
--- a/docs/build
+++ b/docs/build
@@ -355,7 +355,6 @@ MISSING_GOOGLLE_DOC_GUIDES = {
'bigquery_to_mysql',
'cassandra_to_gcs',
'dataflow',
- 'datastore',
'dlp',
'gcs_to_bigquery',
'mssql_to_gcs',
diff --git a/docs/howto/operator/google/cloud/datastore.rst b/docs/howto/operator/google/cloud/datastore.rst
new file mode 100644
index 0000000..a73f426
--- /dev/null
+++ b/docs/howto/operator/google/cloud/datastore.rst
@@ -0,0 +1,173 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Datastore Operators
+================================
+
+Firestore in Datastore mode is a NoSQL document database built for automatic scaling,
+high performance, and ease of application development.
+
+For more information about the service, visit
+`Datastore product documentation <https://cloud.google.com/datastore/docs>`__
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:CloudDatastoreExportEntitiesOperator:
+
+Export Entities
+---------------
+
+To export entities from Google Cloud Datastore to Cloud Storage use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_export_task]
+ :end-before: [END how_to_export_task]
+
+.. _howto/operator:CloudDatastoreImportEntitiesOperator:
+
+Import Entities
+---------------
+
+To import entities from Cloud Storage to Google Cloud Datastore use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_import_task]
+ :end-before: [END how_to_import_task]
+
+.. _howto/operator:CloudDatastoreAllocateIdsOperator:
+
+Allocate IDs
+------------
+
+To allocate IDs for incomplete keys use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreAllocateIdsOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_allocate_ids]
+ :end-before: [END how_to_allocate_ids]
+
+An example of the partial keys required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_keys_def]
+ :end-before: [END how_to_keys_def]
+
+.. _howto/operator:CloudDatastoreBeginTransactionOperator:
+
+Begin transaction
+-----------------
+
+To begin a new transaction use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_begin_transaction]
+ :end-before: [END how_to_begin_transaction]
+
+An example of the transaction options required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_transaction_def]
+ :end-before: [END how_to_transaction_def]
+
+.. _howto/operator:CloudDatastoreCommitOperator:
+
+Commit transaction
+------------------
+
+To commit a transaction, optionally creating, deleting or modifying some entities
+use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCommitOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_commit_task]
+ :end-before: [END how_to_commit_task]
+
+An example of the commit request body required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_commit_def]
+ :end-before: [END how_to_commit_def]
+
+.. _howto/operator:CloudDatastoreRunQueryOperator:
+
+Run query
+---------
+
+To run a query for entities use
+:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_run_query]
+ :end-before: [END how_to_run_query]
+
+An example of the query request body required by the operator:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_query_def]
+ :end-before: [END how_to_query_def]
+
+.. _howto/operator:CloudDatastoreRollbackOperator:
+
+Roll back transaction
+---------------------
+
+To roll back a transaction
+use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator`
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_datastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_rollback_transaction]
+ :end-before: [END how_to_rollback_transaction]
+
+
+References
+----------
+For further information, take a look at:
+
+* `Datastore API documentation <https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects>`__
+* `Product documentation <https://cloud.google.com/datastore/docs>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index a6ecb53..14bee04 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -726,7 +726,7 @@ These integrations allow you to perform various operations within the Google Clo
-
* - `Datastore <https://cloud.google.com/datastore/>`__
- -
+ - :doc:`How to use <howto/operator/google/cloud/datastore>`
- :mod:`airflow.providers.google.cloud.hooks.datastore`
- :mod:`airflow.providers.google.cloud.operators.datastore`
-
diff --git a/tests/providers/google/cloud/hooks/test_datastore.py b/tests/providers/google/cloud/hooks/test_datastore.py
index 93dd663..51591a8 100644
--- a/tests/providers/google/cloud/hooks/test_datastore.py
+++ b/tests/providers/google/cloud/hooks/test_datastore.py
@@ -86,12 +86,17 @@ class TestDatastoreHook(unittest.TestCase):
def test_begin_transaction(self, mock_get_conn):
self.datastore_hook.connection = mock_get_conn.return_value
- transaction = self.datastore_hook.begin_transaction(project_id=GCP_PROJECT_ID)
+ transaction = self.datastore_hook.begin_transaction(
+ project_id=GCP_PROJECT_ID,
+ transaction_options={},
+ )
projects = self.datastore_hook.connection.projects
projects.assert_called_once_with()
begin_transaction = projects.return_value.beginTransaction
- begin_transaction.assert_called_once_with(projectId=GCP_PROJECT_ID, body={})
+ begin_transaction.assert_called_once_with(
+ projectId=GCP_PROJECT_ID, body={'transactionOptions': {}}
+ )
execute = begin_transaction.return_value.execute
execute.assert_called_once_with(num_retries=mock.ANY)
self.assertEqual(transaction, execute.return_value['transaction'])
diff --git a/tests/providers/google/cloud/operators/test_datastore.py b/tests/providers/google/cloud/operators/test_datastore.py
new file mode 100644
index 0000000..c097e58
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_datastore.py
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreAllocateIdsOperator, CloudDatastoreBeginTransactionOperator, CloudDatastoreCommitOperator,
+ CloudDatastoreDeleteOperationOperator, CloudDatastoreExportEntitiesOperator,
+ CloudDatastoreGetOperationOperator, CloudDatastoreImportEntitiesOperator, CloudDatastoreRollbackOperator,
+ CloudDatastoreRunQueryOperator,
+)
+
+HOOK_PATH = "airflow.providers.google.cloud.operators.datastore.DatastoreHook"
+PROJECT_ID = "test-project"
+CONN_ID = "test-gcp-conn-id"
+BODY = {"key": "value"}
+TRANSACTION = "transaction-name"
+BUCKET = "gs://test-bucket"
+FILE = "filename"
+OPERATION_ID = "1234"
+
+
+def _assert_hook_created(mock_hook):
+    """
+    Assert that the operator created exactly one hook bound to ``CONN_ID``.
+
+    Only the connection id is pinned. If the operator also passes ``delegate_to`` to the
+    hook, it must be the operator default (``None``), since none of these tests set it.
+    """
+    mock_hook.assert_called_once()
+    _, hook_kwargs = mock_hook.call_args
+    assert hook_kwargs["gcp_conn_id"] == CONN_ID
+    assert hook_kwargs.get("delegate_to") is None
+
+
+class TestCloudDatastoreExportEntitiesOperator:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        mock_hook.return_value.export_to_storage_bucket.return_value = {
+            "name": OPERATION_ID
+        }
+        mock_hook.return_value.poll_operation_until_done.return_value = {
+            "metadata": {"common": {"state": "SUCCESSFUL"}}
+        }
+
+        op = CloudDatastoreExportEntitiesOperator(
+            task_id="test_task",
+            datastore_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+        )
+        op.execute({})
+
+        mock_hook.assert_called_once_with(CONN_ID, None)
+        mock_hook.return_value.export_to_storage_bucket.assert_called_once_with(
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            entity_filter=None,
+            labels=None,
+            namespace=None,
+        )
+
+        mock_hook.return_value.poll_operation_until_done.assert_called_once_with(
+            OPERATION_ID, 10
+        )
+
+
+class TestCloudDatastoreImportEntitiesOperator:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        mock_hook.return_value.import_from_storage_bucket.return_value = {
+            "name": OPERATION_ID
+        }
+        mock_hook.return_value.poll_operation_until_done.return_value = {
+            "metadata": {"common": {"state": "SUCCESSFUL"}}
+        }
+
+        op = CloudDatastoreImportEntitiesOperator(
+            task_id="test_task",
+            datastore_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            file=FILE,
+        )
+        op.execute({})
+
+        mock_hook.assert_called_once_with(CONN_ID, None)
+        mock_hook.return_value.import_from_storage_bucket.assert_called_once_with(
+            project_id=PROJECT_ID,
+            bucket=BUCKET,
+            file=FILE,
+            entity_filter=None,
+            labels=None,
+            namespace=None,
+        )
+
+        mock_hook.return_value.poll_operation_until_done.assert_called_once_with(
+            OPERATION_ID, 10
+        )
+
+
+class TestCloudDatastoreAllocateIds:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        partial_keys = [1, 2, 3]
+        op = CloudDatastoreAllocateIdsOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            partial_keys=partial_keys,
+        )
+        result = op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.allocate_ids.assert_called_once_with(
+            project_id=PROJECT_ID, partial_keys=partial_keys
+        )
+        assert result == mock_hook.return_value.allocate_ids.return_value
+
+
+class TestCloudDatastoreBeginTransaction:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreBeginTransactionOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            transaction_options=BODY,
+        )
+        result = op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.begin_transaction.assert_called_once_with(
+            project_id=PROJECT_ID, transaction_options=BODY
+        )
+        assert result == mock_hook.return_value.begin_transaction.return_value
+
+
+class TestCloudDatastoreCommit:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreCommitOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
+        )
+        result = op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.commit.assert_called_once_with(
+            project_id=PROJECT_ID, body=BODY
+        )
+        assert result == mock_hook.return_value.commit.return_value
+
+
+class TestCloudDatastoreDeleteOperation:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreDeleteOperationOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, name=TRANSACTION
+        )
+        op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.delete_operation.assert_called_once_with(
+            name=TRANSACTION
+        )
+
+
+class TestCloudDatastoreGetOperation:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreGetOperationOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, name=TRANSACTION
+        )
+        result = op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.get_operation.assert_called_once_with(name=TRANSACTION)
+        assert result == mock_hook.return_value.get_operation.return_value
+
+
+class TestCloudDatastoreRollback:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreRollbackOperator(
+            task_id="test_task",
+            gcp_conn_id=CONN_ID,
+            project_id=PROJECT_ID,
+            transaction=TRANSACTION,
+        )
+        op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.rollback.assert_called_once_with(
+            project_id=PROJECT_ID, transaction=TRANSACTION
+        )
+
+
+class TestCloudDatastoreRunQuery:
+    @mock.patch(HOOK_PATH)
+    def test_execute(self, mock_hook):
+        op = CloudDatastoreRunQueryOperator(
+            task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
+        )
+        result = op.execute({})
+
+        _assert_hook_created(mock_hook)
+        mock_hook.return_value.run_query.assert_called_once_with(
+            project_id=PROJECT_ID, body=BODY
+        )
+        assert result == mock_hook.return_value.run_query.return_value
diff --git a/tests/providers/google/cloud/operators/test_datastore_system.py b/tests/providers/google/cloud/operators/test_datastore_system.py
index 961ba88..7cbeaf1 100644
--- a/tests/providers/google/cloud/operators/test_datastore_system.py
+++ b/tests/providers/google/cloud/operators/test_datastore_system.py
@@ -42,3 +42,7 @@ class GcpDatastoreSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_DATASTORE_KEY)
def test_run_example_dag(self):
self.run_dag('example_gcp_datastore', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATASTORE_KEY)
+    def test_run_example_dag_operations(self):
+        """Run the example DAG that exercises allocate/transaction/commit/query/rollback."""
+        dag_id = 'example_gcp_datastore_operations'
+        self.run_dag(dag_id, CLOUD_DAG_FOLDER)
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index 120a018..e30cca5 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -29,7 +29,6 @@ ROOT_FOLDER = os.path.realpath(
+# Test modules known to be missing; an entry is dropped once its test file is added.
MISSING_TEST_FILES = {
    'tests/providers/google/cloud/log/test_gcs_task_handler.py',
-    'tests/providers/google/cloud/operators/test_datastore.py',
    'tests/providers/microsoft/azure/sensors/test_azure_cosmos.py',
    'tests/providers/microsoft/azure/log/test_wasb_task_handler.py',
}