You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/20 16:56:50 UTC

[GitHub] [airflow] ms32035 commented on a change in pull request #13199: Create dag dependencies view

ms32035 commented on a change in pull request #13199:
URL: https://github.com/apache/airflow/pull/13199#discussion_r598124058



##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []

Review comment:
       See https://github.com/pallets/flask/issues/2520. Assuming that's still how Flask works, this is to prevent recalculation on every request. I'm open to other suggestions for where to store the dependency state in memory.

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if datetime.utcnow() > self.last_refresh + timedelta(seconds=self.refresh_interval):
+            self._calculate_graph()
+            self.last_refresh = datetime.utcnow()
+
+        return self.render_template(
+            "airflow/dag_dependencies.html",
+            title=title,
+            nodes=self.nodes,
+            edges=self.edges,
+            last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+            arrange=conf.get("webserver", "dag_orientation"),
+            width=request.args.get("width", "100%"),
+            height=request.args.get("height", "800"),
+        )
+
+    def _calculate_graph(self):
+
+        current_app.dag_bag.collect_dags_from_db()
+
+        nodes = {}
+        edges = []
+
+        for dag_id, dag in current_app.dag_bag.dags.items():
+            dag_node_id = f"d:{dag_id}"
+            nodes[dag_node_id] = self._node_dict(dag_node_id, dag_id, "fill: rgb(232, 247, 228)")
+
+            for task in dag.tasks:
+                task_node_id = f"t:{dag_id}:{task.task_id}"
+                if task.task_type in conf.getlist("core", "dag_dependencies_trigger"):
+                    nodes[task_node_id] = self._node_dict(
+                        task_node_id, task.task_id, "fill: rgb(255, 239, 235)"
+                    )
+
+                    edges.extend(
+                        [
+                            {"u": dag_node_id, "v": task_node_id},
+                            {"u": task_node_id, "v": f"d:{task.trigger_dag_id}"},
+                        ]
+                    )
+                elif task.task_type in conf.getlist("core", "dag_dependencies_sensor"):
+                    nodes[task_node_id] = self._node_dict(
+                        task_node_id, task.task_id, "fill: rgb(230, 241, 242)"
+                    )
+
+                    edges.extend(
+                        [
+                            {"u": task_node_id, "v": dag_node_id},
+                            {"u": f"d:{task.external_dag_id}", "v": task_node_id},
+                        ]
+                    )
+
+            implicit = getattr(dag, "implicit_dependencies", None)

Review comment:
       For now I'm removing it, though.

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if datetime.utcnow() > self.last_refresh + timedelta(seconds=self.refresh_interval):
+            self._calculate_graph()
+            self.last_refresh = datetime.utcnow()
+
+        return self.render_template(
+            "airflow/dag_dependencies.html",
+            title=title,
+            nodes=self.nodes,
+            edges=self.edges,
+            last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+            arrange=conf.get("webserver", "dag_orientation"),
+            width=request.args.get("width", "100%"),
+            height=request.args.get("height", "800"),
+        )
+
+    def _calculate_graph(self):
+
+        current_app.dag_bag.collect_dags_from_db()
+
+        nodes = {}
+        edges = []
+
+        for dag_id, dag in current_app.dag_bag.dags.items():
+            dag_node_id = f"d:{dag_id}"
+            nodes[dag_node_id] = self._node_dict(dag_node_id, dag_id, "fill: rgb(232, 247, 228)")

Review comment:
       will change

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if datetime.utcnow() > self.last_refresh + timedelta(seconds=self.refresh_interval):

Review comment:
       I'm aware of this; it's the best/easiest solution that came to mind. Other recommendations are welcome.

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",

Review comment:
       will change

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if datetime.utcnow() > self.last_refresh + timedelta(seconds=self.refresh_interval):
+            self._calculate_graph()
+            self.last_refresh = datetime.utcnow()
+
+        return self.render_template(
+            "airflow/dag_dependencies.html",
+            title=title,
+            nodes=self.nodes,
+            edges=self.edges,
+            last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+            arrange=conf.get("webserver", "dag_orientation"),
+            width=request.args.get("width", "100%"),
+            height=request.args.get("height", "800"),
+        )
+
+    def _calculate_graph(self):
+
+        current_app.dag_bag.collect_dags_from_db()
+
+        nodes = {}
+        edges = []
+
+        for dag_id, dag in current_app.dag_bag.dags.items():
+            dag_node_id = f"d:{dag_id}"
+            nodes[dag_node_id] = self._node_dict(dag_node_id, dag_id, "fill: rgb(232, 247, 228)")
+
+            for task in dag.tasks:
+                task_node_id = f"t:{dag_id}:{task.task_id}"
+                if task.task_type in conf.getlist("core", "dag_dependencies_trigger"):
+                    nodes[task_node_id] = self._node_dict(
+                        task_node_id, task.task_id, "fill: rgb(255, 239, 235)"
+                    )
+
+                    edges.extend(
+                        [
+                            {"u": dag_node_id, "v": task_node_id},
+                            {"u": task_node_id, "v": f"d:{task.trigger_dag_id}"},
+                        ]
+                    )
+                elif task.task_type in conf.getlist("core", "dag_dependencies_sensor"):
+                    nodes[task_node_id] = self._node_dict(
+                        task_node_id, task.task_id, "fill: rgb(230, 241, 242)"
+                    )
+
+                    edges.extend(
+                        [
+                            {"u": task_node_id, "v": dag_node_id},
+                            {"u": f"d:{task.external_dag_id}", "v": task_node_id},
+                        ]
+                    )
+
+            implicit = getattr(dag, "implicit_dependencies", None)

Review comment:
       Correct, thanks. Is there any other place we could now store something like this?

##########
File path: airflow/www/views.py
##########
@@ -3811,3 +3811,102 @@ def autocomplete(self, session=None):
         payload = [row[0] for row in dag_ids_query.union(owners_query).limit(10).all()]
 
         return wwwutils.json_response(payload)
+
+
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = conf.getint(
+        "dag_dependencies_plugin",
+        "refresh_interval",
+        fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+    )
+    last_refresh = datetime.utcnow() - timedelta(seconds=refresh_interval)
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if datetime.utcnow() > self.last_refresh + timedelta(seconds=self.refresh_interval):
+            self._calculate_graph()
+            self.last_refresh = datetime.utcnow()
+
+        return self.render_template(
+            "airflow/dag_dependencies.html",
+            title=title,
+            nodes=self.nodes,
+            edges=self.edges,
+            last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+            arrange=conf.get("webserver", "dag_orientation"),
+            width=request.args.get("width", "100%"),
+            height=request.args.get("height", "800"),
+        )
+
+    def _calculate_graph(self):
+
+        current_app.dag_bag.collect_dags_from_db()
+
+        nodes = {}
+        edges = []
+
+        for dag_id, dag in current_app.dag_bag.dags.items():
+            dag_node_id = f"d:{dag_id}"
+            nodes[dag_node_id] = self._node_dict(dag_node_id, dag_id, "fill: rgb(232, 247, 228)")
+
+            for task in dag.tasks:
+                task_node_id = f"t:{dag_id}:{task.task_id}"
+                if task.task_type in conf.getlist("core", "dag_dependencies_trigger"):

Review comment:
       Good spot




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org