You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/07 16:16:25 UTC

[airflow] 18/23: POST /dagRuns API should 404 if dag not active (#29860)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 29b7bf204e41f89c8fcb51ce184a5dd01f93aa96
Author: Sam Wheating <sa...@gmail.com>
AuthorDate: Fri Mar 3 06:40:07 2023 -0800

    POST /dagRuns API should 404 if dag not active (#29860)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 751a995df55419068f11ebabe483dba3302916ed)
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py    |  2 +-
 tests/api_connexion/endpoints/test_dag_run_endpoint.py | 15 ++++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index ea1c0af88c..b2a07f87b1 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -293,7 +293,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
 )
 def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
     """Trigger a DAG."""
-    dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
+    dm = session.query(DagModel).filter(DagModel.is_active, DagModel.dag_id == dag_id).first()
     if not dm:
         raise NotFound(title="DAG not found", detail=f"DAG with dag_id: '{dag_id}' not found")
     if dm.has_import_errors:
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1ea4b8d1fe..93c9c6a1d0 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -120,6 +120,7 @@ class TestDagRunEndpoint:
 
     def _create_dag(self, dag_id):
         dag_instance = DagModel(dag_id=dag_id)
+        dag_instance.is_active = True
         with create_session() as session:
             session.add(dag_instance)
         dag = DAG(dag_id=dag_id, schedule=None)
@@ -132,7 +133,7 @@ class TestDagRunEndpoint:
 
         for i in range(idx_start, idx_start + 2):
             if i == 1:
-                dags.append(DagModel(dag_id="TEST_DAG_ID"))
+                dags.append(DagModel(dag_id="TEST_DAG_ID", is_active=True))
             dagrun_model = DagRun(
                 dag_id="TEST_DAG_ID",
                 run_id="TEST_DAG_RUN_ID_" + str(i),
@@ -1073,6 +1074,18 @@ class TestPostDagRun(TestDagRunEndpoint):
         }
         _check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)
 
+    def test_should_respond_404_if_a_dag_is_inactive(self, session):
+        dm = self._create_dag("TEST_INACTIVE_DAG_ID")
+        dm.is_active = False
+        session.add(dm)
+        session.flush()
+        response = self.client.post(
+            "api/v1/dags/TEST_INACTIVE_DAG_ID/dagRuns",
+            json={},
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 404
+
     def test_should_respond_400_if_a_dag_has_import_errors(self, session):
         """Test that if a dagmodel has import errors, dags won't be triggered"""
         dm = self._create_dag("TEST_DAG_ID")