You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/03/03 14:40:24 UTC

[airflow] branch main updated: POST /dagRuns API should 404 if dag not active (#29860)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 751a995df5 POST /dagRuns API should 404 if dag not active (#29860)
751a995df5 is described below

commit 751a995df55419068f11ebabe483dba3302916ed
Author: Sam Wheating <sa...@gmail.com>
AuthorDate: Fri Mar 3 06:40:07 2023 -0800

    POST /dagRuns API should 404 if dag not active (#29860)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py    |  2 +-
 tests/api_connexion/endpoints/test_dag_run_endpoint.py | 15 ++++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 46f36470e5..945a5eccb7 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -307,7 +307,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
 )
 def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
     """Trigger a DAG."""
-    dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
+    dm = session.query(DagModel).filter(DagModel.is_active, DagModel.dag_id == dag_id).first()
     if not dm:
         raise NotFound(title="DAG not found", detail=f"DAG with dag_id: '{dag_id}' not found")
     if dm.has_import_errors:
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index a7f7fe7bca..09ecb9e497 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -120,6 +120,7 @@ class TestDagRunEndpoint:
 
     def _create_dag(self, dag_id):
         dag_instance = DagModel(dag_id=dag_id)
+        dag_instance.is_active = True
         with create_session() as session:
             session.add(dag_instance)
         dag = DAG(dag_id=dag_id, schedule=None)
@@ -132,7 +133,7 @@ class TestDagRunEndpoint:
 
         for i in range(idx_start, idx_start + 2):
             if i == 1:
-                dags.append(DagModel(dag_id="TEST_DAG_ID"))
+                dags.append(DagModel(dag_id="TEST_DAG_ID", is_active=True))
             dagrun_model = DagRun(
                 dag_id="TEST_DAG_ID",
                 run_id="TEST_DAG_RUN_ID_" + str(i),
@@ -1087,6 +1088,18 @@ class TestPostDagRun(TestDagRunEndpoint):
         }
         _check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)
 
+    def test_should_respond_404_if_a_dag_is_inactive(self, session):
+        dm = self._create_dag("TEST_INACTIVE_DAG_ID")
+        dm.is_active = False
+        session.add(dm)
+        session.flush()
+        response = self.client.post(
+            "api/v1/dags/TEST_INACTIVE_DAG_ID/dagRuns",
+            json={},
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 404
+
     def test_should_respond_400_if_a_dag_has_import_errors(self, session):
         """Test that if a dagmodel has import errors, dags won't be triggered"""
         dm = self._create_dag("TEST_DAG_ID")