You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/26 14:18:18 UTC
[airflow] branch main updated: Create links for Bigtable operators (#23164)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2d569fdaf0 Create links for Bigtable operators (#23164)
2d569fdaf0 is described below
commit 2d569fdaf0727bfe7f3a92fa4ce9ae47236d3615
Author: Maksim <ma...@gmail.com>
AuthorDate: Tue Apr 26 16:18:09 2022 +0200
Create links for Bigtable operators (#23164)
---
airflow/providers/google/cloud/links/bigtable.py | 98 ++++++++++++++++++++++
.../providers/google/cloud/operators/bigtable.py | 14 ++++
airflow/providers/google/cloud/sensors/bigtable.py | 3 +
airflow/providers/google/provider.yaml | 3 +
.../google/cloud/operators/test_bigtable.py | 14 ++--
5 files changed, 125 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/google/cloud/links/bigtable.py b/airflow/providers/google/cloud/links/bigtable.py
new file mode 100644
index 0000000000..cc06129021
--- /dev/null
+++ b/airflow/providers/google/cloud/links/bigtable.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"
+BIGTABLE_BASE_LINK = BASE_LINK + "/bigtable"
+BIGTABLE_INSTANCE_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/overview?project={project_id}"
+BIGTABLE_CLUSTER_LINK = (
+ BIGTABLE_BASE_LINK + "/instances/{instance_id}/clusters/{cluster_id}?project={project_id}"
+)
+BIGTABLE_TABLES_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/tables?project={project_id}"
+
+
+class BigtableInstanceLink(BaseGoogleLink):
+ """Helper class for constructing Bigtable Instance link"""
+
+ name = "Bigtable Instance"
+ key = "instance_key"
+ format_str = BIGTABLE_INSTANCE_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=BigtableInstanceLink.key,
+ value={
+ "instance_id": task_instance.instance_id,
+ "project_id": task_instance.project_id,
+ },
+ )
+
+
+class BigtableClusterLink(BaseGoogleLink):
+ """Helper class for constructing Bigtable Cluster link"""
+
+ name = "Bigtable Cluster"
+ key = "cluster_key"
+ format_str = BIGTABLE_CLUSTER_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=BigtableClusterLink.key,
+ value={
+ "instance_id": task_instance.instance_id,
+ "cluster_id": task_instance.cluster_id,
+ "project_id": task_instance.project_id,
+ },
+ )
+
+
+class BigtableTablesLink(BaseGoogleLink):
+ """Helper class for constructing Bigtable Tables link"""
+
+ name = "Bigtable Tables"
+ key = "tables_key"
+ format_str = BIGTABLE_TABLES_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=BigtableTablesLink.key,
+ value={
+ "instance_id": task_instance.instance_id,
+ "project_id": task_instance.project_id,
+ },
+ )
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index c763db375f..fdecd22089 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -26,6 +26,11 @@ from google.cloud.bigtable_admin_v2 import enums
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import (
+ BigtableClusterLink,
+ BigtableInstanceLink,
+ BigtableTablesLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -93,6 +98,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
'main_cluster_zone',
'impersonation_chain',
)
+ operator_extra_links = (BigtableInstanceLink(),)
def __init__(
self,
@@ -141,6 +147,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
"The instance '%s' already exists in this project. Consider it as created",
self.instance_id,
)
+ BigtableInstanceLink.persist(context=context, task_instance=self)
return
try:
hook.create_instance(
@@ -156,6 +163,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
cluster_storage_type=self.cluster_storage_type,
timeout=self.timeout,
)
+ BigtableInstanceLink.persist(context=context, task_instance=self)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
@@ -198,6 +206,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
'instance_id',
'impersonation_chain',
)
+ operator_extra_links = (BigtableInstanceLink(),)
def __init__(
self,
@@ -241,6 +250,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
instance_labels=self.instance_labels,
timeout=self.timeout,
)
+ BigtableInstanceLink.persist(context=context, task_instance=self)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
@@ -351,6 +361,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
'table_id',
'impersonation_chain',
)
+ operator_extra_links = (BigtableTablesLink(),)
def __init__(
self,
@@ -412,6 +423,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
initial_split_keys=self.initial_split_keys,
column_families=self.column_families,
)
+ BigtableTablesLink.persist(context=context, task_instance=self)
except google.api_core.exceptions.AlreadyExists:
if not self._compare_column_families(hook, instance):
raise AirflowException(
@@ -533,6 +545,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
'nodes',
'impersonation_chain',
)
+ operator_extra_links = (BigtableClusterLink(),)
def __init__(
self,
@@ -565,6 +578,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
try:
hook.update_cluster(instance=instance, cluster_id=self.cluster_id, nodes=self.nodes)
+ BigtableClusterLink.persist(context=context, task_instance=self)
except google.api_core.exceptions.NotFound:
raise AirflowException(
f"Dependency: cluster '{self.cluster_id}' does not exist for instance '{self.instance_id}'."
diff --git a/airflow/providers/google/cloud/sensors/bigtable.py b/airflow/providers/google/cloud/sensors/bigtable.py
index 9401b1423d..63a1ae35e6 100644
--- a/airflow/providers/google/cloud/sensors/bigtable.py
+++ b/airflow/providers/google/cloud/sensors/bigtable.py
@@ -23,6 +23,7 @@ from google.cloud.bigtable.table import ClusterState
from google.cloud.bigtable_admin_v2 import enums
from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import BigtableTablesLink
from airflow.providers.google.cloud.operators.bigtable import BigtableValidationMixin
from airflow.sensors.base import BaseSensorOperator
@@ -62,6 +63,7 @@ class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValida
'table_id',
'impersonation_chain',
)
+ operator_extra_links = (BigtableTablesLink(),)
def __init__(
self,
@@ -111,4 +113,5 @@ class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValida
return False
self.log.info("Table '%s' is replicated.", self.table_id)
+ BigtableTablesLink.persist(context=context, task_instance=self)
return True
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index b1ef488269..35524343d2 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -906,6 +906,9 @@ extra-links:
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
- airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
+ - airflow.providers.google.cloud.links.bigtable.BigtableInstanceLink
+ - airflow.providers.google.cloud.links.bigtable.BigtableClusterLink
+ - airflow.providers.google.cloud.links.bigtable.BigtableTablesLink
- airflow.providers.google.common.links.storage.StorageLink
additional-extras:
diff --git a/tests/providers/google/cloud/operators/test_bigtable.py b/tests/providers/google/cloud/operators/test_bigtable.py
index 27293d901e..3e9c329a47 100644
--- a/tests/providers/google/cloud/operators/test_bigtable.py
+++ b/tests/providers/google/cloud/operators/test_bigtable.py
@@ -100,7 +100,7 @@ class TestBigtableInstanceCreate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -120,7 +120,7 @@ class TestBigtableInstanceCreate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -178,7 +178,7 @@ class TestBigtableInstanceCreate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
@@ -210,7 +210,7 @@ class TestBigtableInstanceCreate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
@@ -243,7 +243,7 @@ class TestBigtableInstanceUpdate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
@@ -268,7 +268,7 @@ class TestBigtableInstanceUpdate:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
@@ -803,7 +803,7 @@ class TestBigtableTableCreate:
impersonation_chain=IMPERSONATION_CHAIN,
)
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
- op.execute(None)
+ op.execute(context={'ti': mock.MagicMock()})
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,