You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/11/07 13:46:51 UTC

(airflow) branch main updated: Split AWS Links tests into separate modules (#35484)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b1ee7244f6 Split AWS Links tests into separate modules (#35484)
b1ee7244f6 is described below

commit b1ee7244f68ed9a7d855c56f2ed8b5cbaa28c834
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Tue Nov 7 17:46:42 2023 +0400

    Split AWS Links tests into separate modules (#35484)
    
    * Split AWS Links tests into separate modules
    
    * Add Glue Link(s) tests
---
 tests/always/test_project_structure.py             |   5 -
 tests/providers/amazon/aws/links/conftest.py       |  50 -----
 tests/providers/amazon/aws/links/test_base.py      |  84 --------
 tests/providers/amazon/aws/links/test_base_aws.py  | 215 +++++++++++++++++++++
 tests/providers/amazon/aws/links/test_batch.py     |  65 +++++++
 tests/providers/amazon/aws/links/test_emr.py       |  77 ++++++++
 .../links_test_utils.py => links/test_glue.py}     |  20 +-
 tests/providers/amazon/aws/links/test_links.py     | 167 ----------------
 tests/providers/amazon/aws/links/test_logs.py      |  38 ++++
 9 files changed, 410 insertions(+), 311 deletions(-)

diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index b2e5dc1c9d..9976a9950d 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -65,11 +65,6 @@ class TestProjectStructure:
             "tests/providers/amazon/aws/executors/ecs/test_utils.py",
             "tests/providers/amazon/aws/fs/test_s3.py",
             "tests/providers/amazon/aws/hooks/test_dms.py",
-            "tests/providers/amazon/aws/links/test_base_aws.py",
-            "tests/providers/amazon/aws/links/test_batch.py",
-            "tests/providers/amazon/aws/links/test_emr.py",
-            "tests/providers/amazon/aws/links/test_glue.py",
-            "tests/providers/amazon/aws/links/test_logs.py",
             "tests/providers/amazon/aws/operators/test_dms.py",
             "tests/providers/amazon/aws/operators/test_emr.py",
             "tests/providers/amazon/aws/operators/test_sagemaker.py",
diff --git a/tests/providers/amazon/aws/links/conftest.py b/tests/providers/amazon/aws/links/conftest.py
deleted file mode 100644
index b2b9a4188c..0000000000
--- a/tests/providers/amazon/aws/links/conftest.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-import pytest
-
-from tests.providers.amazon.aws.utils.links_test_utils import link_test_operator
-
-if TYPE_CHECKING:
-    from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
-
-
-@pytest.fixture()
-def create_task_and_ti_of_op_with_extra_link(create_task_instance_of_operator):
-    def _create_op_and_ti(
-        extra_link_class: BaseAwsLink,
-        *,
-        dag_id,
-        task_id,
-        execution_date=None,
-        session=None,
-        **operator_kwargs,
-    ):
-        op = link_test_operator(extra_link_class)
-        return op(task_id=task_id), create_task_instance_of_operator(
-            link_test_operator(extra_link_class),
-            dag_id=dag_id,
-            task_id=task_id,
-            execution_date=execution_date,
-            session=session,
-            **operator_kwargs,
-        )
-
-    return _create_op_and_ti
diff --git a/tests/providers/amazon/aws/links/test_base.py b/tests/providers/amazon/aws/links/test_base.py
deleted file mode 100644
index 42dabde810..0000000000
--- a/tests/providers/amazon/aws/links/test_base.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest.mock import MagicMock
-
-import pytest
-
-from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
-from tests.test_utils.mock_operators import MockOperator
-
-XCOM_KEY = "test_xcom_key"
-CUSTOM_KEYS = {
-    "foo": "bar",
-    "spam": "egg",
-}
-
-
-class SimpleBaseAwsLink(BaseAwsLink):
-    key = XCOM_KEY
-
-
-class TestBaseAwsLink:
-    @pytest.mark.parametrize(
-        "region_name, aws_partition,keywords,expected_value",
-        [
-            ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}),
-            ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}),
-            (
-                "us-gov-east-1",
-                "aws-us-gov",
-                {},
-                {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"},
-            ),
-            (
-                "eu-west-1",
-                "aws",
-                CUSTOM_KEYS,
-                {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS},
-            ),
-        ],
-    )
-    def test_persist(self, region_name, aws_partition, keywords, expected_value):
-        mock_context = MagicMock()
-
-        SimpleBaseAwsLink.persist(
-            context=mock_context,
-            operator=MockOperator(task_id="test_task_id"),
-            region_name=region_name,
-            aws_partition=aws_partition,
-            **keywords,
-        )
-
-        ti = mock_context["ti"]
-        ti.xcom_push.assert_called_once_with(
-            execution_date=None,
-            key=XCOM_KEY,
-            value=expected_value,
-        )
-
-    def test_disable_xcom_push(self):
-        mock_context = MagicMock()
-        SimpleBaseAwsLink.persist(
-            context=mock_context,
-            operator=MockOperator(task_id="test_task_id", do_xcom_push=False),
-            region_name="eu-east-1",
-            aws_partition="aws",
-        )
-        ti = mock_context["ti"]
-        ti.xcom_push.assert_not_called()
diff --git a/tests/providers/amazon/aws/links/test_base_aws.py b/tests/providers/amazon/aws/links/test_base_aws.py
new file mode 100644
index 0000000000..a8bf17c3db
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_base_aws.py
@@ -0,0 +1,215 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import abstractmethod
+from typing import TYPE_CHECKING, NamedTuple
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.amazon.aws.links.base_aws import BaseAwsLink
+from airflow.serialization.serialized_objects import SerializedDAG
+from tests.test_utils.mock_operators import MockOperator
+
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+XCOM_KEY = "test_xcom_key"
+CUSTOM_KEYS = {
+    "foo": "bar",
+    "spam": "egg",
+}
+TEST_REGION_NAME = "eu-west-1"
+TEST_AWS_PARTITION = "aws"
+
+
+class SimpleBaseAwsLink(BaseAwsLink):
+    key = XCOM_KEY
+
+
+class TestBaseAwsLink:
+    @pytest.mark.parametrize(
+        "region_name, aws_partition,keywords,expected_value",
+        [
+            ("eu-central-1", "aws", {}, {"region_name": "eu-central-1", "aws_domain": "aws.amazon.com"}),
+            ("cn-north-1", "aws-cn", {}, {"region_name": "cn-north-1", "aws_domain": "amazonaws.cn"}),
+            (
+                "us-gov-east-1",
+                "aws-us-gov",
+                {},
+                {"region_name": "us-gov-east-1", "aws_domain": "amazonaws-us-gov.com"},
+            ),
+            (
+                "eu-west-1",
+                "aws",
+                CUSTOM_KEYS,
+                {"region_name": "eu-west-1", "aws_domain": "aws.amazon.com", **CUSTOM_KEYS},
+            ),
+        ],
+    )
+    def test_persist(self, region_name, aws_partition, keywords, expected_value):
+        mock_context = MagicMock()
+
+        SimpleBaseAwsLink.persist(
+            context=mock_context,
+            operator=MockOperator(task_id="test_task_id"),
+            region_name=region_name,
+            aws_partition=aws_partition,
+            **keywords,
+        )
+
+        ti = mock_context["ti"]
+        ti.xcom_push.assert_called_once_with(
+            execution_date=None,
+            key=XCOM_KEY,
+            value=expected_value,
+        )
+
+    def test_disable_xcom_push(self):
+        mock_context = MagicMock()
+        SimpleBaseAwsLink.persist(
+            context=mock_context,
+            operator=MockOperator(task_id="test_task_id", do_xcom_push=False),
+            region_name="eu-east-1",
+            aws_partition="aws",
+        )
+        ti = mock_context["ti"]
+        ti.xcom_push.assert_not_called()
+
+
+def link_test_operator(*links):
+    """Helper for creating a mock operator class with extra links."""
+
+    class LinkTestOperator(MockOperator):
+        operator_extra_links = tuple(c() for c in links)
+
+    return LinkTestOperator
+
+
+class OperatorAndTi(NamedTuple):
+    """Helper container for storing a task and its generated task instance."""
+
+    task: MockOperator
+    task_instance: TaskInstance
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+class BaseAwsLinksTestCase:
+    """Base class for AWS Provider links tests."""
+
+    link_class: type[BaseAwsLink]
+
+    @pytest.fixture(autouse=True)
+    def setup_base_test_case(self, dag_maker, create_task_instance_of_operator):
+        self.dag_maker = dag_maker
+        self.ti_maker = create_task_instance_of_operator
+
+    @property
+    def full_qualname(self) -> str:
+        return f"{self.link_class.__module__}.{self.link_class.__qualname__}"
+
+    @property
+    def task_id(self) -> str:
+        return f"test-{self.link_class.__name__}"
+
+    def create_op_and_ti(
+        self,
+        extra_link_class: type[BaseAwsLink],
+        *,
+        dag_id,
+        task_id,
+        execution_date=None,
+        session=None,
+        **operator_kwargs,
+    ):
+        """Helper method for generating an operator and its task instance."""
+        op = link_test_operator(extra_link_class)
+        return OperatorAndTi(
+            task=op(task_id=task_id),
+            task_instance=self.ti_maker(
+                op,
+                dag_id=dag_id,
+                task_id=task_id,
+                execution_date=execution_date,
+                session=session,
+                **operator_kwargs,
+            ),
+        )
+
+    def assert_extra_link_url(
+        self,
+        expected_url: str,
+        region_name=TEST_REGION_NAME,
+        aws_partition=TEST_AWS_PARTITION,
+        **extra_link_kwargs,
+    ):
+        """Helper method for asserting the extra link URL created from the parameters."""
+        task, ti = self.create_op_and_ti(self.link_class, dag_id="test_extra_link", task_id=self.task_id)
+
+        mock_context = MagicMock()
+        mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__
+
+        self.link_class.persist(
+            context=mock_context,
+            operator=task,
+            region_name=region_name,
+            aws_partition=aws_partition,
+            **extra_link_kwargs,
+        )
+
+        error_msg = f"{self.full_qualname!r} should be preserved after execution"
+        assert ti.task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg
+
+        serialized_dag = self.dag_maker.get_serialized_data()
+        deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+        deserialized_task = deserialized_dag.task_dict[self.task_id]
+
+        error_msg = f"{self.full_qualname!r} should be preserved in deserialized tasks after execution"
+        assert deserialized_task.get_extra_links(ti, self.link_class.name) == expected_url, error_msg
+
+    def test_link_serialize(self):
+        """Test: Operator links should exist for serialized DAG."""
+        self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", task_id=self.task_id)
+        serialized_dag = self.dag_maker.get_serialized_data()
+        operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
+        error_message = "Operator links should exist for serialized DAG"
+        assert operator_extra_link == [{self.full_qualname: {}}], error_message
+
+    def test_empty_xcom(self):
+        """Test: Operator links should return empty string if no XCom value."""
+        ti = self.create_op_and_ti(
+            self.link_class, dag_id="test_empty_xcom", task_id=self.task_id
+        ).task_instance
+
+        serialized_dag = self.dag_maker.get_serialized_data()
+        deserialized_dag = SerializedDAG.from_dict(serialized_dag)
+        deserialized_task = deserialized_dag.task_dict[self.task_id]
+
+        assert (
+            ti.task.get_extra_links(ti, self.link_class.name) == ""
+        ), "Operator link should only be added if job id is available in XCom"
+
+        assert (
+            deserialized_task.get_extra_links(ti, self.link_class.name) == ""
+        ), "Operator link should be empty for deserialized task with no XCom push"
+
+    @abstractmethod
+    def test_extra_link(self, **kwargs):
+        """Test: Expected URL Link."""
+        raise NotImplementedError(f"{type(self).__name__!r} should implement `test_extra_link` test")
diff --git a/tests/providers/amazon/aws/links/test_batch.py b/tests/providers/amazon/aws/links/test_batch.py
new file mode 100644
index 0000000000..23a11193ba
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_batch.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.batch import (
+    BatchJobDefinitionLink,
+    BatchJobDetailsLink,
+    BatchJobQueueLink,
+)
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestBatchJobDefinitionLink(BaseAwsLinksTestCase):
+    link_class = BatchJobDefinitionLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/batch/home"
+                "?region=eu-west-1#job-definition/detail/arn:fake:jd"
+            ),
+            region_name="eu-west-1",
+            aws_partition="aws",
+            job_definition_arn="arn:fake:jd",
+        )
+
+
+class TestBatchJobDetailsLink(BaseAwsLinksTestCase):
+    link_class = BatchJobDetailsLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url="https://console.amazonaws.cn/batch/home?region=cn-north-1#jobs/detail/fake-id",
+            region_name="cn-north-1",
+            aws_partition="aws-cn",
+            job_id="fake-id",
+        )
+
+
+class TestBatchJobQueueLink(BaseAwsLinksTestCase):
+    link_class = BatchJobQueueLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/batch/home" "?region=us-east-1#queues/detail/arn:fake:jq"
+            ),
+            region_name="us-east-1",
+            aws_partition="aws",
+            job_queue_arn="arn:fake:jq",
+        )
diff --git a/tests/providers/amazon/aws/links/test_emr.py b/tests/providers/amazon/aws/links/test_emr.py
new file mode 100644
index 0000000000..59c883362a
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_emr.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestEmrClusterLink(BaseAwsLinksTestCase):
+    link_class = EmrClusterLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/emr/home" "?region=us-west-1#/clusterDetails/j-TEST-FLOW-ID"
+            ),
+            region_name="us-west-1",
+            aws_partition="aws",
+            job_flow_id="j-TEST-FLOW-ID",
+        )
+
+
+@pytest.mark.parametrize(
+    "cluster_info, expected_uri",
+    [
+        pytest.param({"Cluster": {}}, None, id="no-log-uri"),
+        pytest.param({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/", id="has-log-uri"),
+    ],
+)
+def test_get_log_uri(cluster_info, expected_uri):
+    emr_client = MagicMock()
+    emr_client.describe_cluster.return_value = cluster_info
+    assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri
+
+
+class TestEmrLogsLink(BaseAwsLinksTestCase):
+    link_class = EmrLogsLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/s3/buckets/myLogUri/" "?region=eu-west-2&prefix=j-8989898989/"
+            ),
+            region_name="eu-west-2",
+            aws_partition="aws",
+            log_uri="myLogUri/",
+            job_flow_id="j-8989898989",
+        )
+
+    @pytest.mark.parametrize(
+        "log_url_extra",
+        [
+            pytest.param({}, id="no-log-uri", marks=pytest.mark.xfail),
+            pytest.param({"log_uri": None}, id="log-uri-none"),
+            pytest.param({"log_uri": ""}, id="log-uri-empty"),
+        ],
+    )
+    def test_missing_log_url(self, log_url_extra: dict):
+        self.assert_extra_link_url(expected_url="", **log_url_extra)
diff --git a/tests/providers/amazon/aws/utils/links_test_utils.py b/tests/providers/amazon/aws/links/test_glue.py
similarity index 56%
rename from tests/providers/amazon/aws/utils/links_test_utils.py
rename to tests/providers/amazon/aws/links/test_glue.py
index 9d2d6f9502..5f929cd3e9 100644
--- a/tests/providers/amazon/aws/utils/links_test_utils.py
+++ b/tests/providers/amazon/aws/links/test_glue.py
@@ -16,11 +16,21 @@
 # under the License.
 from __future__ import annotations
 
-from tests.test_utils.mock_operators import MockOperator
+from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
 
 
-def link_test_operator(*links):
-    class LinkTestOperator(MockOperator):
-        operator_extra_links = tuple(c() for c in links)
+class TestGlueJobRunDetailsLink(BaseAwsLinksTestCase):
+    link_class = GlueJobRunDetailsLink
 
-    return LinkTestOperator
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/gluestudio/home"
+                "?region=ap-southeast-2#/job/test_job_name/run/11111"
+            ),
+            region_name="ap-southeast-2",
+            aws_partition="aws",
+            job_run_id="11111",
+            job_name="test_job_name",
+        )
diff --git a/tests/providers/amazon/aws/links/test_links.py b/tests/providers/amazon/aws/links/test_links.py
deleted file mode 100644
index e0ccf86d2e..0000000000
--- a/tests/providers/amazon/aws/links/test_links.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from operator import itemgetter
-from unittest.mock import MagicMock
-
-import pytest
-
-from airflow.providers.amazon.aws.links.batch import (
-    BatchJobDefinitionLink,
-    BatchJobDetailsLink,
-    BatchJobQueueLink,
-)
-from airflow.providers.amazon.aws.links.emr import EmrClusterLink, get_log_uri
-from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
-from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
-from airflow.serialization.serialized_objects import SerializedDAG
-
-TASK_ID = "test_task_id"
-REGION_NAME = "eu-west-1"
-AWS_PARTITION = "aws"
-AWS_DOMAIN = "aws.amazon.com"
-
-
-def _full_qualname(cls) -> str:
-    return f"{cls.__module__}.{cls.__qualname__}"
-
-
-# List AWS External Link with test cases
-# Expected list op tuple(BaseAwsLink, kwargs, expected URL)
-AWS_LINKS = [
-    (
-        BatchJobDefinitionLink,
-        {"job_definition_arn": "arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42"},
-        f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
-        f"#job-definition/detail/arn:aws:batch:dummy-region:111111111111:job-definition/batch-test:42",
-    ),
-    (
-        BatchJobDetailsLink,
-        {"job_id": "00000000-0000-0000-0000-000000000000"},
-        f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
-        f"#jobs/detail/00000000-0000-0000-0000-000000000000",
-    ),
-    (
-        BatchJobQueueLink,
-        {"job_queue_arn": "arn:aws:batch:dummy-region:111111111111:job-queue/test-queue"},
-        f"https://console.{AWS_DOMAIN}/batch/home?region={REGION_NAME}"
-        f"#queues/detail/arn:aws:batch:dummy-region:111111111111:job-queue/test-queue",
-    ),
-    (
-        EmrClusterLink,
-        {"job_flow_id": "j-TEST-FLOW-ID"},
-        f"https://console.{AWS_DOMAIN}/emr/home?region={REGION_NAME}#/clusterDetails/j-TEST-FLOW-ID",
-    ),
-    (
-        CloudWatchEventsLink,
-        {
-            "awslogs_region": "ap-southeast-2",
-            "awslogs_group": "/test/logs/group",
-            "awslogs_stream_name": "test/stream/d56a66bb98a14c4593defa1548686edf",
-        },
-        f"https://console.{AWS_DOMAIN}/cloudwatch/home?region=ap-southeast-2"
-        f"#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup"
-        f"/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf",
-    ),
-    (
-        GlueJobRunDetailsLink,
-        {"job_run_id": "11111", "job_name": "test_job_name"},
-        f"https://console.{AWS_DOMAIN}/gluestudio/home?region={REGION_NAME}#/job/test_job_name/run/11111",
-    ),
-]
-
-
-@pytest.mark.db_test
-@pytest.mark.need_serialized_dag
-class TestAwsLinks:
-    @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname)
-    def test_link_serialize(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link):
-        """Test: Operator links should exist for serialized DAG."""
-        create_task_and_ti_of_op_with_extra_link(
-            extra_link_class, dag_id="test_link_serialize", task_id=TASK_ID
-        )
-        serialized_dag = dag_maker.get_serialized_data()
-        assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
-            {f"{_full_qualname(extra_link_class)}": {}}
-        ], "Operator links should exist for serialized DAG"
-
-    @pytest.mark.parametrize("extra_link_class", map(itemgetter(0), AWS_LINKS), ids=_full_qualname)
-    def test_empty_xcom(self, extra_link_class, dag_maker, create_task_and_ti_of_op_with_extra_link):
-        """Test: Operator links should return empty string if no XCom value."""
-        _, ti = create_task_and_ti_of_op_with_extra_link(
-            extra_link_class, dag_id="test_empty_xcom", task_id=TASK_ID
-        )
-
-        serialized_dag = dag_maker.get_serialized_data()
-        deserialized_dag = SerializedDAG.from_dict(serialized_dag)
-        deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
-        assert (
-            ti.task.get_extra_links(ti, extra_link_class.name) == ""
-        ), "Operator link should only be added if job id is available in XCom"
-
-        assert (
-            deserialized_task.get_extra_links(ti, extra_link_class.name) == ""
-        ), "Operator link should be empty for deserialized task with no XCom push"
-
-    @pytest.mark.parametrize("extra_link_class, extra_link_kwargs, extra_link_expected_url", AWS_LINKS)
-    def test_extra_link(
-        self,
-        extra_link_class,
-        extra_link_kwargs,
-        extra_link_expected_url,
-        dag_maker,
-        create_task_and_ti_of_op_with_extra_link,
-    ):
-        """Test: Expected URL Link."""
-        task, ti = create_task_and_ti_of_op_with_extra_link(
-            extra_link_class, dag_id="test_extra_link", task_id=TASK_ID
-        )
-
-        mock_context = MagicMock()
-        mock_context.__getitem__.side_effect = {"ti": ti}.__getitem__
-
-        extra_link_class.persist(
-            context=mock_context,
-            operator=task,
-            region_name=REGION_NAME,
-            aws_partition=AWS_PARTITION,
-            **extra_link_kwargs,
-        )
-
-        assert (
-            ti.task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url
-        ), f"{_full_qualname(extra_link_class)} should be preserved after execution"
-
-        serialized_dag = dag_maker.get_serialized_data()
-        deserialized_dag = SerializedDAG.from_dict(serialized_dag)
-        deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
-        assert (
-            deserialized_task.get_extra_links(ti, extra_link_class.name) == extra_link_expected_url
-        ), f"{_full_qualname(extra_link_class)} should be preserved in deserialized tasks after execution"
-
-
-@pytest.mark.parametrize(
-    "cluster_info, expected_uri",
-    (({"Cluster": {}}, None), ({"Cluster": {"LogUri": "s3://myLogUri/"}}, "myLogUri/")),
-)
-def test_get_log_uri(cluster_info, expected_uri):
-    emr_client = MagicMock()
-    emr_client.describe_cluster.return_value = cluster_info
-    assert get_log_uri(cluster=None, emr_client=emr_client, job_flow_id="test_job_flow_id") == expected_uri
diff --git a/tests/providers/amazon/aws/links/test_logs.py b/tests/providers/amazon/aws/links/test_logs.py
new file mode 100644
index 0000000000..991a8bc6f0
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_logs.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestCloudWatchEventsLink(BaseAwsLinksTestCase):
+    link_class = CloudWatchEventsLink
+
+    def test_extra_link(self):
+        self.assert_extra_link_url(
+            expected_url=(
+                "https://console.aws.amazon.com/cloudwatch/home"
+                "?region=ap-southeast-2#logsV2:log-groups/log-group/%2Ftest%2Flogs%2Fgroup"
+                "/log-events/test%2Fstream%2Fd56a66bb98a14c4593defa1548686edf"
+            ),
+            region_name="us-west-1",
+            aws_partition="aws",
+            awslogs_region="ap-southeast-2",
+            awslogs_group="/test/logs/group",
+            awslogs_stream_name="test/stream/d56a66bb98a14c4593defa1548686edf",
+        )