You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:36:36 UTC
[airflow] 03/04: Change CloudDatastoreExportEntitiesLink to StorageLink
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 544d658921cb2c9ac1abc5a89f2e275255cb924b
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 15:49:04 2022 +0200
Change CloudDatastoreExportEntitiesLink to StorageLink
---
airflow/providers/google/cloud/links/datastore.py | 24 ----------------------
.../providers/google/cloud/operators/datastore.py | 8 ++++----
airflow/providers/google/provider.yaml | 1 -
3 files changed, 4 insertions(+), 29 deletions(-)
diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
index 0416896edd..d17a6a8ef0 100644
--- a/airflow/providers/google/cloud/links/datastore.py
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -52,30 +52,6 @@ class CloudDatastoreImportExportLink(BaseGoogleLink):
)
-class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
- """Helper class for constructing Cloud Datastore Export Entities Link"""
-
- name = "Export Entities"
- key = "export_conf"
- format_str = DATASTORE_EXPORT_ENTITIES_LINK
-
- @staticmethod
- def persist(
- context: "Context",
- task_instance,
- output_url: str,
- ):
- task_instance.xcom_push(
- context=context,
- key=CloudDatastoreExportEntitiesLink.key,
- value={
- "project_id": task_instance.project_id,
- "bucket_name": task_instance.bucket,
- "export_name": output_url.split('/')[3],
- },
- )
-
-
class CloudDatastoreEntitiesLink(BaseGoogleLink):
"""Helper class for constructing Cloud Datastore Entities Link"""
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index a654e3126c..4d168cb2e1 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -25,9 +25,9 @@ from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.datastore import (
CloudDatastoreEntitiesLink,
- CloudDatastoreExportEntitiesLink,
CloudDatastoreImportExportLink,
)
+from airflow.providers.google.common.links.storage import StorageLink
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -78,7 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
- operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
+ operator_extra_links = (StorageLink(),)
def __init__(
self,
@@ -138,10 +138,10 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
- CloudDatastoreExportEntitiesLink.persist(
+ StorageLink.persist(
context=context,
task_instance=self,
- output_url=result['response']['outputUrl'],
+ uri=f"{self.bucket}/{result['response']['outputUrl'].split('/')[3]}",
)
return result
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 4ef112ead9..b1ef488269 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -905,7 +905,6 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
- - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink