You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/06/14 21:49:29 UTC

[airflow] branch main updated: Adding `only_active` parameter to /dags endpoint (#14306)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9526a24  Adding `only_active` parameter to /dags endpoint (#14306)
9526a24 is described below

commit 9526a249cceb170ddfa68530fdcc786ec3e9e5c2
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jun 14 14:49:11 2021 -0700

    Adding `only_active` parameter to /dags endpoint (#14306)
    
    I noticed that the `/dags` endpoint returns information on all entries in the DAG table, which often includes many more DAGs than are active, and likely includes DAGs that have been removed from Airflow.
    
    This PR adds a boolean `only_active` parameter (default `true`) to the `/dags` endpoint; when it is true, the endpoint returns only active DAGs.
    
    I also noticed that this endpoint was hitting a deprecated code path by dumping a `DAG` object to the `DAGDetailSchema`, thus calling `DAG.is_paused()`. I have updated the schema to call the correct function (`DAG.get_is_paused`), since I'm assuming the deprecated functions may be removed some day.
---
 airflow/api_connexion/endpoints/dag_endpoint.py    |  7 +-
 airflow/api_connexion/openapi/v1.yaml              | 11 +++
 airflow/api_connexion/schemas/dag_schema.py        | 13 ++++
 airflow/models/dag.py                              |  6 ++
 tests/api_connexion/endpoints/test_dag_endpoint.py | 84 ++++++++++++++++++++++
 tests/api_connexion/schemas/test_dag_schema.py     |  5 ++
 6 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index 7b19aed..ea4a1e3 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -60,9 +60,12 @@ def get_dag_details(dag_id):
 @security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
 @format_parameters({'limit': check_limit})
 @provide_session
-def get_dags(limit, session, offset=0):
+def get_dags(limit, session, offset=0, only_active=True):
     """Get all DAGs."""
-    dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
 
     readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
 
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index dbe8b8d..09d5707 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -407,6 +407,13 @@ paths:
         - $ref: '#/components/parameters/PageLimit'
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: Only return active DAGs.
       responses:
         '200':
           description: Success.
@@ -1795,6 +1802,10 @@ components:
           type: boolean
           nullable: true
           description: Whether the DAG is paused.
+        is_active:
+          type: boolean
+          nullable: true
+          description: Whether the DAG is currently seen by the scheduler(s).
         is_subdag:
           description: Whether the DAG is SubDAG.
           type: boolean
diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py
index aabd215..bc5a9ca 100644
--- a/airflow/api_connexion/schemas/dag_schema.py
+++ b/airflow/api_connexion/schemas/dag_schema.py
@@ -49,6 +49,7 @@ class DAGSchema(SQLAlchemySchema):
     dag_id = auto_field(dump_only=True)
     root_dag_id = auto_field(dump_only=True)
     is_paused = auto_field()
+    is_active = auto_field(dump_only=True)
     is_subdag = auto_field(dump_only=True)
     fileloc = auto_field(dump_only=True)
     file_token = fields.Method("get_token", dump_only=True)
@@ -85,6 +86,8 @@ class DAGDetailSchema(DAGSchema):
     default_view = fields.String()
     params = fields.Dict()
     tags = fields.Method("get_tags", dump_only=True)
+    is_paused = fields.Method("get_is_paused", dump_only=True)
+    is_active = fields.Method("get_is_active", dump_only=True)
 
     @staticmethod
     def get_tags(obj: DAG):
@@ -101,6 +104,16 @@ class DAGDetailSchema(DAGSchema):
             return []
         return obj.owner.split(",")
 
+    @staticmethod
+    def get_is_paused(obj: DAG):
+        """Checks entry in DAG table to see if this DAG is paused"""
+        return obj.get_is_paused()
+
+    @staticmethod
+    def get_is_active(obj: DAG):
+        """Checks entry in DAG table to see if this DAG is active"""
+        return obj.get_is_active()
+
 
 class DAGCollection(NamedTuple):
     """List of DAGs with metadata"""
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 027c3c3..c0469c5 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -803,6 +803,12 @@ class DAG(LoggingMixin):
         return self.get_concurrency_reached()
 
     @provide_session
+    def get_is_active(self, session=None) -> Optional[None]:
+        """Returns a boolean indicating whether this DAG is active"""
+        qry = session.query(DagModel).filter(DagModel.dag_id == self.dag_id)
+        return qry.value(DagModel.is_active)
+
+    @provide_session
     def get_is_paused(self, session=None) -> Optional[None]:
         """Returns a boolean indicating whether this DAG is paused"""
         qry = session.query(DagModel).filter(DagModel.dag_id == self.dag_id)
diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py
index f3f7501..e11ad72 100644
--- a/tests/api_connexion/endpoints/test_dag_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -148,6 +148,7 @@ class TestGetDag(TestDagEndpoint):
             "fileloc": "/tmp/dag_1.py",
             "file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
             "is_paused": False,
+            "is_active": True,
             "is_subdag": False,
             "owners": [],
             "root_dag_id": None,
@@ -172,6 +173,7 @@ class TestGetDag(TestDagEndpoint):
             "fileloc": "/tmp/dag_1.py",
             "file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
             "is_paused": False,
+            "is_active": False,
             "is_subdag": False,
             "owners": [],
             "root_dag_id": None,
@@ -228,6 +230,7 @@ class TestGetDagDetails(TestDagEndpoint):
             "fileloc": __file__,
             "file_token": FILE_TOKEN,
             "is_paused": None,
+            "is_active": None,
             "is_subdag": False,
             "orientation": "LR",
             "owners": ['airflow'],
@@ -260,6 +263,7 @@ class TestGetDagDetails(TestDagEndpoint):
             "fileloc": __file__,
             "file_token": FILE_TOKEN,
             "is_paused": None,
+            "is_active": None,
             "is_subdag": False,
             "orientation": "LR",
             "owners": ['airflow'],
@@ -292,6 +296,7 @@ class TestGetDagDetails(TestDagEndpoint):
             "fileloc": __file__,
             "file_token": FILE_TOKEN,
             "is_paused": None,
+            "is_active": None,
             "is_subdag": False,
             "orientation": "LR",
             "owners": ['airflow'],
@@ -328,6 +333,7 @@ class TestGetDagDetails(TestDagEndpoint):
             "fileloc": __file__,
             "file_token": FILE_TOKEN,
             "is_paused": None,
+            "is_active": None,
             "is_subdag": False,
             "orientation": "LR",
             "owners": ['airflow'],
@@ -366,6 +372,7 @@ class TestGetDagDetails(TestDagEndpoint):
             'fileloc': __file__,
             "file_token": FILE_TOKEN,
             'is_paused': None,
+            "is_active": None,
             'is_subdag': False,
             'orientation': 'LR',
             'owners': ['airflow'],
@@ -417,6 +424,7 @@ class TestGetDags(TestDagEndpoint):
                     "fileloc": "/tmp/dag_1.py",
                     "file_token": file_token,
                     "is_paused": False,
+                    "is_active": True,
                     "is_subdag": False,
                     "owners": [],
                     "root_dag_id": None,
@@ -432,6 +440,80 @@ class TestGetDags(TestDagEndpoint):
                     "fileloc": "/tmp/dag_2.py",
                     "file_token": file_token2,
                     "is_paused": False,
+                    "is_active": True,
+                    "is_subdag": False,
+                    "owners": [],
+                    "root_dag_id": None,
+                    "schedule_interval": {
+                        "__type": "CronExpression",
+                        "value": "2 2 * * *",
+                    },
+                    "tags": [],
+                },
+            ],
+            "total_entries": 2,
+        } == response.json
+
+    def test_only_active_true_returns_active_dags(self):
+        self._create_dag_models(1)
+        self._create_deactivated_dag()
+        response = self.client.get("api/v1/dags?only_active=True", environ_overrides={'REMOTE_USER': "test"})
+        file_token = SERIALIZER.dumps("/tmp/dag_1.py")
+        assert response.status_code == 200
+        assert {
+            "dags": [
+                {
+                    "dag_id": "TEST_DAG_1",
+                    "description": None,
+                    "fileloc": "/tmp/dag_1.py",
+                    "file_token": file_token,
+                    "is_paused": False,
+                    "is_active": True,
+                    "is_subdag": False,
+                    "owners": [],
+                    "root_dag_id": None,
+                    "schedule_interval": {
+                        "__type": "CronExpression",
+                        "value": "2 2 * * *",
+                    },
+                    "tags": [],
+                }
+            ],
+            "total_entries": 1,
+        } == response.json
+
+    def test_only_active_false_returns_all_dags(self):
+        self._create_dag_models(1)
+        self._create_deactivated_dag()
+        response = self.client.get("api/v1/dags?only_active=False", environ_overrides={'REMOTE_USER': "test"})
+        file_token = SERIALIZER.dumps("/tmp/dag_1.py")
+        file_token_2 = SERIALIZER.dumps("/tmp/dag_del_1.py")
+        assert response.status_code == 200
+        assert {
+            "dags": [
+                {
+                    "dag_id": "TEST_DAG_1",
+                    "description": None,
+                    "fileloc": "/tmp/dag_1.py",
+                    "file_token": file_token,
+                    "is_paused": False,
+                    "is_active": True,
+                    "is_subdag": False,
+                    "owners": [],
+                    "root_dag_id": None,
+                    "schedule_interval": {
+                        "__type": "CronExpression",
+                        "value": "2 2 * * *",
+                    },
+                    "tags": [],
+                },
+                {
+                    "dag_id": "TEST_DAG_DELETED_1",
+                    "description": None,
+                    "fileloc": "/tmp/dag_del_1.py",
+                    "file_token": file_token_2,
+                    "is_paused": False,
+                    "is_active": False,
                     "is_subdag": False,
                     "owners": [],
                     "root_dag_id": None,
@@ -538,6 +620,7 @@ class TestPatchDag(TestDagEndpoint):
             "fileloc": "/tmp/dag_1.py",
             "file_token": self.file_token,
             "is_paused": False,
+            "is_active": False,
             "is_subdag": False,
             "owners": [],
             "root_dag_id": None,
@@ -619,6 +702,7 @@ class TestPatchDag(TestDagEndpoint):
             "fileloc": "/tmp/dag_1.py",
             "file_token": self.file_token,
             "is_paused": False,
+            "is_active": False,
             "is_subdag": False,
             "owners": [],
             "root_dag_id": None,
diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py
index 96aba1f..4fd8a52 100644
--- a/tests/api_connexion/schemas/test_dag_schema.py
+++ b/tests/api_connexion/schemas/test_dag_schema.py
@@ -39,6 +39,7 @@ class TestDagSchema(unittest.TestCase):
             dag_id="test_dag_id",
             root_dag_id="test_root_dag_id",
             is_paused=True,
+            is_active=True,
             is_subdag=False,
             fileloc="/root/airflow/dags/my_dag.py",
             owners="airflow1,airflow2",
@@ -53,6 +54,7 @@ class TestDagSchema(unittest.TestCase):
             "fileloc": "/root/airflow/dags/my_dag.py",
             "file_token": SERIALIZER.dumps("/root/airflow/dags/my_dag.py"),
             "is_paused": True,
+            "is_active": True,
             "is_subdag": False,
             "owners": ["airflow1", "airflow2"],
             "root_dag_id": "test_root_dag_id",
@@ -76,6 +78,7 @@ class TestDAGCollectionSchema(unittest.TestCase):
                     "file_token": SERIALIZER.dumps("/tmp/a.py"),
                     "is_paused": None,
                     "is_subdag": None,
+                    "is_active": None,
                     "owners": [],
                     "root_dag_id": None,
                     "schedule_interval": None,
@@ -86,6 +89,7 @@ class TestDAGCollectionSchema(unittest.TestCase):
                     "description": None,
                     "fileloc": "/tmp/a.py",
                     "file_token": SERIALIZER.dumps("/tmp/a.py"),
+                    "is_active": None,
                     "is_paused": None,
                     "is_subdag": None,
                     "owners": [],
@@ -120,6 +124,7 @@ class TestDAGDetailSchema:
             'doc_md': 'docs',
             'fileloc': __file__,
             "file_token": SERIALIZER.dumps(__file__),
+            "is_active": None,
             'is_paused': None,
             'is_subdag': False,
             'orientation': 'LR',