You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/06 22:38:21 UTC

[airflow] branch main updated: Add links for Google Kubernetes Engine operators (#24786)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fb7162418e Add links for Google Kubernetes Engine operators (#24786)
fb7162418e is described below

commit fb7162418eeacac626467871a7ed5567f20840ac
Author: Maksim <ma...@google.com>
AuthorDate: Thu Jul 7 01:38:12 2022 +0300

    Add links for Google Kubernetes Engine operators (#24786)
    
    * Add links for Google Kubernetes Engine operators
    
    * Update unit tests for GKE operators
---
 .../google/cloud/links/kubernetes_engine.py        | 83 ++++++++++++++++++++++
 .../google/cloud/operators/kubernetes_engine.py    | 12 +++-
 airflow/providers/google/provider.yaml             |  2 +
 .../cloud/operators/test_kubernetes_engine.py      | 16 +++--
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/google/cloud/links/kubernetes_engine.py b/airflow/providers/google/cloud/links/kubernetes_engine.py
new file mode 100644
index 0000000000..10668fc830
--- /dev/null
+++ b/airflow/providers/google/cloud/links/kubernetes_engine.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from typing import TYPE_CHECKING, Dict, Union
+
+from google.cloud.container_v1.types import Cluster
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+KUBERNETES_BASE_LINK = "https://console.cloud.google.com/kubernetes"  # shared prefix of the URLs below
+KUBERNETES_CLUSTER_LINK = (  # cluster URL template with {location}, {cluster_name}, {project_id}
+    KUBERNETES_BASE_LINK + "/clusters/details/{location}/{cluster_name}/details?project={project_id}"
+)
+KUBERNETES_POD_LINK = (  # pod URL template; adds {namespace} and {pod_name} to the cluster fields
+    KUBERNETES_BASE_LINK
+    + "/pod/{location}/{cluster_name}/{namespace}/{pod_name}/details?project={project_id}"
+)
+
+
+class KubernetesEngineClusterLink(BaseGoogleLink):
+    """Helper class for constructing Kubernetes Engine Cluster Link"""
+
+    name = "Kubernetes Cluster"
+    key = "kubernetes_cluster_conf"  # XCom key that persist() pushes the link parameters under
+    format_str = KUBERNETES_CLUSTER_LINK  # URL template; placeholders match the keys persist() pushes
+
+    @staticmethod
+    def persist(context: "Context", task_instance, cluster: Union[Dict, Cluster, None]):
+        if isinstance(cluster, dict):  # plain-dict bodies are converted to a Cluster via JSON
+            cluster = Cluster.from_json(json.dumps(cluster))
+
+        task_instance.xcom_push(  # the operator in this commit passes itself as task_instance
+            context=context,
+            key=KubernetesEngineClusterLink.key,
+            value={
+                "location": task_instance.location,
+                "cluster_name": cluster.name,  # type: ignore  # NOTE(review): breaks if cluster is None
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class KubernetesEnginePodLink(BaseGoogleLink):
+    """Helper class for constructing Kubernetes Engine Pod Link"""
+
+    name = "Kubernetes Pod"
+    key = "kubernetes_pod_conf"  # XCom key that persist() pushes the link parameters under
+    format_str = KUBERNETES_POD_LINK  # URL template; placeholders match the keys persist() pushes
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,  # read below: location, cluster_name, project_id, pod.metadata
+    ):
+        task_instance.xcom_push(  # the operator in this commit passes itself as task_instance
+            context=context,
+            key=KubernetesEnginePodLink.key,
+            value={
+                "location": task_instance.location,
+                "cluster_name": task_instance.cluster_name,
+                "namespace": task_instance.pod.metadata.namespace,  # assumes pod is set - TODO confirm
+                "pod_name": task_instance.pod.metadata.name,
+                "project_id": task_instance.project_id,
+            },
+        )
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 83c013ba44..697c88f4e6 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -30,6 +30,10 @@ from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
 from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook
+from airflow.providers.google.cloud.links.kubernetes_engine import (
+    KubernetesEngineClusterLink,
+    KubernetesEnginePodLink,
+)
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
 from airflow.utils.process_utils import execute_in_subprocess, patch_environ
 
@@ -180,6 +184,7 @@ class GKECreateClusterOperator(BaseOperator):
         'body',
         'impersonation_chain',
     )
+    operator_extra_links = (KubernetesEngineClusterLink(),)
 
     def __init__(
         self,
@@ -240,6 +245,7 @@ class GKECreateClusterOperator(BaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
         create_op = hook.create_cluster(cluster=self.body, project_id=self.project_id)
+        KubernetesEngineClusterLink.persist(context=context, task_instance=self, cluster=self.body)
         return create_op
 
 
@@ -292,6 +298,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
     template_fields: Sequence[str] = tuple(
         {'project_id', 'location', 'cluster_name'} | set(KubernetesPodOperator.template_fields)
     )
+    operator_extra_links = (KubernetesEnginePodLink(),)
 
     def __init__(
         self,
@@ -419,4 +426,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
             use_internal_ip=self.use_internal_ip,
         ) as config_file:
             self.config_file = config_file
-            return super().execute(context)
+            result = super().execute(context)
+            if not self.is_delete_operator_pod:
+                KubernetesEnginePodLink.persist(context=context, task_instance=self)
+            return result
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 5d6c0b739e..ddd95bbfa4 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -975,6 +975,8 @@ extra-links:
   - airflow.providers.google.cloud.links.spanner.SpannerInstanceLink
   - airflow.providers.google.cloud.links.stackdriver.StackdriverNotificationsLink
   - airflow.providers.google.cloud.links.stackdriver.StackdriverPoliciesLink
+  - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineClusterLink
+  - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
   - airflow.providers.google.common.links.storage.StorageLink
   - airflow.providers.google.common.links.storage.FileDetailsLink
 
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 58bd380ca1..d484044703 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -77,7 +77,7 @@ class TestGoogleCloudPlatformContainerOperator(unittest.TestCase):
             project_id=TEST_GCP_PROJECT_ID, location=PROJECT_LOCATION, body=body, task_id=PROJECT_TASK_ID
         )
 
-        operator.execute(None)
+        operator.execute(context=mock.MagicMock())
         mock_hook.return_value.create_cluster.assert_called_once_with(
             cluster=body, project_id=TEST_GCP_PROJECT_ID
         )
@@ -191,6 +191,10 @@ class TestGKEPodOperator(unittest.TestCase):
             namespace=NAMESPACE,
             image=IMAGE,
         )
+        self.gke_op.pod = mock.MagicMock(
+            name=TASK_NAME,
+            namespace=NAMESPACE,
+        )
 
     def test_template_fields(self):
         assert set(KubernetesPodOperator.template_fields).issubset(GKEStartPodOperator.template_fields)
@@ -215,7 +219,7 @@ class TestGKEPodOperator(unittest.TestCase):
             side_effect=[FILE_NAME, '/path/to/new-file']
         )
 
-        self.gke_op.execute(None)
+        self.gke_op.execute(context=mock.MagicMock())
 
         mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
 
@@ -258,7 +262,7 @@ class TestGKEPodOperator(unittest.TestCase):
             side_effect=[FILE_NAME, '/path/to/new-file']
         )
 
-        self.gke_op.execute(None)
+        self.gke_op.execute(context=mock.MagicMock())
 
         mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
 
@@ -314,7 +318,7 @@ class TestGKEPodOperator(unittest.TestCase):
             side_effect=[FILE_NAME, '/path/to/new-file']
         )
 
-        self.gke_op.execute(None)
+        self.gke_op.execute(context=mock.MagicMock())
 
         mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
 
@@ -357,7 +361,7 @@ class TestGKEPodOperator(unittest.TestCase):
             side_effect=[FILE_NAME, '/path/to/new-file']
         )
         self.gke_op.impersonation_chain = "test_account@example.com"
-        self.gke_op.execute(None)
+        self.gke_op.execute(context=mock.MagicMock())
 
         mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()
 
@@ -401,7 +405,7 @@ class TestGKEPodOperator(unittest.TestCase):
             side_effect=[FILE_NAME, '/path/to/new-file']
         )
         self.gke_op.impersonation_chain = ["test_account@example.com"]
-        self.gke_op.execute(None)
+        self.gke_op.execute(context=mock.MagicMock())
 
         mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()