You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/22 17:35:42 UTC
[airflow] branch main updated: Add datasets to dag dependencies view (#25175)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 109b260920 Add datasets to dag dependencies view (#25175)
109b260920 is described below
commit 109b260920d816c5182445a2530a86d89593940f
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Jul 22 10:35:26 2022 -0700
Add datasets to dag dependencies view (#25175)
Also, remove config option `scheduler > dependency_detector`. See discussion in mailing list vote: https://lists.apache.org/thread/sd84gvxghocnjjgxz4p69qrb1t6k08z5
---
airflow/config_templates/config.yml | 6 --
airflow/config_templates/default_airflow.cfg | 3 -
airflow/serialization/serialized_objects.py | 102 ++++++++++++++++-----
airflow/www/static/css/graph.css | 9 ++
.../www/templates/airflow/dag_dependencies.html | 1 +
airflow/www/views.py | 24 ++---
6 files changed, 102 insertions(+), 43 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index b2c1b4e846..13c60cd86b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2121,12 +2121,6 @@
type: string
example: ~
default: "False"
- - name: dependency_detector
- description: DAG dependency detector class to use
- version_added: 2.1.0
- type: string
- example: ~
- default: "airflow.serialization.serialized_objects.DependencyDetector"
- name: trigger_timeout_check_interval
description: |
How often to check for expired trigger requests that have not run yet.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b90f242d41..cd2913b9da 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1074,9 +1074,6 @@ use_job_schedule = True
# Only has effect if schedule_interval is set to None in DAG
allow_trigger_in_future = False
-# DAG dependency detector class to use
-dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
-
# How often to check for expired trigger requests that have not run yet.
trigger_timeout_check_interval = 15
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 201d27b1c8..e24aa826a4 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -20,6 +20,7 @@
import datetime
import enum
import logging
+import warnings
import weakref
from dataclasses import dataclass
from inspect import Parameter, signature
@@ -554,27 +555,61 @@ class BaseSerialization:
class DependencyDetector:
- """Detects dependencies between DAGs."""
+ """
+ Detects dependencies between DAGs.
+
+ :meta private:
+ """
@staticmethod
- def detect_task_dependencies(task: Operator) -> Optional['DagDependency']:
+ def detect_task_dependencies(task: Operator) -> List['DagDependency']:
"""Detects dependencies caused by tasks"""
+ deps = []
if isinstance(task, TriggerDagRunOperator):
- return DagDependency(
- source=task.dag_id,
- target=getattr(task, "trigger_dag_id"),
- dependency_type="trigger",
- dependency_id=task.task_id,
+ deps.append(
+ DagDependency(
+ source=task.dag_id,
+ target=getattr(task, "trigger_dag_id"),
+ dependency_type="trigger",
+ dependency_id=task.task_id,
+ )
)
elif isinstance(task, ExternalTaskSensor):
- return DagDependency(
- source=getattr(task, "external_dag_id"),
- target=task.dag_id,
- dependency_type="sensor",
- dependency_id=task.task_id,
+ deps.append(
+ DagDependency(
+ source=getattr(task, "external_dag_id"),
+ target=task.dag_id,
+ dependency_type="sensor",
+ dependency_id=task.task_id,
+ )
)
+ for obj in getattr(task, '_outlets', []):
+ if isinstance(obj, Dataset):
+ deps.append(
+ DagDependency(
+ source=task.dag_id,
+ target='dataset',
+ dependency_type='dataset',
+ dependency_id=obj.uri,
+ )
+ )
+ return deps
- return None
+ @staticmethod
+ def detect_dag_dependencies(dag: Optional[DAG]) -> List["DagDependency"]:
+ """Detects dependencies set directly on the DAG object."""
+ if dag and dag.schedule_on:
+ return [
+ DagDependency(
+ source="dataset",
+ target=dag.dag_id,
+ dependency_type="dataset",
+ dependency_id=x.uri,
+ )
+ for x in dag.schedule_on
+ ]
+ else:
+ return []
class SerializedBaseOperator(BaseOperator, BaseSerialization):
@@ -592,8 +627,6 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
if v.default is not v.empty
}
- dependency_detector = conf.getimport('scheduler', 'dependency_detector')
-
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# task_type is used by UI to display the correct class type, because UI only
@@ -837,9 +870,23 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
return op
@classmethod
- def detect_dependencies(cls, op: Operator) -> Optional['DagDependency']:
+ def detect_dependencies(cls, op: Operator) -> Set['DagDependency']:
"""Detects between DAG dependencies for the operator."""
- return cls.dependency_detector.detect_task_dependencies(op)
+ dependency_detector = DependencyDetector()
+ custom_dependency_detector = conf.getimport('scheduler', 'dependency_detector', fallback=None)
+ deps = set()
+ if not (custom_dependency_detector is None or type(dependency_detector) is DependencyDetector):
+ warnings.warn(
+ "Use of a custom dependency detector is deprecated. "
+ "Support will be removed in a future release.",
+ DeprecationWarning,
+ )
+ dep = custom_dependency_detector.detect_task_dependencies(op)
+ if type(dep) is DagDependency:
+ deps.add(dep)
+ deps.update(dependency_detector.detect_task_dependencies(op))
+ deps.update(dependency_detector.detect_dag_dependencies(op.dag))
+ return deps
@classmethod
def _is_excluded(cls, var: Any, attrname: str, op: "DAGNode"):
@@ -1010,11 +1057,14 @@ class SerializedDAG(DAG, BaseSerialization):
del serialized_dag["timetable"]
serialized_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
- serialized_dag["dag_dependencies"] = [
- vars(t)
- for t in (SerializedBaseOperator.detect_dependencies(task) for task in dag.task_dict.values())
+ dag_deps = [
+ t.__dict__
+ for task in dag.task_dict.values()
+ for t in SerializedBaseOperator.detect_dependencies(task)
if t is not None
]
+
+ serialized_dag["dag_dependencies"] = dag_deps
serialized_dag['_task_group'] = SerializedTaskGroup.serialize_task_group(dag.task_group)
# Edge info in the JSON exactly matches our internal structure
@@ -1208,12 +1258,20 @@ class DagDependency:
source: str
target: str
dependency_type: str
- dependency_id: str
+ dependency_id: Optional[str] = None
@property
def node_id(self):
"""Node ID for graph rendering"""
- return f"{self.dependency_type}:{self.source}:{self.target}:{self.dependency_id}"
+ val = f"{self.dependency_type}"
+ if not self.dependency_type == 'dataset':
+ val += f":{self.source}:{self.target}"
+ if self.dependency_id:
+ val += f":{self.dependency_id}"
+ return val
+
+ def __hash__(self):
+ return hash((self.source, self.target, self.dependency_type, self.dependency_id))
def _has_kubernetes() -> bool:
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index 5d30cb43e4..5dc33fe2f9 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -161,6 +161,11 @@ g.node text {
background-color: #e6f1f2;
}
+.legend-item.dataset {
+ float: left;
+ background-color: #fcecd4;
+}
+
g.node.dag rect {
fill: #e8f7e4;
}
@@ -172,3 +177,7 @@ g.node.trigger rect {
g.node.sensor rect {
fill: #e6f1f2;
}
+
+g.node.dataset rect {
+ fill: #fcecd4;
+}
diff --git a/airflow/www/templates/airflow/dag_dependencies.html b/airflow/www/templates/airflow/dag_dependencies.html
index c893c75bd0..3d25e1ce74 100644
--- a/airflow/www/templates/airflow/dag_dependencies.html
+++ b/airflow/www/templates/airflow/dag_dependencies.html
@@ -43,6 +43,7 @@ under the License.
<span class="legend-item dag">dag</span>
<span class="legend-item trigger">trigger</span>
<span class="legend-item sensor">sensor</span>
+ <span class="legend-item dataset">dataset</span>
</div>
<div style="float:right">Last refresh: <time datetime="{{ last_refresh }}">{{ last_refresh }}</time></div>
</div>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8681127f46..52540204b9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5320,24 +5320,24 @@ class DagDependenciesView(AirflowBaseView):
def _calculate_graph(self):
- nodes: List[Dict[str, Any]] = []
- edges: List[Dict[str, str]] = []
+ nodes_dict: Dict[str, Any] = {}
+ edge_tuples: Set[Dict[str, str]] = set()
for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
dag_node_id = f"dag:{dag}"
- nodes.append(self._node_dict(dag_node_id, dag, "dag"))
+ if dag_node_id not in nodes_dict:
+ nodes_dict[dag_node_id] = self._node_dict(dag_node_id, dag, "dag")
for dep in dependencies:
- nodes.append(self._node_dict(dep.node_id, dep.dependency_id, dep.dependency_type))
- edges.extend(
- [
- {"u": f"dag:{dep.source}", "v": dep.node_id},
- {"u": dep.node_id, "v": f"dag:{dep.target}"},
- ]
- )
+ if dep.node_id not in nodes_dict:
+ nodes_dict[dep.node_id] = self._node_dict(
+ dep.node_id, dep.dependency_id, dep.dependency_type
+ )
+ edge_tuples.add((f"dag:{dep.source}", dep.node_id))
+ edge_tuples.add((dep.node_id, f"dag:{dep.target}"))
- self.nodes = nodes
- self.edges = edges
+ self.nodes = list(nodes_dict.values())
+ self.edges = [{"u": u, "v": v} for u, v in edge_tuples]
@staticmethod
def _node_dict(node_id, label, node_class):