You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/07 16:16:25 UTC
[airflow] 18/23: POST /dagRuns API should 404 if dag not active (#29860)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 29b7bf204e41f89c8fcb51ce184a5dd01f93aa96
Author: Sam Wheating <sa...@gmail.com>
AuthorDate: Fri Mar 3 06:40:07 2023 -0800
POST /dagRuns API should 404 if dag not active (#29860)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 751a995df55419068f11ebabe483dba3302916ed)
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 2 +-
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 15 ++++++++++++++-
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index ea1c0af88c..b2a07f87b1 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -293,7 +293,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
)
def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
"""Trigger a DAG."""
- dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
+ dm = session.query(DagModel).filter(DagModel.is_active, DagModel.dag_id == dag_id).first()
if not dm:
raise NotFound(title="DAG not found", detail=f"DAG with dag_id: '{dag_id}' not found")
if dm.has_import_errors:
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1ea4b8d1fe..93c9c6a1d0 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -120,6 +120,7 @@ class TestDagRunEndpoint:
def _create_dag(self, dag_id):
dag_instance = DagModel(dag_id=dag_id)
+ dag_instance.is_active = True
with create_session() as session:
session.add(dag_instance)
dag = DAG(dag_id=dag_id, schedule=None)
@@ -132,7 +133,7 @@ class TestDagRunEndpoint:
for i in range(idx_start, idx_start + 2):
if i == 1:
- dags.append(DagModel(dag_id="TEST_DAG_ID"))
+ dags.append(DagModel(dag_id="TEST_DAG_ID", is_active=True))
dagrun_model = DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID_" + str(i),
@@ -1073,6 +1074,18 @@ class TestPostDagRun(TestDagRunEndpoint):
}
_check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)
+ def test_should_respond_404_if_a_dag_is_inactive(self, session):
+ dm = self._create_dag("TEST_INACTIVE_DAG_ID")
+ dm.is_active = False
+ session.add(dm)
+ session.flush()
+ response = self.client.post(
+ "api/v1/dags/TEST_INACTIVE_DAG_ID/dagRuns",
+ json={},
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 404
+
def test_should_respond_400_if_a_dag_has_import_errors(self, session):
"""Test that if a dagmodel has import errors, dags won't be triggered"""
dm = self._create_dag("TEST_DAG_ID")