You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/06 22:38:21 UTC
[airflow] branch main updated: Add links for Google Kubernetes Engine operators (#24786)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb7162418e Add links for Google Kubernetes Engine operators (#24786)
fb7162418e is described below
commit fb7162418eeacac626467871a7ed5567f20840ac
Author: Maksim <ma...@google.com>
AuthorDate: Thu Jul 7 01:38:12 2022 +0300
Add links for Google Kubernetes Engine operators (#24786)
* Add links for Google Kubernetes Engine operators
* Update unit tests for GKE operators
---
.../google/cloud/links/kubernetes_engine.py | 83 ++++++++++++++++++++++
.../google/cloud/operators/kubernetes_engine.py | 12 +++-
airflow/providers/google/provider.yaml | 2 +
.../cloud/operators/test_kubernetes_engine.py | 16 +++--
4 files changed, 106 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/google/cloud/links/kubernetes_engine.py b/airflow/providers/google/cloud/links/kubernetes_engine.py
new file mode 100644
index 0000000000..10668fc830
--- /dev/null
+++ b/airflow/providers/google/cloud/links/kubernetes_engine.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from typing import TYPE_CHECKING, Dict, Union
+
+from google.cloud.container_v1.types import Cluster
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+KUBERNETES_BASE_LINK = "https://console.cloud.google.com/kubernetes"
+KUBERNETES_CLUSTER_LINK = (
+ KUBERNETES_BASE_LINK + "/clusters/details/{location}/{cluster_name}/details?project={project_id}"
+)
+KUBERNETES_POD_LINK = (
+ KUBERNETES_BASE_LINK
+ + "/pod/{location}/{cluster_name}/{namespace}/{pod_name}/details?project={project_id}"
+)
+
+
+class KubernetesEngineClusterLink(BaseGoogleLink):
+ """Helper class for constructing Kubernetes Engine Cluster Link"""
+
+ name = "Kubernetes Cluster"
+ key = "kubernetes_cluster_conf"
+ format_str = KUBERNETES_CLUSTER_LINK
+
+ @staticmethod
+ def persist(context: "Context", task_instance, cluster: Union[Dict, Cluster, None]):
+ if isinstance(cluster, dict):
+ cluster = Cluster.from_json(json.dumps(cluster))
+
+ task_instance.xcom_push(
+ context=context,
+ key=KubernetesEngineClusterLink.key,
+ value={
+ "location": task_instance.location,
+ "cluster_name": cluster.name, # type: ignore
+ "project_id": task_instance.project_id,
+ },
+ )
+
+
+class KubernetesEnginePodLink(BaseGoogleLink):
+ """Helper class for constructing Kubernetes Engine Pod Link"""
+
+ name = "Kubernetes Pod"
+ key = "kubernetes_pod_conf"
+ format_str = KUBERNETES_POD_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance,
+ ):
+ task_instance.xcom_push(
+ context=context,
+ key=KubernetesEnginePodLink.key,
+ value={
+ "location": task_instance.location,
+ "cluster_name": task_instance.cluster_name,
+ "namespace": task_instance.pod.metadata.namespace,
+ "pod_name": task_instance.pod.metadata.name,
+ "project_id": task_instance.project_id,
+ },
+ )
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 83c013ba44..697c88f4e6 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -30,6 +30,10 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+from airflow.providers.google.cloud.links.kubernetes_engine import (
+ KubernetesEngineClusterLink,
+ KubernetesEnginePodLink,
+)
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils.process_utils import execute_in_subprocess, patch_environ
@@ -180,6 +184,7 @@ class GKECreateClusterOperator(BaseOperator):
'body',
'impersonation_chain',
)
+ operator_extra_links = (KubernetesEngineClusterLink(),)
def __init__(
self,
@@ -240,6 +245,7 @@ class GKECreateClusterOperator(BaseOperator):
impersonation_chain=self.impersonation_chain,
)
create_op = hook.create_cluster(cluster=self.body, project_id=self.project_id)
+ KubernetesEngineClusterLink.persist(context=context, task_instance=self, cluster=self.body)
return create_op
@@ -292,6 +298,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
template_fields: Sequence[str] = tuple(
{'project_id', 'location', 'cluster_name'} | set(KubernetesPodOperator.template_fields)
)
+ operator_extra_links = (KubernetesEnginePodLink(),)
def __init__(
self,
@@ -419,4 +426,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
use_internal_ip=self.use_internal_ip,
) as config_file:
self.config_file = config_file
- return super().execute(context)
+ result = super().execute(context)
+ if not self.is_delete_operator_pod:
+ KubernetesEnginePodLink.persist(context=context, task_instance=self)
+ return result
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 5d6c0b739e..ddd95bbfa4 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -975,6 +975,8 @@ extra-links:
- airflow.providers.google.cloud.links.spanner.SpannerInstanceLink
- airflow.providers.google.cloud.links.stackdriver.StackdriverNotificationsLink
- airflow.providers.google.cloud.links.stackdriver.StackdriverPoliciesLink
+ - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineClusterLink
+ - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 58bd380ca1..d484044703 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -77,7 +77,7 @@ class TestGoogleCloudPlatformContainerOperator(unittest.TestCase):
project_id=TEST_GCP_PROJECT_ID, location=PROJECT_LOCATION, body=body, task_id=PROJECT_TASK_ID
)
- operator.execute(None)
+ operator.execute(context=mock.MagicMock())
mock_hook.return_value.create_cluster.assert_called_once_with(
cluster=body, project_id=TEST_GCP_PROJECT_ID
)
@@ -191,6 +191,10 @@ class TestGKEPodOperator(unittest.TestCase):
namespace=NAMESPACE,
image=IMAGE,
)
+ self.gke_op.pod = mock.MagicMock(
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ )
def test_template_fields(self):
assert set(KubernetesPodOperator.template_fields).issubset(GKEStartPodOperator.template_fields)
@@ -215,7 +219,7 @@ class TestGKEPodOperator(unittest.TestCase):
side_effect=[FILE_NAME, '/path/to/new-file']
)
- self.gke_op.execute(None)
+ self.gke_op.execute(context=mock.MagicMock())
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
@@ -258,7 +262,7 @@ class TestGKEPodOperator(unittest.TestCase):
side_effect=[FILE_NAME, '/path/to/new-file']
)
- self.gke_op.execute(None)
+ self.gke_op.execute(context=mock.MagicMock())
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
@@ -314,7 +318,7 @@ class TestGKEPodOperator(unittest.TestCase):
side_effect=[FILE_NAME, '/path/to/new-file']
)
- self.gke_op.execute(None)
+ self.gke_op.execute(context=mock.MagicMock())
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
@@ -357,7 +361,7 @@ class TestGKEPodOperator(unittest.TestCase):
side_effect=[FILE_NAME, '/path/to/new-file']
)
self.gke_op.impersonation_chain = "test_account@example.com"
- self.gke_op.execute(None)
+ self.gke_op.execute(context=mock.MagicMock())
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
@@ -401,7 +405,7 @@ class TestGKEPodOperator(unittest.TestCase):
side_effect=[FILE_NAME, '/path/to/new-file']
)
self.gke_op.impersonation_chain = ["test_account@example.com"]
- self.gke_op.execute(None)
+ self.gke_op.execute(context=mock.MagicMock())
mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()