You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by on...@apache.org on 2023/11/22 22:13:48 UTC

(airflow) branch main updated: Make EksPodOperator exec config not rely on log level (#35771)

This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ac977c4e57 Make EksPodOperator exec config not rely on log level (#35771)
ac977c4e57 is described below

commit ac977c4e5740041911c72c145d50545b64ff6f78
Author: Syed Hussain <10...@users.noreply.github.com>
AuthorDate: Wed Nov 22 14:13:41 2023 -0800

    Make EksPodOperator exec config not rely on log level (#35771)
    
    
    * Wrap bash inline script around eks_get_token.py to suppress log output
---
 airflow/providers/amazon/aws/hooks/eks.py          | 55 ++++++++++++++--------
 .../providers/amazon/aws/utils/eks_get_token.py    |  9 +---
 tests/providers/amazon/aws/hooks/test_eks.py       | 42 ++++++++++-------
 .../amazon/aws/utils/test_eks_get_token.py         | 19 +++-----
 4 files changed, 66 insertions(+), 59 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/eks.py b/airflow/providers/amazon/aws/hooks/eks.py
index f6e3ed83f8..24a22c6a2b 100644
--- a/airflow/providers/amazon/aws/hooks/eks.py
+++ b/airflow/providers/amazon/aws/hooks/eks.py
@@ -75,6 +75,25 @@ class NodegroupStates(Enum):
     NONEXISTENT = "NONEXISTENT"
 
 
+COMMAND = """
+            output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
+                --cluster-name {eks_cluster_name} {args} 2>&1)
+
+            if [ $? -ne 0 ]; then
+                echo "Error running the script"
+                exit 1
+            fi
+
+            expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp:\s*\K[^,]+')
+            token=$(echo "$output" | grep -oP 'token:\s*\K[^,]+')
+
+            json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
+                "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
+                {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
+            echo $json_string
+            """
+
+
 class EksHook(AwsBaseHook):
     """
     Interact with Amazon Elastic Kubernetes Service (EKS).
@@ -521,6 +540,16 @@ class EksHook(AwsBaseHook):
         :param eks_cluster_name: The name of the cluster to generate kubeconfig file for.
         :param pod_namespace: The namespace to run within kubernetes.
         """
+        args = ""
+        if self.region_name is not None:
+            args = args + f" --region-name {self.region_name}"
+
+        if self.aws_conn_id is not None:
+            args = args + f" --aws-conn-id {self.aws_conn_id}"
+
+        # We need to determine which python executable the host is running in order to correctly
+        # call the eks_get_token.py script.
+        python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}"
         # Set up the client
         eks_client = self.conn
 
@@ -556,28 +585,14 @@ class EksHook(AwsBaseHook):
                     "user": {
                         "exec": {
                             "apiVersion": AUTHENTICATION_API_VERSION,
-                            "command": sys.executable,
+                            "command": "sh",
                             "args": [
-                                "-m",
-                                "airflow.providers.amazon.aws.utils.eks_get_token",
-                                *(
-                                    ["--region-name", self.region_name]
-                                    if self.region_name is not None
-                                    else []
+                                "-c",
+                                COMMAND.format(
+                                    python_executable=python_executable,
+                                    eks_cluster_name=eks_cluster_name,
+                                    args=args,
                                 ),
-                                *(
-                                    ["--aws-conn-id", self.aws_conn_id]
-                                    if self.aws_conn_id is not None
-                                    else []
-                                ),
-                                "--cluster-name",
-                                eks_cluster_name,
-                            ],
-                            "env": [
-                                {
-                                    "name": "AIRFLOW__LOGGING__LOGGING_LEVEL",
-                                    "value": "FATAL",
-                                }
                             ],
                             "interactiveMode": "Never",
                         }
diff --git a/airflow/providers/amazon/aws/utils/eks_get_token.py b/airflow/providers/amazon/aws/utils/eks_get_token.py
index 27bf51676f..9a340671ef 100644
--- a/airflow/providers/amazon/aws/utils/eks_get_token.py
+++ b/airflow/providers/amazon/aws/utils/eks_get_token.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import argparse
-import json
 from datetime import datetime, timedelta, timezone
 
 from airflow.providers.amazon.aws.hooks.eks import EksHook
@@ -58,13 +57,7 @@ def main():
     eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
     access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
     access_token_expiration = get_expiration_time()
-    exec_credential_object = {
-        "kind": "ExecCredential",
-        "apiVersion": "client.authentication.k8s.io/v1alpha1",
-        "spec": {},
-        "status": {"expirationTimestamp": access_token_expiration, "token": access_token},
-    }
-    print(json.dumps(exec_credential_object))
+    print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")
 
 
 if __name__ == "__main__":
diff --git a/tests/providers/amazon/aws/hooks/test_eks.py b/tests/providers/amazon/aws/hooks/test_eks.py
index 35cb246091..e35c0eba1f 100644
--- a/tests/providers/amazon/aws/hooks/test_eks.py
+++ b/tests/providers/amazon/aws/hooks/test_eks.py
@@ -52,7 +52,7 @@ from moto.eks.models import (
     NODEGROUP_NOT_FOUND_MSG,
 )
 
-from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.hooks.eks import COMMAND, EksHook
 
 from ..utils.eks_test_constants import (
     DEFAULT_CONN_ID,
@@ -1198,6 +1198,8 @@ class TestEksHooks:
 
 
 class TestEksHook:
+    python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}"
+
     @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
     @pytest.mark.parametrize(
         "aws_conn_id, region_name, expected_args",
@@ -1206,32 +1208,37 @@ class TestEksHook:
                 "test-id",
                 "test-region",
                 [
-                    "-m",
-                    "airflow.providers.amazon.aws.utils.eks_get_token",
-                    "--region-name",
-                    "test-region",
-                    "--aws-conn-id",
-                    "test-id",
-                    "--cluster-name",
-                    "test-cluster",
+                    "-c",
+                    COMMAND.format(
+                        python_executable=python_executable,
+                        eks_cluster_name="test-cluster",
+                        args=" --region-name test-region --aws-conn-id test-id",
+                    ),
                 ],
             ],
             [
                 None,
                 "test-region",
                 [
-                    "-m",
-                    "airflow.providers.amazon.aws.utils.eks_get_token",
-                    "--region-name",
-                    "test-region",
-                    "--cluster-name",
-                    "test-cluster",
+                    "-c",
+                    COMMAND.format(
+                        python_executable=python_executable,
+                        eks_cluster_name="test-cluster",
+                        args=" --region-name test-region",
+                    ),
                 ],
             ],
             [
                 None,
                 None,
-                ["-m", "airflow.providers.amazon.aws.utils.eks_get_token", "--cluster-name", "test-cluster"],
+                [
+                    "-c",
+                    COMMAND.format(
+                        python_executable=python_executable,
+                        eks_cluster_name="test-cluster",
+                        args="",
+                    ),
+                ],
             ],
         ],
     )
@@ -1271,8 +1278,7 @@ class TestEksHook:
                             "exec": {
                                 "apiVersion": "client.authentication.k8s.io/v1alpha1",
                                 "args": expected_args,
-                                "command": sys.executable,
-                                "env": [{"name": "AIRFLOW__LOGGING__LOGGING_LEVEL", "value": "FATAL"}],
+                                "command": "sh",
                                 "interactiveMode": "Never",
                             }
                         },
diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py
index c8d66de0db..8825c6c218 100644
--- a/tests/providers/amazon/aws/utils/test_eks_get_token.py
+++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py
@@ -17,12 +17,10 @@
 from __future__ import annotations
 
 import contextlib
-import json
 import os
 import runpy
 from io import StringIO
 from unittest import mock
-from unittest.mock import ANY
 
 import pytest
 import time_machine
@@ -76,17 +74,12 @@ class TestGetEksToken:
             os.chdir(AIRFLOW_MAIN_FOLDER)
             # We are not using run_module because of https://github.com/pytest-dev/pytest/issues/9007
             runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", run_name="__main__")
-        json_output = json.loads(temp_stdout.getvalue())
-        assert {
-            "apiVersion": "client.authentication.k8s.io/v1alpha1",
-            "kind": "ExecCredential",
-            "spec": {},
-            "status": {
-                "expirationTimestamp": ANY,  # depending on local timezone, this can be different
-                "token": "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t",
-            },
-        } == json_output
-        assert json_output["status"]["expirationTimestamp"].startswith("1995-02-")
+        output = temp_stdout.getvalue()
+        token = "token: k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t"
+        expected_token = output.split(",")[1].strip()
+        expected_expiration_timestamp = output.split(",")[0].split(":")[1].strip()
+        assert expected_token == token
+        assert expected_expiration_timestamp.startswith("1995-02-")
         mock_eks_hook.assert_called_once_with(
             aws_conn_id=expected_aws_conn_id, region_name=expected_region_name
         )