You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/11/07 13:46:51 UTC
(airflow) branch main updated: Split AWS Links tests into separate modules (#35484)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b1ee7244f6 Split AWS Links tests into separate modules (#35484)
b1ee7244f6 is described below
commit b1ee7244f68ed9a7d855c56f2ed8b5cbaa28c834
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Tue Nov 7 17:46:42 2023 +0400
Split AWS Links tests into separate modules (#35484)
* Split AWS Links tests into separate modules
* Add Glue Link(s) tests
---
tests/always/test_project_structure.py | 5 -
tests/providers/amazon/aws/links/conftest.py | 50 -----
tests/providers/amazon/aws/links/test_base.py | 84 --------
tests/providers/amazon/aws/links/test_base_aws.py | 215 +++++++++++++++++++++
tests/providers/amazon/aws/links/test_batch.py | 65 +++++++
tests/providers/amazon/aws/links/test_emr.py | 77 ++++++++
.../links_test_utils.py => links/test_glue.py} | 20 +-
tests/providers/amazon/aws/links/test_links.py | 167 ----------------
tests/providers/amazon/aws/links/test_logs.py | 38 ++++
9 files changed, 410 insertions(+), 311 deletions(-)
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index b2e5dc1c9d..9976a9950d 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -65,11 +65,6 @@ class TestProjectStructure:
"tests/providers/amazon/aws/executors/ecs/test_utils.py",
"tests/providers/amazon/aws/fs/test_s3.py",
"tests/providers/amazon/aws/hooks/test_dms.py",
- "tests/providers/amazon/aws/links/test_base_aws.py",
- "tests/providers/amazon/aws/links/test_batch.py",
- "tests/providers/amazon/aws/links/test_emr.py",
- "tests/providers/amazon/aws/links/test_glue.py",
- "tests/providers/amazon/aws/links/test_logs.py",
"tests/providers/amazon/aws/operators/test_dms.py",
"tests/providers/amazon/aws/operators/test_emr.py",
"tests/providers/amazon/aws/operators/test_sagemaker.py",
diff --git a/tests/providers/amazon/aws/links/conftest.py b/tests/providers/amazon/aws/links/conftest.py
deleted file mode 100644
index b2b9a4188c..0000000000
--- a/tests/providers/amazon/aws/links/conftest.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-import pytest
-
-from tests.providers.amazon.aws.utils.links_test_utils import link_test_operator
-
-if TYPE_CHECKING:
- from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
-
-
-@pytest.fixture()
-def create_task_and_ti_of_op_with_extra_link(create_task_instance_of_operator):
- def _create_op_and_ti(
- extra_link_class: BaseAwsLink,
- *,
- dag_id,
- task_id,
- execution_date=None,
- session=None,
- **operator_kwargs,
- ):
- op = link_test_operator(extra_link_class)
- return op(task_id=task_id), create_task_instance_of_operator(
- link_test_operator(extra_link_class),
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date,
- session=session,
- **operator_kwargs,
- )
-
- return _create_op_and_ti
diff --git a/tests/providers/amazon/aws/links/test_base.py b/tests/providers/amazon/aws/links/test_base.py
deleted file mode 100644
index 42dabde810..0000000000
--- a/tests/providers/amazon/aws/links/test_base.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest.mock import MagicMock
-
-import pytest
-
-from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
-from tests.test_utils.mock_operators import MockOperator
-
-XCOM_KEY = "test_xcom_key"
-CUSTOM_KEYS = {
- "foo": "bar",
- "spam": "egg",
-}
-
-
-class SimpleBaseAwsLink(BaseAwsLink):
- key = XCOM_KEY
-
-
-class TestBaseAwsLink:
- @pytest.mark.parametrize(
- "region_name, aws_partition,keywords,expected_value",
- [
- ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}),
- ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}),
- (
- "us-gov-east-1",
- "aws-us-gov",
- {},
- {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"},
- ),
- (
- "eu-west-1",
- "aws",
- CUSTOM_KEYS,
- {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS},
- ),
- ],
- )
- def test_persist(self, region_name, aws_partition, keywords, expected_value):
- mock_context = MagicMock()
-
- SimpleBaseAwsLink.persist(
- context=mock_context,
- operator=MockOperator(task_id="test_task_id"),
- region_name=region_name,
- aws_partition=aws_partition,
- **keywords,
- )
-
- ti = mock_context["ti"]
- ti.xcom_push.assert_called_once_with(
- execution_date=None,
- key=XCOM_KEY,
- value=expected_value,
- )
-
- def test_disable_xcom_push(self):
- mock_context = MagicMock()
- SimpleBaseAwsLink.persist(
- context=mock_context,
- operator=MockOperator(task_id="test_task_id", do_xcom_push=False),
- region_name="eu-east-1",
- aws_partition="aws",
- )
- ti = mock_context["ti"]
- ti.xcom_push.assert_not_called()
diff --git a/tests/providers/amazon/aws/links/test_base_aws.py b/tests/providers/amazon/aws/links/test_base_aws.py
new file mode 100644
index 0000000000..a8bf17c3db
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_base_aws.py
@@ -0,0 +1,215 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import abstractmethod
+from typing import TYPE_CHECKING, NamedTuple
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
+from airflow.serialization.serialized_objects import SerializedDAG
+from tests.test_utils.mock_operators import MockOperator
+
+if TYPE_CHECKING:
+ from airflow.models import TaskInstance
+
+XCOM_KEY = "test_xcom_key"
+CUSTOM_KEYS = {
+ "foo": "bar",
+ "spam": "egg",
+}
+TEST_REGION_NAME = "eu-west-1"
+TEST_AWS_PARTITION = "aws"
+
+
+class SimpleBaseAwsLink(BaseAwsLink):
+ key = XCOM_KEY
+
+
+class TestBaseAwsLink:
+ @pytest.mark.parametrize(
+ "region_name, aws_partition,keywords,expected_value",
+ [
+ ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}),
+ ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}),
+ (
+ "us-gov-east-1",
+ "aws-us-gov",
+ {},
+ {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"},
+ ),
+ (
+ "eu-west-1",
+ "aws",
+ CUSTOM_KEYS,
+ {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS},
+ ),
+ ],
+ )
+ def test_persist(self, region_name, aws_partition, keywords, expected_value):
+ mock_context = MagicMock()
+
+ SimpleBaseAwsLink.persist(
+ context=mock_context,
+ operator=MockOperator(task_id="test_task_id"),
+ region_name=region_name,
+ aws_partition=aws_partition,
+ **keywords,
+ )
+
+ ti = mock_context["ti"]
+ ti.xcom_push.assert_called_once_with(
+ execution_date=None,
+ key=XCOM_KEY,
+ value=expected_value,
+ )
+
+ def test_disable_xcom_push(self):
+ mock_context = MagicMock()
+ SimpleBaseAwsLink.persist(
+ context=mock_context,
+ operator=MockOperator(task_id="test_task_id", do_xcom_push=False),
+ region_name="eu-east-1",
+ aws_partition="aws",
+ )
+ ti = mock_context["ti"]
+ ti.xcom_push.assert_not_called()
+
+
+def link_test_operator(*links):
+ """Helper for create mock operator class with extra links"""
+
+ class LinkTestOperator(MockOperator):
+ operator_extra_links = tuple(c() for c in links)
+
+ return LinkTestOperator
+
+
+class OperatorAndTi(NamedTuple):
+ """Helper container for store task and generated task instance."""
+
+ task: MockOperator
+ task_instance: TaskInstance
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+class BaseAwsLinksTestCase:
+ """Base class for AWS Provider links tests."""
+
+ link_class: type[BaseAwsLink]
+
+ @pytest.fixture(autouse=True)
+ def setup_base_test_case(self, dag_maker, create_task_instance_of_operator):
+ self.dag_maker = dag_maker
+ self.ti_maker = create_task_instance_of_operator
+
+ @property
+ def full_qualname(self) -> str:
+ return f"{self.link_class.__module__}.{self.link_class.__qualname__}"
+
+ @property
+ def task_id(self) -> str:
+ return f"test-{self.link_class.__name__}"
+
+ def create_op_and_ti(
+ self,
+ extra_link_class: type[BaseAwsLink],
+ *,
+ dag_id,
+ task_id,
+ execution_date=None,
+ session=None,
+ **operator_kwargs,
+ ):
+ """Helper method for generate operator and task instance"""
+ op = link_test_operator(extra_link_class)
+ return OperatorAndTi(
+ task=op(task_id=task_id),
+ task_instance=self.ti_maker(
+ op,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date,
+ session=session,
+ **operator_kwargs,
+ ),
+ )
+
+ def assert_extra_link_url(
+ self,
+ expected_url: str,
+ region_name=TEST_REGION_NAME,
+ aws_partition=TEST_AWS_PARTITION,
+ **extra_link_kwargs,
+ ):
+ """Helper method for create extra link URL from the parameters."""
+ task, ti = self.create_op_and_ti(self.link_class, dag_id="test_extra_link", task_id=self.task_id)
+
+ mock_context = MagicMock()
+ mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__
+
+ self.link_class.persist(
+ context=mock_context,
+ operator=task,
+ region_name=region_name,
+ aws_partition=aws_partition,
+ **extra_link_kwargs,
+ )
+
+ error_msg = f"{self.full_qualname!r} should be preserved after execution"
+ assert ti.task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg
+
+ serialized_dag = self.dag_maker.get_serialized_data()
+ deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+ deserialized_task = deserialized_dag.task_dict[self.task_id]
+
+ error_msg = f"{self.full_qualname!r} should be preserved in deserialized tasks after execution"
+ assert deserialized_task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg
+
+ def test_link_serialize(self):
+ """Test: Operator links should exist for serialized DAG."""
+ self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", task_id=self.task_id)
+ serialized_dag = self.dag_maker.get_serialized_data()
+ operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
+ error_message = "Operator links should exist for serialized DAG"
+ assert operator_extra_link == [{self.full_qualname: {}}], error_message
+
+ def test_empty_xcom(self):
+ """Test: Operator links should return empty string if no XCom value."""
+ ti = self.create_op_and_ti(
+ self.link_class, dag_id="test_empty_xcom", task_id=self.task_id
+ ).task_instance
+
+ serialized_dag = self.dag_maker.get_serialized_data()
+ deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+ deserialized_task = deserialized_dag.task_dict[self.task_id]
+
+ assert (
+ ti.task.get_extra_links(ti, self.link_class.name) == ""
+ ), "Operator link should only be added if job id is available in XCom"
+
+ assert (
+ deserialized_task.get_extra_links(ti, self.link_class.name) == ""
+ ), "Operator link should be empty for deserialized task with no XCom push"
+
+ @abstractmethod
+ def test_extra_link(self, **kwargs):
+ """Test: Expected URL Link."""
+ raise NotImplementedError(f"{type(self).__name__!r} should implement `test_extra_link` test")
diff --git a/tests/providers/amazon/aws/links/test_batch.py b/tests/providers/amazon/aws/links/test_batch.py
new file mode 100644
index 0000000000..23a11193ba
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_batch.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.batch import (
+ BatchJobDefinitionLink,
+ BatchJobDetailsLink,
+ BatchJobQueueLink,
+)
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestBatchJobDefinitionLink(BaseAwsLinksTestCase):
+ link_class = BatchJobDefinitionLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/batch/home"
+ "?region=eu-west-1#job-definition/detail/arn:fake:jd"
+ ),
+ region_name="eu-west-1",
+ aws_partition="aws",
+ job_definition_arn="arn:fake:jd",
+ )
+
+
+class TestBatchJobDetailsLink(BaseAwsLinksTestCase):
+ link_class = BatchJobDetailsLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url="https://console.amazonaws.cn/batch/home?region=cn-north-1#jobs/detail/fake-id",
+ region_name="cn-north-1",
+ aws_partition="aws-cn",
+ job_id="fake-id",
+ )
+
+
+class TestBatchJobQueueLink(BaseAwsLinksTestCase):
+ link_class = BatchJobQueueLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/batch/home" "?region=us-east-1#queues/detail/arn:fake:jq"
+ ),
+ region_name="us-east-1",
+ aws_partition="aws",
+ job_queue_arn="arn:fake:jq",
+ )
diff --git a/tests/providers/amazon/aws/links/test_emr.py b/tests/providers/amazon/aws/links/test_emr.py
new file mode 100644
index 0000000000..59c883362a
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_emr.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestEmrClusterLink(BaseAwsLinksTestCase):
+ link_class = EmrClusterLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/emr/home" "?region=us-west-1#/clusterDetails/j-TEST-FLOW-ID"
+ ),
+ region_name="us-west-1",
+ aws_partition="aws",
+ job_flow_id="j-TEST-FLOW-ID",
+ )
+
+
+@pytest.mark.parametrize(
+ "cluster_info, expected_uri",
+ [
+ pytest.param({"Cluster": {}}, None, id="no-log-uri"),
+ pytest.param({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/", id="has-log-uri"),
+ ],
+)
+def test_get_log_uri(cluster_info, expected_uri):
+ emr_client = MagicMock()
+ emr_client.describe_cluster.return_value = cluster_info
+ assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri
+
+
+class TestEmrLogsLink(BaseAwsLinksTestCase):
+ link_class = EmrLogsLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/s3/buckets/myLogUri/" "?region=eu-west-2&prefix=j-8989898989/"
+ ),
+ region_name="eu-west-2",
+ aws_partition="aws",
+ log_uri="myLogUri/",
+ job_flow_id="j-8989898989",
+ )
+
+ @pytest.mark.parametrize(
+ "log_url_extra",
+ [
+ pytest.param({}, id="no-log-uri", marks=pytest.mark.xfail),
+ pytest.param({"log_uri": None}, id="log-uri-none"),
+ pytest.param({"log_uri": ""}, id="log-uri-empty"),
+ ],
+ )
+ def test_missing_log_url(self, log_url_extra: dict):
+ self.assert_extra_link_url(expected_url="", **log_url_extra)
diff --git a/tests/providers/amazon/aws/utils/links_test_utils.py b/tests/providers/amazon/aws/links/test_glue.py
similarity index 56%
rename from tests/providers/amazon/aws/utils/links_test_utils.py
rename to tests/providers/amazon/aws/links/test_glue.py
index 9d2d6f9502..5f929cd3e9 100644
--- a/tests/providers/amazon/aws/utils/links_test_utils.py
+++ b/tests/providers/amazon/aws/links/test_glue.py
@@ -16,11 +16,21 @@
# under the License.
from __future__ import annotations
-from tests.test_utils.mock_operators import MockOperator
+from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
-def link_test_operator(*links):
- class LinkTestOperator(MockOperator):
- operator_extra_links = tuple(c() for c in links)
+class TestGlueJobRunDetailsLink(BaseAwsLinksTestCase):
+ link_class = GlueJobRunDetailsLink
- return LinkTestOperator
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/gluestudio/home"
+ "?region=ap-southeast-2#/job/test_job_name/run/11111"
+ ),
+ region_name="ap-southeast-2",
+ aws_partition="aws",
+ job_run_id="11111",
+ job_name="test_job_name",
+ )
diff --git a/tests/providers/amazon/aws/links/test_links.py b/tests/providers/amazon/aws/links/test_links.py
deleted file mode 100644
index e0ccf86d2e..0000000000
--- a/tests/providers/amazon/aws/links/test_links.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from operator import itemgetter
-from unittest.mock import MagicMock
-
-import pytest
-
-from airflow.providers.amazon.aws.links.batch import (
- BatchJobDefinitionLink,
- BatchJobDetailsLink,
- BatchJobQueueLink,
-)
-from airflow.providers.amazon.aws.links.emr import EmrClusterLink, get_log_uri
-from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
-from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
-from airflow.serialization.serialized_objects import SerializedDAG
-
-TASK_ID = "test_task_id"
-REGION_NAME = "eu-west-1"
-AWS_PARTITION = "aws"
-AWS_DOMAIN = "aws.amazon.com"
-
-
-def _full_qualname(cls) -> str:
- return f"{cls.__module__}.{cls.__qualname__}"
-
-
-# List AWS External Link with test cases
-# Expected list op tuple(BaseAwsLink, kwargs, expected URL)
-AWS_LINKS = [
- (
- BatchJobDefinitionLink,
- {"job_definition_arn": "arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42"},
- f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
- f"#job-definition/detail/arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42",
- ),
- (
- BatchJobDetailsLink,
- {"job_id": "00000000-0000-0000-0000-000000000000"},
- f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
- f"#jobs/detail/00000000-0000-0000-0000-000000000000",
- ),
- (
- BatchJobQueueLink,
- {"job_queue_arn": "arn:aws:batch:dummy-region:111111111111:job-queue/test-queue"},
- f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
- f"#queues/detail/arn:aws:batch:dummy-region:111111111111:job-queue/test-queue",
- ),
- (
- EmrClusterLink,
- {"job_flow_id": "j-TEST-FLOW-ID"},
- f"https://console.{AWS_DOMAIN}/emr/home?region={REGION_NAME}#/clusterDetails/j-TEST-FLOW-ID",
- ),
- (
- CloudWatchEventsLink,
- {
- "awslogs_region": "ap-southeast-2",
- "awslogs_group": "/test/logs/group",
- "awslogs_stream_name": "test/stream/d56a66bb98a14c4593defa1548686edf",
- },
- f"https://console.{AWS_DOMAIN}/cloudwatch/home?region=ap-southeast-2"
- f"#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup"
- f"/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf",
- ),
- (
- GlueJobRunDetailsLink,
- {"job_run_id": "11111", "job_name": "test_job_name"},
- f"https://console.{AWS_DOMAIN}/gluestudio/home?region={REGION_NAME}#/job/test_job_name/run/11111",
- ),
-]
-
-
-@pytest.mark.db_test
-@pytest.mark.need_serialized_dag
-class TestAwsLinks:
- @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname)
- def test_link_serialize(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link):
- """Test: Operator links should exist for serialized DAG."""
- create_task_and_ti_of_op_with_extra_link(
- extra_link_class, dag_id="test_link_serialize", task_id=TASK_ID
- )
- serialized_dag = dag_maker.get_serialized_data()
- assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
- {f"{_full_qualname(extra_link_class)}": {}}
- ], "Operator links should exist for serialized DAG"
-
- @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname)
- def test_empty_xcom(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link):
- """Test: Operator links should return empty string if no XCom value."""
- _, ti = create_task_and_ti_of_op_with_extra_link(
- extra_link_class, dag_id="test_empty_xcom", task_id=TASK_ID
- )
-
- serialized_dag = dag_maker.get_serialized_data()
- deserialized_dag = SerializedDAG.from_dict(serialized_dag)
- deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
- assert (
- ti.task.get_extra_links(ti, extra_link_class.name) == ""
- ), "Operator link should only be added if job id is available in XCom"
-
- assert (
- deserialized_task.get_extra_links(ti, extra_link_class.name) == ""
- ), "Operator link should be empty for deserialized task with no XCom push"
-
- @pytest.mark.parametrize("extra_link_class, extra_link_kwargs, extra_link_expected_url", AWS_LINKS)
- def test_extra_link(
- self,
- extra_link_class,
- extra_link_kwargs,
- extra_link_expected_url,
- dag_maker,
- create_task_and_ti_of_op_with_extra_link,
- ):
- """Test: Expected URL Link."""
- task, ti = create_task_and_ti_of_op_with_extra_link(
- extra_link_class, dag_id="test_extra_link", task_id=TASK_ID
- )
-
- mock_context = MagicMock()
- mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__
-
- extra_link_class.persist(
- context=mock_context,
- operator=task,
- region_name=REGION_NAME,
- aws_partition=AWS_PARTITION,
- **extra_link_kwargs,
- )
-
- assert (
- ti.task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url
- ), f"{_full_qualname(extra_link_class)} should be preserved after execution"
-
- serialized_dag = dag_maker.get_serialized_data()
- deserialized_dag = SerializedDAG.from_dict(serialized_dag)
- deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
- assert (
- deserialized_task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url
- ), f"{_full_qualname(extra_link_class)} should be preserved in deserialized tasks after execution"
-
-
-@pytest.mark.parametrize(
- "cluster_info, expected_uri",
- (({"Cluster": {}}, None), ({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/")),
-)
-def test_get_log_uri(cluster_info, expected_uri):
- emr_client = MagicMock()
- emr_client.describe_cluster.return_value = cluster_info
- assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri
diff --git a/tests/providers/amazon/aws/links/test_logs.py b/tests/providers/amazon/aws/links/test_logs.py
new file mode 100644
index 0000000000..991a8bc6f0
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_logs.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestCloudWatchEventsLink(BaseAwsLinksTestCase):
+ link_class = CloudWatchEventsLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ "https://console.aws.amazon.com/cloudwatch/home"
+ "?region=ap-southeast-2#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup"
+ "/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf"
+ ),
+ region_name="us-west-1",
+ aws_partition="aws",
+ awslogs_region="ap-southeast-2",
+ awslogs_group="/test/logs/group",
+ awslogs_stream_name="test/stream/d56a66bb98a14c4593defa1548686edf",
+ )