You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:15 UTC
[airflow] 31/37: Write action log to DB when DAG run is triggered via API (#28998)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2a8e414364304b8e57eded3c02ea565250503003
Author: Andrey Komrakov <ak...@gmail.com>
AuthorDate: Fri Feb 3 11:52:58 2023 +0300
Write action log to DB when DAG run is triggered via API (#28998)
(cherry picked from commit edc2e0b118a77c143b1a5d1eb82f1137148af633)
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 10 ++++++++++
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 4 +++-
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 2b3ee16b2f..7018fcb8a8 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -56,9 +56,13 @@ from airflow.api_connexion.types import APIResponse
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.airflow_flask_app import get_airflow_app
+from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
+from airflow.www.decorators import action_logging
+
+RESOURCE_EVENT_PREFIX = "dag_run"
@security.requires_access(
@@ -281,6 +285,12 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
],
)
@provide_session
+@action_logging(
+ event=action_event_from_permission(
+ prefix=RESOURCE_EVENT_PREFIX,
+ permission=permissions.ACTION_CAN_CREATE,
+ ),
+)
def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
"""Trigger a DAG."""
dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index b645e601e9..1ea4b8d1fe 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -36,6 +36,7 @@ from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
+from tests.test_utils.www import _check_last_log
@pytest.fixture(scope="module")
@@ -1025,7 +1026,7 @@ class TestPostDagRun(TestDagRunEndpoint):
pytest.param(None, None, None, id="all-missing"),
],
)
- def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date, note):
+ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
self._create_dag("TEST_DAG_ID")
# We'll patch airflow.utils.timezone.utcnow to always return this so we
@@ -1070,6 +1071,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"run_type": "manual",
"note": note,
}
+ _check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)
def test_should_respond_400_if_a_dag_has_import_errors(self, session):
"""Test that if a dagmodel has import errors, dags won't be triggered"""