You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:36:33 UTC
[airflow] branch main updated (8cfb2be989 -> d9e7b6a940)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
from 8cfb2be989 Add doc and example dag for AWS Step Functions Operators
new 43ded6c877 Add links for Cloud Datastore operators
new b3cc2f5d10 Fix pre-commit check
new 544d658921 Change CloudDatastoreExportEntitiesLink to StorageLink
new d9e7b6a940 Update unit tests for Datastore operators
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../cloud/links/{dataplex.py => datastore.py} | 41 ++++++++++------------
.../providers/google/cloud/operators/datastore.py | 17 +++++++++
airflow/providers/google/provider.yaml | 2 ++
.../google/cloud/operators/test_datastore.py | 14 +++++---
4 files changed, 47 insertions(+), 27 deletions(-)
copy airflow/providers/google/cloud/links/{dataplex.py => datastore.py} (58%)
[airflow] 02/04: Fix pre-commit check
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b3cc2f5d102214067c40b3c120364918556e7cca
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 14:28:44 2022 +0200
Fix pre-commit check
---
airflow/providers/google/cloud/links/datastore.py | 2 +-
airflow/providers/google/cloud/operators/datastore.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
index c8066f0246..0416896edd 100644
--- a/airflow/providers/google/cloud/links/datastore.py
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -94,4 +94,4 @@ class CloudDatastoreEntitiesLink(BaseGoogleLink):
value={
"project_id": task_instance.project_id,
},
- )
\ No newline at end of file
+ )
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 4f24ba4420..a654e3126c 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -24,9 +24,9 @@ from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.datastore import (
+ CloudDatastoreEntitiesLink,
CloudDatastoreExportEntitiesLink,
CloudDatastoreImportExportLink,
- CloudDatastoreEntitiesLink,
)
if TYPE_CHECKING:
[airflow] 03/04: Change CloudDatastoreExportEntitiesLink to StorageLink
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 544d658921cb2c9ac1abc5a89f2e275255cb924b
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 15:49:04 2022 +0200
Change CloudDatastoreExportEntitiesLink to StorageLink
---
airflow/providers/google/cloud/links/datastore.py | 24 ----------------------
.../providers/google/cloud/operators/datastore.py | 8 ++++----
airflow/providers/google/provider.yaml | 1 -
3 files changed, 4 insertions(+), 29 deletions(-)
diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
index 0416896edd..d17a6a8ef0 100644
--- a/airflow/providers/google/cloud/links/datastore.py
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -52,30 +52,6 @@ class CloudDatastoreImportExportLink(BaseGoogleLink):
)
-class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
- """Helper class for constructing Cloud Datastore Export Entities Link"""
-
- name = "Export Entities"
- key = "export_conf"
- format_str = DATASTORE_EXPORT_ENTITIES_LINK
-
- @staticmethod
- def persist(
- context: "Context",
- task_instance,
- output_url: str,
- ):
- task_instance.xcom_push(
- context=context,
- key=CloudDatastoreExportEntitiesLink.key,
- value={
- "project_id": task_instance.project_id,
- "bucket_name": task_instance.bucket,
- "export_name": output_url.split('/')[3],
- },
- )
-
-
class CloudDatastoreEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Entities Link"""
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index a654e3126c..4d168cb2e1 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -25,9 +25,9 @@ from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.datastore import (
CloudDatastoreEntitiesLink,
- CloudDatastoreExportEntitiesLink,
CloudDatastoreImportExportLink,
)
+from airflow.providers.google.common.links.storage import StorageLink
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -78,7 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
- operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
+ operator_extra_links = (StorageLink(),)
def __init__(
self,
@@ -138,10 +138,10 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
- CloudDatastoreExportEntitiesLink.persist(
+ StorageLink.persist(
context=context,
task_instance=self,
- output_url=result['response']['outputUrl'],
+ uri=f"{self.bucket}/{result['response']['outputUrl'].split('/')[3]}",
)
return result
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 4ef112ead9..b1ef488269 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -905,7 +905,6 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
- - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink
[airflow] 01/04: Add links for Cloud Datastore operators
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 43ded6c877e90f53e4f5849499a6ed9585b309b7
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 13:36:38 2022 +0200
Add links for Cloud Datastore operators
---
airflow/providers/google/cloud/links/datastore.py | 97 ++++++++++++++++++++++
.../providers/google/cloud/operators/datastore.py | 17 ++++
airflow/providers/google/provider.yaml | 3 +
3 files changed, 117 insertions(+)
diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
new file mode 100644
index 0000000000..c8066f0246
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"
+DATASTORE_BASE_LINK = BASE_LINK + "/datastore"
+DATASTORE_IMPORT_EXPORT_LINK = DATASTORE_BASE_LINK + "/import-export?project={project_id}"
+DATASTORE_EXPORT_ENTITIES_LINK = (
+ BASE_LINK + "/storage/browser/{bucket_name}/{export_name}?project={project_id}"
+)
+DATASTORE_ENTITIES_LINK = DATASTORE_BASE_LINK + "/entities/query/kind?project={project_id}"
+
+
+class CloudDatastoreImportExportLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Import/Export Link"""
+
+ name = "Import/Export Page"
+ key = "import_export_conf"
+ format_str = DATASTORE_IMPORT_EXPORT_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreImportExportLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ },
+ )
+
+
+class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Export Entities Link"""
+
+ name = "Export Entities"
+ key = "export_conf"
+ format_str = DATASTORE_EXPORT_ENTITIES_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ output_url: str,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreExportEntitiesLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ "bucket_name": task_instance.bucket,
+ "export_name": output_url.split('/')[3],
+ },
+ )
+
+
+class CloudDatastoreEntitiesLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Entities Link"""
+
+ name = "Entities"
+ key = "entities_conf"
+ format_str = DATASTORE_ENTITIES_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreEntitiesLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ },
+ )
\ No newline at end of file
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 8282057518..4f24ba4420 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -23,6 +23,11 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.links.datastore import (
+ CloudDatastoreExportEntitiesLink,
+ CloudDatastoreImportExportLink,
+ CloudDatastoreEntitiesLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -73,6 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
+ operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
def __init__(
self,
@@ -132,6 +138,11 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
+ CloudDatastoreExportEntitiesLink.persist(
+ context=context,
+ task_instance=self,
+ output_url=result['response']['outputUrl'],
+ )
return result
@@ -179,6 +190,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
+ operator_extra_links = (CloudDatastoreImportExportLink(),)
def __init__(
self,
@@ -231,6 +243,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
+ CloudDatastoreImportExportLink.persist(context=context, task_instance=self)
return result
@@ -265,6 +278,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
"partial_keys",
"impersonation_chain",
)
+ operator_extra_links = (CloudDatastoreEntitiesLink(),)
def __init__(
self,
@@ -293,6 +307,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
partial_keys=self.partial_keys,
project_id=self.project_id,
)
+ CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return keys
@@ -389,6 +404,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
"body",
"impersonation_chain",
)
+ operator_extra_links = (CloudDatastoreEntitiesLink(),)
def __init__(
self,
@@ -417,6 +433,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
body=self.body,
project_id=self.project_id,
)
+ CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return response
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 514b4432db..4ef112ead9 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -904,6 +904,9 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink
additional-extras:
[airflow] 04/04: Update unit tests for Datastore operators
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d9e7b6a940e3a5a84eee1c97ca32ef5fd4d22f42
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Wed Apr 20 13:07:17 2022 +0200
Update unit tests for Datastore operators
---
tests/providers/google/cloud/operators/test_datastore.py | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/tests/providers/google/cloud/operators/test_datastore.py b/tests/providers/google/cloud/operators/test_datastore.py
index ddc59a5135..75f3d2ac94 100644
--- a/tests/providers/google/cloud/operators/test_datastore.py
+++ b/tests/providers/google/cloud/operators/test_datastore.py
@@ -35,6 +35,7 @@ CONN_ID = "test-gcp-conn-id"
BODY = {"key", "value"}
TRANSACTION = "transaction-name"
BUCKET = "gs://test-bucket"
+OUTPUT_URL = f"{BUCKET}/entities_export_name/entities_export_name.overall_export_metadata"
FILE = "filename"
OPERATION_ID = "1234"
@@ -44,7 +45,10 @@ class TestCloudDatastoreExportEntitiesOperator:
def test_execute(self, mock_hook):
mock_hook.return_value.export_to_storage_bucket.return_value = {"name": OPERATION_ID}
mock_hook.return_value.poll_operation_until_done.return_value = {
- "metadata": {"common": {"state": "SUCCESSFUL"}}
+ "metadata": {"common": {"state": "SUCCESSFUL"}},
+ "response": {
+ "outputUrl": OUTPUT_URL,
+ },
}
op = CloudDatastoreExportEntitiesOperator(
@@ -53,7 +57,7 @@ class TestCloudDatastoreExportEntitiesOperator:
project_id=PROJECT_ID,
bucket=BUCKET,
)
- op.execute({})
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(CONN_ID, None, impersonation_chain=None)
mock_hook.return_value.export_to_storage_bucket.assert_called_once_with(
@@ -82,7 +86,7 @@ class TestCloudDatastoreImportEntitiesOperator:
bucket=BUCKET,
file=FILE,
)
- op.execute({})
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(CONN_ID, None, impersonation_chain=None)
mock_hook.return_value.import_from_storage_bucket.assert_called_once_with(
@@ -107,7 +111,7 @@ class TestCloudDatastoreAllocateIds:
project_id=PROJECT_ID,
partial_keys=partial_keys,
)
- op.execute({})
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID, impersonation_chain=None)
mock_hook.return_value.allocate_ids.assert_called_once_with(
@@ -138,7 +142,7 @@ class TestCloudDatastoreCommit:
op = CloudDatastoreCommitOperator(
task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
)
- op.execute({})
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID, impersonation_chain=None)
mock_hook.return_value.commit.assert_called_once_with(project_id=PROJECT_ID, body=BODY)