You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/22 17:50:22 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9475: Add extra links endpoint

ephraimbuddy commented on a change in pull request #9475:
URL: https://github.com/apache/airflow/pull/9475#discussion_r443728375



##########
File path: tests/api_connexion/endpoints/test_extra_link_endpoint.py
##########
@@ -15,24 +15,196 @@
 # specific language governing permissions and limitations
 # under the License.
 import unittest
+from unittest import mock
+from urllib.parse import quote
 
-import pytest
-
+from airflow import DAG
+from airflow.models import XCom, BaseOperatorLink
+from airflow.models.dagrun import DagRun
+from airflow.plugins_manager import AirflowPlugin
+from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.session import provide_session
+from airflow.utils.timezone import datetime
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from parameterized import parameterized
+from test_utils.mock_plugins import mock_plugin_manager
+from tests.test_utils.db import clear_db_runs, clear_db_xcom
 
 
 class TestGetExtraLinks(unittest.TestCase):
     @classmethod
     def setUpClass(cls) -> None:
         super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+        with mock.patch.dict('os.environ', SKIP_DAGS_PARSING='True'):
+            cls.app = app.create_app(testing=True)  # type:ignore
+
+    @provide_session
+    def setUp(self, session) -> None:
+        self.now = datetime(2020, 1, 1)
+
+        clear_db_runs()
+        clear_db_xcom()
+
+        self.dag = self._create_dag()
+        self.app.dag_bag.dags = {self.dag.dag_id: self.dag}  # pylint: disable=no-member
+        self.app.dag_bag.sync_to_db()  # pylint: disable=no-member
+
+        dr = DagRun(
+            dag_id=self.dag.dag_id,
+            run_id="TEST_DAG_RUN_ID",
+            execution_date=self.now,
+            run_type=DagRunType.MANUAL.value,
+        )
+        session.add(dr)
+        session.commit()
 
-    def setUp(self) -> None:
         self.client = self.app.test_client()  # type:ignore
 
-    @pytest.mark.skip(reason="Not implemented yet")
+    def tearDown(self) -> None:
+        super().tearDown()
+        clear_db_runs()
+        clear_db_xcom()
+
+    @staticmethod
+    def _create_dag():
+        with DAG(dag_id="TEST_DAG_ID", default_args=dict(start_date=days_ago(2),)) as dag:
+            BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", sql="SELECT 1")
+            BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", sql=["SELECT 1", "SELECT 2"])
+        return dag
+
+    @parameterized.expand(
+        [
+            (
+                "/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
+                'DAG not found'
+            ),
+            (
+                "/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
+                "DAG Run not found"
+            ),
+            (
+                "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
+                "Task not found"
+            ),
+        ]
+    )
+    def test_should_response_404_on_invalid_task_id(self, url, expected_title):
+        response = self.client.get(url)
+
+        self.assertEqual(404, response.status_code)
+        self.assertEqual({'detail': None, 'status': 404, 'title': expected_title, 'type': 'about:blank'}, response.json)
+
+    @mock_plugin_manager(plugins=[])
     def test_should_response_200(self):
+        XCom.set(
+            key="job_id",
+            value="TEST_JOB_ID",
+            execution_date=self.now,
+            task_id="TEST_SINGLE_QUERY",
+            dag_id=self.dag.dag_id,
+        )
+        response = self.client.get(
+            "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links"
+        )
+
+        self.assertEqual(200, response.status_code, response.data)
+        self.assertEqual(
+            {
+                "BigQuery Console": "https://console.cloud.google.com/bigquery?j=TEST_JOB_ID",
+            },
+            response.json,
+        )
+
+    @mock_plugin_manager(plugins=[])
+    def test_should_response_200_missing_xcom(self):
+        response = self.client.get(
+            "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links"
+        )
+
+        self.assertEqual(200, response.status_code, response.data)
+        self.assertEqual(
+            {
+                "BigQuery Console": None,
+            },
+            response.json,
+        )
+
+    @mock_plugin_manager(plugins=[])
+    def test_should_response_200_multiple_links(self):
+        XCom.set(
+            key="job_id",
+            value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
+            execution_date=self.now,
+            task_id="TEST_MULTIPLE_QUERY",
+            dag_id=self.dag.dag_id,
+        )
+        response = self.client.get(
+            "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MULTIPLE_QUERY/links"
+        )
+
+        self.assertEqual(200, response.status_code, response.data)
+        self.assertEqual(
+            {
+                "BigQuery Console #1": "https://console.cloud.google.com/bigquery?j=TEST_JOB_ID_1",
+                "BigQuery Console #2": "https://console.cloud.google.com/bigquery?j=TEST_JOB_ID_2",
+            },
+            response.json,
+        )
+
+    @mock_plugin_manager(plugins=[])
+    def test_should_response_200_multiple_links_missing_xcom(self):
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/links"
+            "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MULTIPLE_QUERY/links"
         )
-        assert response.status_code == 200
+
+        self.assertEqual(200, response.status_code, response.data)
+        self.assertEqual(
+            {
+                "BigQuery Console #1": None,
+                "BigQuery Console #2": None,
+            },
+            response.json,
+        )
+
+    def test_should_response_200_support_plugins(self):
+        class GoogleLink(BaseOperatorLink):
+            name = "Google"
+
+            def get_link(self, operator, dttm):
+                return "https://www.google.com"
+
+        class S3LogLink(BaseOperatorLink):
+            name = 'S3'
+            operators = [BigQueryExecuteQueryOperator]
+
+            def get_link(self, operator, dttm):
+                return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
+                    dag_id=operator.dag_id,
+                    task_id=operator.task_id,
+                    execution_date=quote(dttm.isoformat()),

Review comment:
       ```suggestion
                       execution_date=quote_plus(dttm.isoformat()),
   ```
   I am thinking that quote_plus will be better here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org