You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:15 UTC

[airflow] 31/37: Write action log to DB when DAG run is trigged via API (#28998)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2a8e414364304b8e57eded3c02ea565250503003
Author: Andrey Komrakov <ak...@gmail.com>
AuthorDate: Fri Feb 3 11:52:58 2023 +0300

    Write action log to DB when DAG run is trigged via API (#28998)
    
    (cherry picked from commit edc2e0b118a77c143b1a5d1eb82f1137148af633)
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py    | 10 ++++++++++
 tests/api_connexion/endpoints/test_dag_run_endpoint.py |  4 +++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 2b3ee16b2f..7018fcb8a8 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -56,9 +56,13 @@ from airflow.api_connexion.types import APIResponse
 from airflow.models import DagModel, DagRun
 from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
+from airflow.utils.log.action_logger import action_event_from_permission
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
+from airflow.www.decorators import action_logging
+
+RESOURCE_EVENT_PREFIX = "dag_run"
 
 
 @security.requires_access(
@@ -281,6 +285,12 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
     ],
 )
 @provide_session
+@action_logging(
+    event=action_event_from_permission(
+        prefix=RESOURCE_EVENT_PREFIX,
+        permission=permissions.ACTION_CAN_CREATE,
+    ),
+)
 def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
     """Trigger a DAG."""
     dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index b645e601e9..1ea4b8d1fe 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -36,6 +36,7 @@ from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
+from tests.test_utils.www import _check_last_log
 
 
 @pytest.fixture(scope="module")
@@ -1025,7 +1026,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             pytest.param(None, None, None, id="all-missing"),
         ],
     )
-    def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date, note):
+    def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
         self._create_dag("TEST_DAG_ID")
 
         # We'll patch airflow.utils.timezone.utcnow to always return this so we
@@ -1070,6 +1071,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             "run_type": "manual",
             "note": note,
         }
+        _check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)
 
     def test_should_respond_400_if_a_dag_has_import_errors(self, session):
         """Test that if a dagmodel has import errors, dags won't be triggered"""