You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/26 14:18:18 UTC

[airflow] branch main updated: Create links for Biqtable operators (#23164)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d569fdaf0 Create links for Biqtable operators (#23164)
2d569fdaf0 is described below

commit 2d569fdaf0727bfe7f3a92fa4ce9ae47236d3615
Author: Maksim <ma...@gmail.com>
AuthorDate: Tue Apr 26 16:18:09 2022 +0200

    Create links for Biqtable operators (#23164)
---
 airflow/providers/google/cloud/links/bigtable.py   | 98 ++++++++++++++++++++++
 .../providers/google/cloud/operators/bigtable.py   | 14 ++++
 airflow/providers/google/cloud/sensors/bigtable.py |  3 +
 airflow/providers/google/provider.yaml             |  3 +
 .../google/cloud/operators/test_bigtable.py        | 14 ++--
 5 files changed, 125 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/google/cloud/links/bigtable.py b/airflow/providers/google/cloud/links/bigtable.py
new file mode 100644
index 0000000000..cc06129021
--- /dev/null
+++ b/airflow/providers/google/cloud/links/bigtable.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"
+BIGTABLE_BASE_LINK = BASE_LINK + "/bigtable"
+BIGTABLE_INSTANCE_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/overview?project={project_id}"
+BIGTABLE_CLUSTER_LINK = (
+    BIGTABLE_BASE_LINK + "/instances/{instance_id}/clusters/{cluster_id}?project={project_id}"
+)
+BIGTABLE_TABLES_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/tables?project={project_id}"
+
+
+class BigtableInstanceLink(BaseGoogleLink):
+    """Helper class for constructing Bigtable Instance link"""
+
+    name = "Bigtable Instance"
+    key = "instance_key"
+    format_str = BIGTABLE_INSTANCE_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=BigtableInstanceLink.key,
+            value={
+                "instance_id": task_instance.instance_id,
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class BigtableClusterLink(BaseGoogleLink):
+    """Helper class for constructing Bigtable Cluster link"""
+
+    name = "Bigtable Cluster"
+    key = "cluster_key"
+    format_str = BIGTABLE_CLUSTER_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=BigtableClusterLink.key,
+            value={
+                "instance_id": task_instance.instance_id,
+                "cluster_id": task_instance.cluster_id,
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class BigtableTablesLink(BaseGoogleLink):
+    """Helper class for constructing Bigtable Tables link"""
+
+    name = "Bigtable Tables"
+    key = "tables_key"
+    format_str = BIGTABLE_TABLES_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=BigtableTablesLink.key,
+            value={
+                "instance_id": task_instance.instance_id,
+                "project_id": task_instance.project_id,
+            },
+        )
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index c763db375f..fdecd22089 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -26,6 +26,11 @@ from google.cloud.bigtable_admin_v2 import enums
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import (
+    BigtableClusterLink,
+    BigtableInstanceLink,
+    BigtableTablesLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -93,6 +98,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
         'main_cluster_zone',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableInstanceLink(),)
 
     def __init__(
         self,
@@ -141,6 +147,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
                 "The instance '%s' already exists in this project. Consider it as created",
                 self.instance_id,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
             return
         try:
             hook.create_instance(
@@ -156,6 +163,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
                 cluster_storage_type=self.cluster_storage_type,
                 timeout=self.timeout,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.GoogleAPICallError as e:
             self.log.error('An error occurred. Exiting.')
             raise e
@@ -198,6 +206,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
         'instance_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableInstanceLink(),)
 
     def __init__(
         self,
@@ -241,6 +250,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
                 instance_labels=self.instance_labels,
                 timeout=self.timeout,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.GoogleAPICallError as e:
             self.log.error('An error occurred. Exiting.')
             raise e
@@ -351,6 +361,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
         'table_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableTablesLink(),)
 
     def __init__(
         self,
@@ -412,6 +423,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
                 initial_split_keys=self.initial_split_keys,
                 column_families=self.column_families,
             )
+            BigtableTablesLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.AlreadyExists:
             if not self._compare_column_families(hook, instance):
                 raise AirflowException(
@@ -533,6 +545,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
         'nodes',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableClusterLink(),)
 
     def __init__(
         self,
@@ -565,6 +578,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
 
         try:
             hook.update_cluster(instance=instance, cluster_id=self.cluster_id, nodes=self.nodes)
+            BigtableClusterLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.NotFound:
             raise AirflowException(
                 f"Dependency: cluster '{self.cluster_id}' does not exist for instance '{self.instance_id}'."
diff --git a/airflow/providers/google/cloud/sensors/bigtable.py b/airflow/providers/google/cloud/sensors/bigtable.py
index 9401b1423d..63a1ae35e6 100644
--- a/airflow/providers/google/cloud/sensors/bigtable.py
+++ b/airflow/providers/google/cloud/sensors/bigtable.py
@@ -23,6 +23,7 @@ from google.cloud.bigtable.table import ClusterState
 from google.cloud.bigtable_admin_v2 import enums
 
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import BigtableTablesLink
 from airflow.providers.google.cloud.operators.bigtable import BigtableValidationMixin
 from airflow.sensors.base import BaseSensorOperator
 
@@ -62,6 +63,7 @@ class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValida
         'table_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableTablesLink(),)
 
     def __init__(
         self,
@@ -111,4 +113,5 @@ class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValida
             return False
 
         self.log.info("Table '%s' is replicated.", self.table_id)
+        BigtableTablesLink.persist(context=context, task_instance=self)
         return True
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index b1ef488269..35524343d2 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -906,6 +906,9 @@ extra-links:
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableInstanceLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableClusterLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableTablesLink
   - airflow.providers.google.common.links.storage.StorageLink
 
 additional-extras:
diff --git a/tests/providers/google/cloud/operators/test_bigtable.py b/tests/providers/google/cloud/operators/test_bigtable.py
index 27293d901e..3e9c329a47 100644
--- a/tests/providers/google/cloud/operators/test_bigtable.py
+++ b/tests/providers/google/cloud/operators/test_bigtable.py
@@ -100,7 +100,7 @@ class TestBigtableInstanceCreate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -120,7 +120,7 @@ class TestBigtableInstanceCreate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -178,7 +178,7 @@ class TestBigtableInstanceCreate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -210,7 +210,7 @@ class TestBigtableInstanceCreate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -243,7 +243,7 @@ class TestBigtableInstanceUpdate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -268,7 +268,7 @@ class TestBigtableInstanceUpdate:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -803,7 +803,7 @@ class TestBigtableTableCreate:
             impersonation_chain=IMPERSONATION_CHAIN,
         )
         instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,