You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/10/13 21:00:26 UTC
[airflow] branch main updated: Betterize dataset update blurb (#26878)
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3bcc0b72a5 Betterize dataset update blurb (#26878)
3bcc0b72a5 is described below
commit 3bcc0b72a5cec8520d15d26e0350694c22153c88
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Thu Oct 13 14:00:16 2022 -0700
Betterize dataset update blurb (#26878)
* Move a user-facing string to a template for better model-view separation
* fixup
Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
* Improve dataset summary in dags list view
* Update dag.html dataset tag & link directly to dataset details
Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
Co-authored-by: Brent Bovenzi <br...@gmail.com>
---
airflow/models/dag.py | 20 +++++++++++++----
airflow/www/static/js/dag.js | 14 ++++++++++--
airflow/www/static/js/dags.js | 18 +++++++++++++--
airflow/www/templates/airflow/dag.html | 40 +++++++++++++++++++++------------
airflow/www/templates/airflow/dags.html | 39 ++++++++++++++++++++------------
tests/models/test_dag.py | 14 +++++++++++-
tests/utils/test_db_cleanup.py | 4 +++-
7 files changed, 111 insertions(+), 38 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7adfdc4589..e5be0b2ead 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -77,7 +77,7 @@ from airflow.models.base import Base, StringID
from airflow.models.dagcode import DagCode
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
-from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel
from airflow.models.operator import Operator
from airflow.models.param import DagParam, ParamsDict
from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances
@@ -200,18 +200,24 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
return query.first()
-def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, dict[str, int]]:
+def get_dataset_triggered_next_run_info(
+ dag_ids: list[str], *, session: Session
+) -> dict[str, dict[str, int | str]]:
"""
Given a list of dag_ids, get string representing how close any that are dataset triggered are
their next run, e.g. "1 of 2 datasets updated"
"""
return {
x.dag_id: {
+ "uri": x.uri,
"ready": x.ready,
"total": x.total,
}
for x in session.query(
DagScheduleDatasetReference.dag_id,
+ # This is a dirty hack to work around GROUP BY requiring an aggregate, since grouping by dataset
+ # is not what we want to do here...but it works
+ case((func.count() == 1, func.max(DatasetModel.uri)), else_='').label("uri"),
func.count().label("total"),
func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
)
@@ -223,7 +229,13 @@ def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session)
),
isouter=True,
)
- .group_by(DagScheduleDatasetReference.dag_id)
+ .join(
+ DatasetModel,
+ DatasetModel.id == DagScheduleDatasetReference.dataset_id,
+ )
+ .group_by(
+ DagScheduleDatasetReference.dag_id,
+ )
.filter(DagScheduleDatasetReference.dag_id.in_(dag_ids))
.all()
}
@@ -3419,7 +3431,7 @@ class DagModel(Base):
)
@provide_session
- def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int] | None:
+ def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
if self.schedule_interval != "Dataset":
return None
return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index 8bf69aa67f..f2c3ce39ee 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -39,6 +39,7 @@ const logsWithMetadataUrl = getMetaValue('logs_with_metadata_url');
const externalLogUrl = getMetaValue('external_log_url');
const extraLinksUrl = getMetaValue('extra_links_url');
const pausedUrl = getMetaValue('paused_url');
+const datasetsUrl = getMetaValue('datasets_url');
const nextRun = {
createAfter: getMetaValue('next_dagrun_create_after'),
intervalStart: getMetaValue('next_dagrun_data_interval_start'),
@@ -65,7 +66,10 @@ $(window).on('load', function onLoad() {
$(`a[href*="${this.location.pathname}"]`).parent().addClass('active');
$('.never_active').removeClass('active');
const run = $('#next-dataset-tooltip');
- getDatasetTooltipInfo(dagId, run, setNextDatasets);
+ const singleDatasetUri = $(run).data('uri');
+ if (!singleDatasetUri) {
+ getDatasetTooltipInfo(dagId, run, setNextDatasets);
+ }
});
const buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce((obj, elm) => {
@@ -411,6 +415,12 @@ $('#next-run').on('mouseover', () => {
});
$('.next-dataset-triggered').on('click', (e) => {
+ const run = $('#next-dataset-tooltip');
const summary = $(e.target).data('summary');
- openDatasetModal(dagId, summary, nextDatasets, nextDatasetsError);
+ const singleDatasetUri = $(run).data('uri');
+ if (!singleDatasetUri) {
+ openDatasetModal(dagId, summary, nextDatasets, nextDatasetsError);
+ } else {
+ window.location.href = `${datasetsUrl}?uri=${encodeURIComponent(singleDatasetUri)}`;
+ }
});
diff --git a/airflow/www/static/js/dags.js b/airflow/www/static/js/dags.js
index db8f19b203..3acd8a4742 100644
--- a/airflow/www/static/js/dags.js
+++ b/airflow/www/static/js/dags.js
@@ -39,6 +39,7 @@ const lastDagRunsUrl = getMetaValue('last_dag_runs_url');
const dagStatsUrl = getMetaValue('dag_stats_url');
const taskStatsUrl = getMetaValue('task_stats_url');
const gridUrl = getMetaValue('grid_url');
+const datasetsUrl = getMetaValue('datasets_url');
const nextDatasets = {};
let nextDatasetsError;
@@ -545,17 +546,30 @@ $('#auto_refresh').change(() => {
$('.next-dataset-triggered').on('click', (e) => {
const dagId = $(e.target).data('dag-id');
const summary = $(e.target).data('summary');
- if (dagId) openDatasetModal(dagId, summary, nextDatasets[dagId], nextDatasetsError);
+ const singleDatasetUri = $(e.target).data('uri');
+
+ // If there are multiple datasets, open a modal; otherwise link directly to the dataset
+ if (!singleDatasetUri) {
+ if (dagId) openDatasetModal(dagId, summary, nextDatasets[dagId], nextDatasetsError);
+ } else {
+ window.location.href = `${datasetsUrl}?uri=${encodeURIComponent(singleDatasetUri)}`;
+ }
});
$('.js-dataset-triggered').each((i, cell) => {
$(cell).on('mouseover', () => {
const run = $(cell).children();
const dagId = $(run).data('dag-id');
+ const singleDatasetUri = $(run).data('uri');
+
const setNextDatasets = (datasets, error) => {
nextDatasets[dagId] = datasets;
nextDatasetsError = error;
};
- getDatasetTooltipInfo(dagId, run, setNextDatasets);
+
+ // Only update the tooltip info if there are multiple datasets
+ if (!singleDatasetUri) {
+ getDatasetTooltipInfo(dagId, run, setNextDatasets);
+ }
});
});
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 45260a627e..f7eedb9e8f 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -137,21 +137,33 @@
</p>
{% endif %}
{% if dag_model is defined and dag_model.schedule_interval is defined and dag_model.schedule_interval == 'Dataset' %}
- <span
- id="next-dataset-tooltip"
- class="js-tooltip"
- title="Click to see dataset details."
- data-html="true"
- data-placement="bottom"
- >
- <p
- class="label label-default next-dataset-triggered"
- style="margin-left: 5px;"
- data-summary="{{ dag_model.get_dataset_triggered_next_run_info() }}"
+ {%- with ds_info = dag_model.get_dataset_triggered_next_run_info() -%}
+ <span
+ id="next-dataset-tooltip"
+ class="js-tooltip"
+ title="Click to see dataset details."
+ data-html="true"
+ data-placement="bottom"
+ data-uri="{{ ds_info.uri }}"
>
- Next Run: {{ dag_model.get_dataset_triggered_next_run_info() }}
- </p>
- </span>
+ <p
+ class="label label-default next-dataset-triggered"
+ style="margin-left: 5px;"
+ data-summary="
+ {%- if ds_info.total == 1 -%}
+ On {{ ds_info.uri }}
+ {%- else -%}
+ {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+ {%- endif -%}"
+ >
+ {% if ds_info.total == 1 -%}
+ On {{ ds_info.uri }}
+ {%- else -%}
+ {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+ {%- endif %}
+ </p>
+ </span>
+ {%- endwith -%}
{% endif %}
</h4>
</div>
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index c675b4a05a..4639c17ace 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -297,22 +297,33 @@
</td>
<td class="text-nowrap {{ 'js-dataset-triggered' if dag.dag_id in dataset_triggered_next_run_info }}">
{% if dag.dag_id in dataset_triggered_next_run_info %}
- <span
- data-dag-id="{{ dag.dag_id }}"
- class="js-tooltip js-next-dataset-run-tooltip"
- title="Click to see dataset details."
- data-html="true"
- >
- <div
- class="label label-default next-dataset-triggered"
+ {%- with ds_info = dataset_triggered_next_run_info[dag.dag_id] -%}
+ <span
data-dag-id="{{ dag.dag_id }}"
- data-summary="{{ dataset_triggered_next_run_info[dag.dag_id] }}"
+ class="js-tooltip js-next-dataset-run-tooltip"
+ title="Click to see dataset details."
+ data-html="true"
+ data-uri="{{ ds_info.uri }}"
>
- {% with ds_info = dataset_triggered_next_run_info[dag.dag_id] %}
- {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
- {% endwith %}
- </div>
- </span>
+ <div
+ class="label label-default next-dataset-triggered"
+ data-dag-id="{{ dag.dag_id }}"
+ data-uri="{{ ds_info.uri }}"
+ data-summary="
+ {%- if ds_info.total == 1 -%}
+ On {{ ds_info.uri }}
+ {%- else -%}
+ {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+ {%- endif -%}"
+ >
+ {% if ds_info.total == 1 -%}
+ On {{ ds_info.uri[0:40] + '…' if ds_info.uri and ds_info.uri|length > 40 else ds_info.uri|default('', true) }}
+ {%- else -%}
+ {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+ {%- endif %}
+ </div>
+ </span>
+ {%- endwith -%}
{% endif %}
{% if dag.next_dagrun is not none %}
<time datetime="{{ dag.next_dagrun }}">{{ dag.next_dagrun }}</time>
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 379fb05937..0e28868307 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -86,6 +86,13 @@ def clear_dags():
clear_db_serialized_dags()
+@pytest.fixture
+def clear_datasets():
+ clear_db_datasets()
+ yield
+ clear_db_datasets()
+
+
class TestDag:
def setup_method(self) -> None:
clear_db_runs()
@@ -3005,7 +3012,7 @@ def test__tags_length(tags: list[str], should_pass: bool):
@pytest.mark.need_serialized_dag
-def test_get_dataset_triggered_next_run_info(dag_maker):
+def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
dataset1 = Dataset(uri="ds1")
dataset2 = Dataset(uri="ds2")
dataset3 = Dataset(uri="ds3")
@@ -3031,10 +3038,13 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
)
session.flush()
+ datasets = session.query(DatasetModel.uri).order_by(DatasetModel.id).all()
+
info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
assert info[dag1.dag_id] == {
"ready": 0,
"total": 1,
+ "uri": datasets[0].uri,
}
# This time, check both dag2 and dag3 at the same time (tests filtering)
@@ -3042,10 +3052,12 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
assert info[dag2.dag_id] == {
"ready": 1,
"total": 2,
+ "uri": "",
}
assert info[dag3.dag_id] == {
"ready": 1,
"total": 3,
+ "uri": "",
}
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index d4c9c8603a..66ce41eff1 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -32,16 +32,18 @@ from airflow.models import DagModel, DagRun, TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.db_cleanup import _build_query, _cleanup_table, config_dict, run_cleanup
from airflow.utils.session import create_session
-from tests.test_utils.db import clear_db_dags, clear_db_runs, drop_tables_with_prefix
+from tests.test_utils.db import clear_db_dags, clear_db_datasets, clear_db_runs, drop_tables_with_prefix
@pytest.fixture(autouse=True)
def clean_database():
"""Fixture that cleans the database before and after every test."""
clear_db_runs()
+ clear_db_datasets()
clear_db_dags()
yield # Test runs here
clear_db_dags()
+ clear_db_datasets()
clear_db_runs()