You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:36:34 UTC
[airflow] 01/04: Add links for Cloud Datastore operators
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 43ded6c877e90f53e4f5849499a6ed9585b309b7
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 13:36:38 2022 +0200
Add links for Cloud Datastore operators
---
airflow/providers/google/cloud/links/datastore.py | 97 ++++++++++++++++++++++
.../providers/google/cloud/operators/datastore.py | 17 ++++
airflow/providers/google/provider.yaml | 3 +
3 files changed, 117 insertions(+)
diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
new file mode 100644
index 0000000000..c8066f0246
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"
+DATASTORE_BASE_LINK = BASE_LINK + "/datastore"
+DATASTORE_IMPORT_EXPORT_LINK = DATASTORE_BASE_LINK + "/import-export?project={project_id}"
+DATASTORE_EXPORT_ENTITIES_LINK = (
+ BASE_LINK + "/storage/browser/{bucket_name}/{export_name}?project={project_id}"
+)
+DATASTORE_ENTITIES_LINK = DATASTORE_BASE_LINK + "/entities/query/kind?project={project_id}"
+
+
+class CloudDatastoreImportExportLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Import/Export Link"""
+
+ name = "Import/Export Page"
+ key = "import_export_conf"
+ format_str = DATASTORE_IMPORT_EXPORT_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreImportExportLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ },
+ )
+
+
+class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Export Entities Link"""
+
+ name = "Export Entities"
+ key = "export_conf"
+ format_str = DATASTORE_EXPORT_ENTITIES_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ output_url: str,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreExportEntitiesLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ "bucket_name": task_instance.bucket,
+ "export_name": output_url.split('/')[3],
+ },
+ )
+
+
+class CloudDatastoreEntitiesLink(BaseGoogleLink):
+ """Helper class for constructing Cloud Datastore Entities Link"""
+
+ name = "Entities"
+ key = "entities_conf"
+ format_str = DATASTORE_ENTITIES_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=CloudDatastoreEntitiesLink.key,
+ value={
+ "project_id": task_instance.project_id,
+ },
+ )
\ No newline at end of file
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 8282057518..4f24ba4420 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -23,6 +23,11 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.links.datastore import (
+ CloudDatastoreExportEntitiesLink,
+ CloudDatastoreImportExportLink,
+ CloudDatastoreEntitiesLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -73,6 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
+ operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
def __init__(
self,
@@ -132,6 +138,11 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
state = result['metadata']['common']['state']
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
+ CloudDatastoreExportEntitiesLink.persist(
+ context=context,
+ task_instance=self,
+ output_url=result['response']['outputUrl'],
+ )
return result
@@ -179,6 +190,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
'labels',
'impersonation_chain',
)
+ operator_extra_links = (CloudDatastoreImportExportLink(),)
def __init__(
self,
@@ -231,6 +243,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
if state != 'SUCCESSFUL':
raise AirflowException(f'Operation failed: result={result}')
+ CloudDatastoreImportExportLink.persist(context=context, task_instance=self)
return result
@@ -265,6 +278,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
"partial_keys",
"impersonation_chain",
)
+ operator_extra_links = (CloudDatastoreEntitiesLink(),)
def __init__(
self,
@@ -293,6 +307,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
partial_keys=self.partial_keys,
project_id=self.project_id,
)
+ CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return keys
@@ -389,6 +404,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
"body",
"impersonation_chain",
)
+ operator_extra_links = (CloudDatastoreEntitiesLink(),)
def __init__(
self,
@@ -417,6 +433,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
body=self.body,
project_id=self.project_id,
)
+ CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
return response
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 514b4432db..4ef112ead9 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -904,6 +904,9 @@ extra-links:
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
+ - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
- airflow.providers.google.common.links.storage.StorageLink
additional-extras: