You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by on...@apache.org on 2023/11/22 22:13:48 UTC
(airflow) branch main updated: Make EksPodOperator exec config not rely on log level (#35771)
This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ac977c4e57 Make EksPodOperator exec config not rely on log level (#35771)
ac977c4e57 is described below
commit ac977c4e5740041911c72c145d50545b64ff6f78
Author: Syed Hussain <10...@users.noreply.github.com>
AuthorDate: Wed Nov 22 14:13:41 2023 -0800
Make EksPodOperator exec config not rely on log level (#35771)
* Wrap bash inline script around eks_get_token.py to suppress log output
---
airflow/providers/amazon/aws/hooks/eks.py | 55 ++++++++++++++--------
.../providers/amazon/aws/utils/eks_get_token.py | 9 +---
tests/providers/amazon/aws/hooks/test_eks.py | 42 ++++++++++-------
.../amazon/aws/utils/test_eks_get_token.py | 19 +++-----
4 files changed, 66 insertions(+), 59 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/eks.py b/airflow/providers/amazon/aws/hooks/eks.py
index f6e3ed83f8..24a22c6a2b 100644
--- a/airflow/providers/amazon/aws/hooks/eks.py
+++ b/airflow/providers/amazon/aws/hooks/eks.py
@@ -75,6 +75,25 @@ class NodegroupStates(Enum):
NONEXISTENT = "NONEXISTENT"
+COMMAND = """
+ output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
+ --cluster-name {eks_cluster_name} {args} 2>&1)
+
+ if [ $? -ne 0 ]; then
+ echo "Error running the script"
+ exit 1
+ fi
+
+ expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp:\s*\K[^,]+')
+ token=$(echo "$output" | grep -oP 'token:\s*\K[^,]+')
+
+ json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
+ "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
+ {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
+ echo $json_string
+ """
+
+
class EksHook(AwsBaseHook):
"""
Interact with Amazon Elastic Kubernetes Service (EKS).
@@ -521,6 +540,16 @@ class EksHook(AwsBaseHook):
:param eks_cluster_name: The name of the cluster to generate kubeconfig file for.
:param pod_namespace: The namespace to run within kubernetes.
"""
+ args = ""
+ if self.region_name is not None:
+ args = args + f" --region-name {self.region_name}"
+
+ if self.aws_conn_id is not None:
+ args = args + f" --aws-conn-id {self.aws_conn_id}"
+
+ # We need to determine which python executable the host is running in order to correctly
+ # call the eks_get_token.py script.
+ python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}"
# Set up the client
eks_client = self.conn
@@ -556,28 +585,14 @@ class EksHook(AwsBaseHook):
"user": {
"exec": {
"apiVersion": AUTHENTICATION_API_VERSION,
- "command": sys.executable,
+ "command": "sh",
"args": [
- "-m",
- "airflow.providers.amazon.aws.utils.eks_get_token",
- *(
- ["--region-name", self.region_name]
- if self.region_name is not None
- else []
+ "-c",
+ COMMAND.format(
+ python_executable=python_executable,
+ eks_cluster_name=eks_cluster_name,
+ args=args,
),
- *(
- ["--aws-conn-id", self.aws_conn_id]
- if self.aws_conn_id is not None
- else []
- ),
- "--cluster-name",
- eks_cluster_name,
- ],
- "env": [
- {
- "name": "AIRFLOW__LOGGING__LOGGING_LEVEL",
- "value": "FATAL",
- }
],
"interactiveMode": "Never",
}
diff --git a/airflow/providers/amazon/aws/utils/eks_get_token.py b/airflow/providers/amazon/aws/utils/eks_get_token.py
index 27bf51676f..9a340671ef 100644
--- a/airflow/providers/amazon/aws/utils/eks_get_token.py
+++ b/airflow/providers/amazon/aws/utils/eks_get_token.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import argparse
-import json
from datetime import datetime, timedelta, timezone
from airflow.providers.amazon.aws.hooks.eks import EksHook
@@ -58,13 +57,7 @@ def main():
eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
access_token_expiration = get_expiration_time()
- exec_credential_object = {
- "kind": "ExecCredential",
- "apiVersion": "client.authentication.k8s.io/v1alpha1",
- "spec": {},
- "status": {"expirationTimestamp": access_token_expiration, "token": access_token},
- }
- print(json.dumps(exec_credential_object))
+ print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")
if __name__ == "__main__":
diff --git a/tests/providers/amazon/aws/hooks/test_eks.py b/tests/providers/amazon/aws/hooks/test_eks.py
index 35cb246091..e35c0eba1f 100644
--- a/tests/providers/amazon/aws/hooks/test_eks.py
+++ b/tests/providers/amazon/aws/hooks/test_eks.py
@@ -52,7 +52,7 @@ from moto.eks.models import (
NODEGROUP_NOT_FOUND_MSG,
)
-from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.hooks.eks import COMMAND, EksHook
from ..utils.eks_test_constants import (
DEFAULT_CONN_ID,
@@ -1198,6 +1198,8 @@ class TestEksHooks:
class TestEksHook:
+ python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}"
+
@mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
@pytest.mark.parametrize(
"aws_conn_id, region_name, expected_args",
@@ -1206,32 +1208,37 @@ class TestEksHook:
"test-id",
"test-region",
[
- "-m",
- "airflow.providers.amazon.aws.utils.eks_get_token",
- "--region-name",
- "test-region",
- "--aws-conn-id",
- "test-id",
- "--cluster-name",
- "test-cluster",
+ "-c",
+ COMMAND.format(
+ python_executable=python_executable,
+ eks_cluster_name="test-cluster",
+ args=" --region-name test-region --aws-conn-id test-id",
+ ),
],
],
[
None,
"test-region",
[
- "-m",
- "airflow.providers.amazon.aws.utils.eks_get_token",
- "--region-name",
- "test-region",
- "--cluster-name",
- "test-cluster",
+ "-c",
+ COMMAND.format(
+ python_executable=python_executable,
+ eks_cluster_name="test-cluster",
+ args=" --region-name test-region",
+ ),
],
],
[
None,
None,
- ["-m", "airflow.providers.amazon.aws.utils.eks_get_token", "--cluster-name", "test-cluster"],
+ [
+ "-c",
+ COMMAND.format(
+ python_executable=python_executable,
+ eks_cluster_name="test-cluster",
+ args="",
+ ),
+ ],
],
],
)
@@ -1271,8 +1278,7 @@ class TestEksHook:
"exec": {
"apiVersion": "client.authentication.k8s.io/v1alpha1",
"args": expected_args,
- "command": sys.executable,
- "env": [{"name": "AIRFLOW__LOGGING__LOGGING_LEVEL", "value": "FATAL"}],
+ "command": "sh",
"interactiveMode": "Never",
}
},
diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py
index c8d66de0db..8825c6c218 100644
--- a/tests/providers/amazon/aws/utils/test_eks_get_token.py
+++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py
@@ -17,12 +17,10 @@
from __future__ import annotations
import contextlib
-import json
import os
import runpy
from io import StringIO
from unittest import mock
-from unittest.mock import ANY
import pytest
import time_machine
@@ -76,17 +74,12 @@ class TestGetEksToken:
os.chdir(AIRFLOW_MAIN_FOLDER)
# We are not using run_module because of https://github.com/pytest-dev/pytest/issues/9007
runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", run_name="__main__")
- json_output = json.loads(temp_stdout.getvalue())
- assert {
- "apiVersion": "client.authentication.k8s.io/v1alpha1",
- "kind": "ExecCredential",
- "spec": {},
- "status": {
- "expirationTimestamp": ANY, # depending on local timezone, this can be different
- "token": "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t",
- },
- } == json_output
- assert json_output["status"]["expirationTimestamp"].startswith("1995-02-")
+ output = temp_stdout.getvalue()
+ token = "token: k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t"
+ expected_token = output.split(",")[1].strip()
+ expected_expiration_timestamp = output.split(",")[0].split(":")[1].strip()
+ assert expected_token == token
+ assert expected_expiration_timestamp.startswith("1995-02-")
mock_eks_hook.assert_called_once_with(
aws_conn_id=expected_aws_conn_id, region_name=expected_region_name
)