You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:36:34 UTC

[airflow] 01/04: Add links for Cloud Datastore operators

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 43ded6c877e90f53e4f5849499a6ed9585b309b7
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 13:36:38 2022 +0200

    Add links for Cloud Datastore operators
---
 airflow/providers/google/cloud/links/datastore.py  | 97 ++++++++++++++++++++++
 .../providers/google/cloud/operators/datastore.py  | 17 ++++
 airflow/providers/google/provider.yaml             |  3 +
 3 files changed, 117 insertions(+)

diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
new file mode 100644
index 0000000000..c8066f0246
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"  # common prefix of every URL template below
+DATASTORE_BASE_LINK = BASE_LINK + "/datastore"
+DATASTORE_IMPORT_EXPORT_LINK = DATASTORE_BASE_LINK + "/import-export?project={project_id}"
+DATASTORE_EXPORT_ENTITIES_LINK = (  # built on BASE_LINK "/storage/browser", not on "/datastore"
+    BASE_LINK + "/storage/browser/{bucket_name}/{export_name}?project={project_id}"
+)
+DATASTORE_ENTITIES_LINK = DATASTORE_BASE_LINK + "/entities/query/kind?project={project_id}"
+
+
+class CloudDatastoreImportExportLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Import/Export Link"""
+
+    name = "Import/Export Page"
+    key = "import_export_conf"  # XCom key that persist() pushes the link parameters under
+    format_str = DATASTORE_IMPORT_EXPORT_LINK  # template with a single {project_id} placeholder
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,  # needs xcom_push() and .project_id; the import operator passes itself
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreImportExportLink.key,
+            value={
+                "project_id": task_instance.project_id,  # same name as the format_str placeholder
+            },
+        )
+
+
+class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Export Entities Link"""
+
+    name = "Export Entities"
+    key = "export_conf"  # XCom key that persist() pushes the link parameters under
+    format_str = DATASTORE_EXPORT_ENTITIES_LINK  # needs bucket_name, export_name and project_id
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,  # needs xcom_push(), .project_id and .bucket; the operator passes itself
+        output_url: str,  # the export operator passes result['response']['outputUrl']
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreExportEntitiesLink.key,
+            value={  # key names mirror the placeholders in format_str
+                "project_id": task_instance.project_id,
+                "bucket_name": task_instance.bucket,
+                "export_name": output_url.split('/')[3],  # NOTE(review): assumes gs://bucket/name/...
+            },
+        )
+
+
+class CloudDatastoreEntitiesLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Entities Link"""
+
+    name = "Entities"
+    key = "entities_conf"  # XCom key that persist() pushes the link parameters under
+    format_str = DATASTORE_ENTITIES_LINK  # template with a single {project_id} placeholder
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,  # needs xcom_push() and .project_id; the calling operators pass themselves
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreEntitiesLink.key,
+            value={
+                "project_id": task_instance.project_id,  # same name as the format_str placeholder
+            },
+        )
\ No newline at end of file
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 8282057518..4f24ba4420 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -23,6 +23,11 @@ from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.links.datastore import (
+    CloudDatastoreExportEntitiesLink,
+    CloudDatastoreImportExportLink,
+    CloudDatastoreEntitiesLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -73,6 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         'labels',
         'impersonation_chain',
     )
+    operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
 
     def __init__(
         self,
@@ -132,6 +138,11 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         state = result['metadata']['common']['state']
         if state != 'SUCCESSFUL':
             raise AirflowException(f'Operation failed: result={result}')
+        CloudDatastoreExportEntitiesLink.persist(
+            context=context,
+            task_instance=self,
+            output_url=result['response']['outputUrl'],
+        )
         return result
 
 
@@ -179,6 +190,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
         'labels',
         'impersonation_chain',
     )
+    operator_extra_links = (CloudDatastoreImportExportLink(),)
 
     def __init__(
         self,
@@ -231,6 +243,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
         if state != 'SUCCESSFUL':
             raise AirflowException(f'Operation failed: result={result}')
 
+        CloudDatastoreImportExportLink.persist(context=context, task_instance=self)
         return result
 
 
@@ -265,6 +278,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
         "partial_keys",
         "impersonation_chain",
     )
+    operator_extra_links = (CloudDatastoreEntitiesLink(),)
 
     def __init__(
         self,
@@ -293,6 +307,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
             partial_keys=self.partial_keys,
             project_id=self.project_id,
         )
+        CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
         return keys
 
 
@@ -389,6 +404,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
         "body",
         "impersonation_chain",
     )
+    operator_extra_links = (CloudDatastoreEntitiesLink(),)
 
     def __init__(
         self,
@@ -417,6 +433,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
             body=self.body,
             project_id=self.project_id,
         )
+        CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
         return response
 
 
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 514b4432db..4ef112ead9 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -904,6 +904,9 @@ extra-links:
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
   - airflow.providers.google.common.links.storage.StorageLink
 
 additional-extras: