You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/05/07 16:16:47 UTC
[airflow] branch master updated: Create cross-DAG dependencies view
(#13199)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new db0e942 Create cross-DAG dependencies view (#13199)
db0e942 is described below
commit db0e9425c6319d71c87554e0382b2f3852a7a10f
Author: Marcin Szymański <ms...@gmail.com>
AuthorDate: Fri May 7 17:16:27 2021 +0100
Create cross-DAG dependencies view (#13199)
This adds a new view for displaying dependencies between DAGs. It's based on the
DAG Dependencies plugin (https://github.com/ms32035/airflow-dag-dependencies)
and has been updated to work with Airflow 2.0.
Since quite a bit of code is shared with the DAG graph view, that common code
has been externalized to a new module.
Unlike the external plugin, this uses the DAG serializer to store
dependencies at parse time, meaning it doesn't need to load all DAGs.
Co-authored-by: Brent Bovenzi <br...@gmail.com>
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
---
airflow/models/serialized_dag.py | 37 +++-
airflow/security/permissions.py | 1 +
airflow/serialization/schema.json | 9 +-
airflow/serialization/serialized_objects.py | 57 +++++-
airflow/www/extensions/init_views.py | 5 +
airflow/www/security.py | 1 +
airflow/www/static/css/graph.css | 27 +++
airflow/www/static/js/dag_dependencies.js | 195 +++++++++++++++++++++
.../www/templates/airflow/dag_dependencies.html | 75 ++++++++
airflow/www/views.py | 73 ++++++++
airflow/www/webpack.config.js | 1 +
docs/apache-airflow/concepts/dags.rst | 18 ++
setup.cfg | 1 +
tests/serialization/test_dag_serialization.py | 9 +-
tests/www/test_security.py | 1 +
tests/www/test_views.py | 8 +-
16 files changed, 512 insertions(+), 6 deletions(-)
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c2706f4..81448b6 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -27,12 +27,13 @@ import sqlalchemy_jsonfield
from sqlalchemy import BigInteger, Column, Index, String, and_
from sqlalchemy.orm import Session, backref, foreign, relationship
from sqlalchemy.sql import exists
+from sqlalchemy.sql.expression import func
from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DAG, DagModel
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import DagDependency, SerializedDAG
from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.utils import timezone
from airflow.utils.session import provide_session
@@ -274,6 +275,17 @@ class SerializedDagModel(Base):
@classmethod
@provide_session
+ def get_max_last_updated_datetime(cls, session: Session = None) -> datetime:
+ """
+ Get the maximum date when any DAG was last updated in serialized_dag table
+
+ :param session: ORM Session
+ :type session: Session
+ """
+ return session.query(func.max(cls.last_updated)).scalar()
+
+ @classmethod
+ @provide_session
def get_latest_version_hash(cls, dag_id: str, session: Session = None) -> str:
"""
Get the latest DAG version for a given DAG ID.
@@ -286,3 +298,26 @@ class SerializedDagModel(Base):
:rtype: str
"""
return session.query(cls.dag_hash).filter(cls.dag_id == dag_id).scalar()
+
+ @classmethod
+ @provide_session
+ def get_dag_dependencies(cls, session: Session = None) -> Dict[str, List['DagDependency']]:
+ """
+ Get the dependencies between DAGs
+
+ :param session: ORM Session
+ :type session: Session
+ """
+ dependencies = {}
+
+ if session.bind.dialect.name in ["sqlite", "mysql"]:
+ for row in session.query(cls.dag_id, func.json_extract(cls.data, "$.dag.dag_dependencies")).all():
+ dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])]
+
+ else:
+ for row in session.query(
+ cls.dag_id, func.json_extract_path(cls.data, "dag", "dag_dependencies")
+ ).all():
+ dependencies[row[0]] = [DagDependency(**d) for d in row[1]]
+
+ return dependencies
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index 115f25a..ae864a0 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -27,6 +27,7 @@ RESOURCE_DOCS_MENU = "Docs"
RESOURCE_DOCS = "Documentation"
RESOURCE_CONFIG = "Configurations"
RESOURCE_CONNECTION = "Connections"
+RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies"
RESOURCE_DAG_CODE = "DAG Code"
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_IMPORT_ERROR = "ImportError"
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index fc79238..e87c4aa 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -66,6 +66,12 @@
"maxProperties": 1
}
},
+ "dag_dependencies": {
+ "type": "array",
+ "items": {
+ "type": "object"
+ }
+ },
"dag": {
"type": "object",
"properties": {
@@ -104,7 +110,8 @@
{ "type": "null" },
{ "$ref": "#/definitions/task_group" }
]},
- "edge_info": { "$ref": "#/definitions/edge_info" }
+ "edge_info": { "$ref": "#/definitions/edge_info" },
+ "dag_dependencies": { "$ref": "#/definitions/dag_dependencies" }
},
"required": [
"_dag_id",
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 645844a..9059728 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -19,6 +19,7 @@
import datetime
import enum
import logging
+from dataclasses import dataclass
from inspect import Parameter, signature
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union
@@ -302,8 +303,7 @@ class BaseSerialization:
elif type_ == DAT.SET:
return {cls._deserialize(v) for v in var}
elif type_ == DAT.TUPLE:
- # pylint: disable=consider-using-generator
- return tuple([cls._deserialize(v) for v in var])
+ return tuple(cls._deserialize(v) for v in var)
else:
raise TypeError(f'Invalid type {type_!s} in deserialization.')
@@ -344,6 +344,30 @@ class BaseSerialization:
return False
+class DependencyDetector:
+ """Detects dependencies between DAGs."""
+
+ @staticmethod
+ def detect_task_dependencies(task: BaseOperator) -> Optional['DagDependency']:
+ """Detects dependencies caused by tasks"""
+ if task.task_type == "TriggerDagRunOperator":
+ return DagDependency(
+ source=task.dag_id,
+ target=getattr(task, "trigger_dag_id"),
+ dependency_type="trigger",
+ dependency_id=task.task_id,
+ )
+ elif task.task_type == "ExternalTaskSensor":
+ return DagDependency(
+ source=getattr(task, "external_dag_id"),
+ target=task.dag_id,
+ dependency_type="sensor",
+ dependency_id=task.task_id,
+ )
+
+ return None
+
+
class SerializedBaseOperator(BaseOperator, BaseSerialization):
"""A JSON serializable representation of operator.
@@ -359,6 +383,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
if v.default is not v.empty
}
+ dependency_detector = DependencyDetector
+
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# task_type is used by UI to display the correct class type, because UI only
@@ -504,6 +530,11 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
return op
@classmethod
+ def detect_dependencies(cls, op: BaseOperator) -> Optional['DagDependency']:
+ """Detects between DAG dependencies for the operator."""
+ return cls.dependency_detector.detect_task_dependencies(op)
+
+ @classmethod
def _is_excluded(cls, var: Any, attrname: str, op: BaseOperator):
if var is not None and op.has_dag() and attrname.endswith("_date"):
# If this date is the same as the matching field in the dag, then
@@ -657,6 +688,11 @@ class SerializedDAG(DAG, BaseSerialization):
serialize_dag = cls.serialize_to_json(dag, cls._decorated_fields)
serialize_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
+ serialize_dag["dag_dependencies"] = [
+ vars(t)
+ for t in (SerializedBaseOperator.detect_dependencies(task) for task in dag.task_dict.values())
+ if t is not None
+ ]
serialize_dag['_task_group'] = SerializedTaskGroup.serialize_task_group(dag.task_group)
# Edge info in the JSON exactly matches our internal structure
@@ -821,3 +857,20 @@ class SerializedTaskGroup(TaskGroup, BaseSerialization):
group.upstream_task_ids = set(cls._deserialize(encoded_group["upstream_task_ids"]))
group.downstream_task_ids = set(cls._deserialize(encoded_group["downstream_task_ids"]))
return group
+
+
+@dataclass
+class DagDependency:
+ """Dataclass for representing dependencies between DAGs.
+ These are calculated during serialization and attached to serialized DAGs.
+ """
+
+ source: str
+ target: str
+ dependency_type: str
+ dependency_id: str
+
+ @property
+ def node_id(self):
+ """Node ID for graph rendering"""
+ return f"{self.dependency_type}:{self.source}:{self.target}:{self.dependency_id}"
diff --git a/airflow/www/extensions/init_views.py b/airflow/www/extensions/init_views.py
index 14e532b..b6bebfd 100644
--- a/airflow/www/extensions/init_views.py
+++ b/airflow/www/extensions/init_views.py
@@ -97,6 +97,11 @@ def init_appbuilder_views(app):
appbuilder.add_view(
views.XComModelView, permissions.RESOURCE_XCOM, category=permissions.RESOURCE_ADMIN_MENU
)
+ appbuilder.add_view(
+ views.DagDependenciesView,
+ permissions.RESOURCE_DAG_DEPENDENCIES,
+ category=permissions.RESOURCE_BROWSE_MENU,
+ )
# add_view_no_menu to change item position.
# I added link in extensions.init_appbuilder_links.init_appbuilder_links
appbuilder.add_view_no_menu(views.RedocView)
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 0cb7b40..8768587 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -69,6 +69,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): # pylint: disable=
VIEWER_PERMISSIONS = [
(permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index 6259d45..f4c7b94 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -165,3 +165,30 @@ g.node.removed rect {
background-color: #f0f0f0;
cursor: move;
}
+
+.legend-item.dag {
+ float: left;
+ background-color: #e8f7e4;
+}
+
+.legend-item.trigger {
+ float: left;
+ background-color: #ffefeb;
+}
+
+.legend-item.sensor {
+ float: left;
+ background-color: #e6f1f2;
+}
+
+g.node.dag rect {
+ fill: #e8f7e4;
+}
+
+g.node.trigger rect {
+ fill: #ffefeb;
+}
+
+g.node.sensor rect {
+ fill: #e6f1f2;
+}
diff --git a/airflow/www/static/js/dag_dependencies.js b/airflow/www/static/js/dag_dependencies.js
new file mode 100644
index 0000000..786d337
--- /dev/null
+++ b/airflow/www/static/js/dag_dependencies.js
@@ -0,0 +1,195 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ global d3, localStorage, dagreD3, nodes, edges, arrange, document,
+*/
+
+const highlightColor = '#000000';
+const upstreamColor = '#2020A0';
+const downstreamColor = '#0000FF';
+const initialStrokeWidth = '3px';
+const highlightStrokeWidth = '5px';
+const duration = 500;
+
+// Preparation of DagreD3 data structures
+const g = new dagreD3.graphlib.Graph()
+ .setGraph({
+ nodesep: 15,
+ ranksep: 15,
+ rankdir: arrange,
+ })
+ .setDefaultEdgeLabel(() => ({ lineInterpolate: 'basis' }));
+
+// Set all nodes and styles
+nodes.forEach((node) => {
+ g.setNode(node.id, node.value);
+});
+
+// Set edges
+edges.forEach((edge) => {
+ g.setEdge(edge.u, edge.v);
+});
+
+const render = dagreD3.render();
+const svg = d3.select('#graph-svg');
+const innerSvg = d3.select('#graph-svg g');
+
+innerSvg.call(render, g);
+
+// Returns true if the node's id contains searchText
+function nodeMatches(nodeId, searchText) {
+ if (nodeId.indexOf(searchText) > -1) return true;
+ return false;
+}
+
+function highlightNodes(nodes, color, strokeWidth) {
+ nodes.forEach((nodeid) => {
+ const myNode = g.node(nodeid).elem;
+ d3.select(myNode)
+ .selectAll('rect,circle')
+ .style('stroke', color)
+ .style('stroke-width', strokeWidth);
+ });
+}
+
+let zoom = null;
+
+function setUpZoomSupport() {
+ // Set up zoom support for Graph
+ zoom = d3.behavior.zoom().on('zoom', () => {
+ innerSvg.attr('transform', `translate(${d3.event.translate})scale(${d3.event.scale})`);
+ });
+ svg.call(zoom);
+
+ // Centering the DAG on load
+ // Get Dagre Graph dimensions
+ const graphWidth = g.graph().width;
+ const graphHeight = g.graph().height;
+ // Get SVG dimensions
+ const padding = 20;
+ const svgBb = svg.node().getBoundingClientRect();
+ const width = svgBb.width - padding * 2;
+ const height = svgBb.height - padding; // we are not centering the dag vertically
+
+ // Calculate applicable scale for zoom
+ const zoomScale = Math.min(
+ Math.min(width / graphWidth, height / graphHeight),
+ 1.5, // cap zoom level to 1.5 so nodes are not too large
+ );
+
+ zoom.translate([(width / 2) - ((graphWidth * zoomScale) / 2) + padding, padding]);
+ zoom.scale(zoomScale);
+ zoom.event(innerSvg);
+}
+
+function setUpNodeHighlighting(focusItem = null) {
+ d3.selectAll('g.node').on('mouseover', function (d) {
+ d3.select(this).selectAll('rect').style('stroke', highlightColor);
+ highlightNodes(g.predecessors(d), upstreamColor, highlightStrokeWidth);
+ highlightNodes(g.successors(d), downstreamColor, highlightStrokeWidth);
+ const adjacentNodeNames = [d, ...g.predecessors(d), ...g.successors(d)];
+ d3.selectAll('g.nodes g.node')
+ .filter((x) => !adjacentNodeNames.includes(x))
+ .style('opacity', 0.2);
+ const adjacentEdges = g.nodeEdges(d);
+ d3.selectAll('g.edgePath')[0]
+ // eslint-disable-next-line no-underscore-dangle
+ .filter((x) => !adjacentEdges.includes(x.__data__))
+ .forEach((x) => {
+ d3.select(x).style('opacity', 0.2);
+ });
+ });
+
+ d3.selectAll('g.node').on('mouseout', function (d) {
+ d3.select(this).selectAll('rect,circle').style('stroke', null);
+ highlightNodes(g.predecessors(d), null, initialStrokeWidth);
+ highlightNodes(g.successors(d), null, initialStrokeWidth);
+ d3.selectAll('g.node')
+ .style('opacity', 1);
+ d3.selectAll('g.node rect')
+ .style('stroke-width', initialStrokeWidth);
+ d3.selectAll('g.edgePath')
+ .style('opacity', 1);
+ if (focusItem) {
+ localStorage.removeItem(focusItem);
+ }
+ });
+}
+
+function searchboxHighlighting(s) {
+ let match = null;
+
+ d3.selectAll('g.nodes g.node').forEach(function forEach(d) {
+ if (s === '') {
+ d3.select('g.edgePaths')
+ .transition().duration(duration)
+ .style('opacity', 1);
+ d3.select(this)
+ .transition().duration(duration)
+ .style('opacity', 1)
+ .selectAll('rect')
+ .style('stroke-width', initialStrokeWidth);
+ } else {
+ d3.select('g.edgePaths')
+ .transition().duration(duration)
+ .style('opacity', 0.2);
+ if (nodeMatches(d, s)) {
+ if (!match) match = this;
+ d3.select(this)
+ .transition().duration(duration)
+ .style('opacity', 1)
+ .selectAll('rect')
+ .style('stroke-width', highlightStrokeWidth);
+ } else {
+ d3.select(this)
+ .transition()
+ .style('opacity', 0.2).duration(duration)
+ .selectAll('rect')
+ .style('stroke-width', initialStrokeWidth);
+ }
+ }
+ });
+
+ // This moves the matched node to the center of the graph area
+ if (match) {
+ const transform = d3.transform(d3.select(match).attr('transform'));
+
+ const svgBb = svg.node().getBoundingClientRect();
+ transform.translate = [
+ svgBb.width / 2 - transform.translate[0],
+ svgBb.height / 2 - transform.translate[1],
+ ];
+ transform.scale = [1, 1];
+
+ if (zoom !== null) {
+ zoom.translate(transform.translate);
+ zoom.scale(1);
+ zoom.event(innerSvg);
+ }
+ }
+}
+
+setUpNodeHighlighting();
+setUpZoomSupport();
+
+d3.select('#searchbox').on('keyup', () => {
+ const s = document.getElementById('searchbox').value;
+ searchboxHighlighting(s);
+});
diff --git a/airflow/www/templates/airflow/dag_dependencies.html b/airflow/www/templates/airflow/dag_dependencies.html
new file mode 100644
index 0000000..708b7fc
--- /dev/null
+++ b/airflow/www/templates/airflow/dag_dependencies.html
@@ -0,0 +1,75 @@
+{#
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+#}
+
+{% extends "airflow/main.html" %}
+
+{% block title %}Airflow - DAG Dependencies{% endblock %}
+
+{% block head_css %}
+{{ super() }}
+<link rel="stylesheet" type="text/css" href="{{ url_for_asset('graph.css') }}">
+{% endblock %}
+
+{% block content %}
+{{ super() }}
+<h2>
+ {{ title }}
+ <div class="input-group" style="float: right">
+ <input type="text" id="searchbox" class="form-control" placeholder="Search for..." onenter="null">
+ </div>
+</h2>
+<hr/>
+<div class="legend-row">
+ <div>
+ <span class="legend-item dag">dag</span>
+ <span class="legend-item trigger">trigger</span>
+ <span class="legend-item sensor">sensor</span>
+ </div>
+ <div style="float:right">Last refresh: {{ last_refresh }}</div>
+</div>
+<div id="error" style="display: none; margin-top: 10px;" class="alert alert-danger" role="alert">
+ <span class="glyphicon glyphicon-exclamation-sign" aria-hidden="true"></span>
+ <span id="error_msg">Oops.</span>
+</div>
+<br/>
+<div class="svg-wrapper">
+ <div class="graph-svg-wrap">
+ <svg id="graph-svg" width="{{ width }}" height="{{ height }}">
+ <g id="dig" transform="translate(20,20)"></g>
+ <filter id="blur-effect-1">
+ <feGaussianBlur stdDeviation="3"></feGaussianBlur>
+ </filter>
+ </svg>
+ </div>
+</div>
+{% endblock %}
+
+{% block tail %}
+ {{ super() }}
+ <script src="{{ url_for_asset('d3.min.js') }}"></script>
+ <script src="{{ url_for_asset('dagre-d3.min.js') }}"></script>
+ <script src="{{ url_for_asset('d3-shape.min.js') }}"></script>
+ <script src="{{ url_for_asset('d3-tip.js') }}"></script>
+ <script>
+ let nodes = {{ nodes|tojson }};
+ let edges = {{ edges|tojson }};
+ const arrange = '{{ arrange }}';
+ </script>
+ <script src="{{ url_for_asset('dagDependencies.js') }}"></script>
+{% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6e38cd6..24a4506 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -97,6 +97,7 @@ from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, Task
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
@@ -3908,6 +3909,78 @@ class DagModelView(AirflowModelView):
return wwwutils.json_response(payload)
+class DagDependenciesView(AirflowBaseView):
+ """View to show dependencies between DAGs"""
+
+ refresh_interval = timedelta(
+ seconds=conf.getint(
+ "webserver",
+ "dag_dependencies_refresh_interval",
+ fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+ )
+ )
+ last_refresh = timezone.utcnow() - refresh_interval
+ nodes = []
+ edges = []
+
+ @expose('/dag-dependencies')
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
+ ]
+ )
+ @gzipped
+ @action_logging
+ def list(self):
+ """Display DAG dependencies"""
+ title = "DAG Dependencies"
+
+ if timezone.utcnow() > self.last_refresh + self.refresh_interval:
+ if SerializedDagModel.get_max_last_updated_datetime() > self.last_refresh:
+ self._calculate_graph()
+ self.last_refresh = timezone.utcnow()
+
+ return self.render_template(
+ "airflow/dag_dependencies.html",
+ title=title,
+ nodes=self.nodes,
+ edges=self.edges,
+ last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+ arrange=conf.get("webserver", "dag_orientation"),
+ width=request.args.get("width", "100%"),
+ height=request.args.get("height", "800"),
+ )
+
+ def _calculate_graph(self):
+
+ nodes = []
+ edges = []
+
+ for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
+ dag_node_id = f"dag:{dag}"
+ nodes.append(self._node_dict(dag_node_id, dag, "dag"))
+
+ for dep in dependencies:
+
+ nodes.append(self._node_dict(dep.node_id, dep.dependency_id, dep.dependency_type))
+ edges.extend(
+ [
+ {"u": f"dag:{dep.source}", "v": dep.node_id},
+ {"u": dep.node_id, "v": f"dag:{dep.target}"},
+ ]
+ )
+
+ self.nodes = nodes
+ self.edges = edges
+
+ @staticmethod
+ def _node_dict(node_id, label, node_class):
+ return {
+ "id": node_id,
+ "value": {"label": label, "rx": 5, "ry": 5, "class": node_class},
+ }
+
+
class CustomPermissionModelView(PermissionModelView):
"""Customize permission names for FAB's builtin PermissionModelView."""
diff --git a/airflow/www/webpack.config.js b/airflow/www/webpack.config.js
index fb7e63c..e5683b6 100644
--- a/airflow/www/webpack.config.js
+++ b/airflow/www/webpack.config.js
@@ -40,6 +40,7 @@ const config = {
airflowDefaultTheme: `${CSS_DIR}/bootstrap-theme.css`,
connectionForm: `${JS_DIR}/connection_form.js`,
dagCode: `${JS_DIR}/dag_code.js`,
+ dagDependencies: `${JS_DIR}/dag_dependencies.js`,
dags: [`${CSS_DIR}/dags.css`, `${JS_DIR}/dags.js`],
flash: `${CSS_DIR}/flash.css`,
gantt: [`${CSS_DIR}/gantt.css`, `${JS_DIR}/gantt.js`],
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index b82b55f..46849ab 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -585,3 +585,21 @@ Note that packaged DAGs come with some caveats:
* They will be inserted into Python's ``sys.path`` and importable by any other code in the Airflow process, so ensure the package names don't clash with other packages already installed on your system.
In general, if you have a complex set of compiled dependencies and modules, you are likely better off using the Python ``virtualenv`` system and installing the necessary packages on your target systems with ``pip``.
+
+DAG Dependencies
+================
+
+*Added in Airflow 2.1*
+
+While dependencies between tasks in a DAG are explicitly defined through upstream and downstream
+relationships, dependencies between DAGs are a bit more complex. In general, there are two ways
+in which one DAG can depend on another:
+
+- triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator`
+- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`
+
+An additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
+with different execution dates. The **DAG Dependencies** view
+(``Menu -> Browse -> DAG Dependencies``) helps visualize dependencies between DAGs. The dependencies
+are calculated by the scheduler during DAG serialization and the webserver uses them to build
+the dependency graph.
diff --git a/setup.cfg b/setup.cfg
index bad32b3..bc2e5d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -92,6 +92,7 @@ install_requires =
connexion[swagger-ui,flask]>=2.6.0,<3
croniter>=0.3.17, <0.4
cryptography>=0.9.3
+ dataclasses;python_version<"3.7"
dill>=0.2.2, <0.4
flask>=1.1.0, <2.0
flask-appbuilder~=3.1,>=3.1.1
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 6a186c5..c1ce92e 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -145,6 +145,7 @@ serialized_simple_dag_ground_truth = {
},
},
"edge_info": {},
+ "dag_dependencies": [],
},
}
@@ -817,7 +818,13 @@ class TestStringifiedDAGs(unittest.TestCase):
dag_schema: dict = load_dag_schema_dict()["definitions"]["dag"]["properties"]
# The parameters we add manually in Serialization needs to be ignored
- ignored_keys: set = {"is_subdag", "tasks", "has_on_success_callback", "has_on_failure_callback"}
+ ignored_keys: set = {
+ "is_subdag",
+ "tasks",
+ "has_on_success_callback",
+ "has_on_failure_callback",
+ "dag_dependencies",
+ }
dag_params: set = set(dag_schema.keys()) - ignored_keys
assert set(DAG.get_serialized_fields()) == dag_params
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 6dabe8c..73fe4b1 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -283,6 +283,7 @@ class TestSecurity(unittest.TestCase):
viewer_role_perms = {
(permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 88bceff..e670063 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -280,7 +280,7 @@ class TestAirflowBaseViews(TestBase):
)
def test_index(self):
- with assert_queries_count(43):
+ with assert_queries_count(44):
resp = self.client.get('/', follow_redirects=True)
self.check_content_in_response('DAGs', resp)
@@ -827,6 +827,12 @@ class TestAirflowBaseViews(TestBase):
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('runme_1', resp)
+ def test_dag_dependencies(self):
+ url = 'dag-dependencies'
+ resp = self.client.get(url, follow_redirects=True)
+ self.check_content_in_response('child_task1', resp)
+ self.check_content_in_response('test_trigger_dagrun', resp)
+
def test_last_dagruns(self):
resp = self.client.post('last_dagruns', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)