You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/10/13 21:00:26 UTC

[airflow] branch main updated: Betterize dataset update blurb (#26878)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bcc0b72a5 Betterize dataset update blurb (#26878)
3bcc0b72a5 is described below

commit 3bcc0b72a5cec8520d15d26e0350694c22153c88
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Thu Oct 13 14:00:16 2022 -0700

    Betterize dataset update blurb (#26878)
    
    * Move a user-facing string to a template for better model-view separation
    
    * fixup
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    
    * Improve dataset summary in dags list view
    
    * Update dag.html dataset tag & link directly to dataset details
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
---
 airflow/models/dag.py                   | 20 +++++++++++++----
 airflow/www/static/js/dag.js            | 14 ++++++++++--
 airflow/www/static/js/dags.js           | 18 +++++++++++++--
 airflow/www/templates/airflow/dag.html  | 40 +++++++++++++++++++++------------
 airflow/www/templates/airflow/dags.html | 39 ++++++++++++++++++++------------
 tests/models/test_dag.py                | 14 +++++++++++-
 tests/utils/test_db_cleanup.py          |  4 +++-
 7 files changed, 111 insertions(+), 38 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7adfdc4589..e5be0b2ead 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -77,7 +77,7 @@ from airflow.models.base import Base, StringID
 from airflow.models.dagcode import DagCode
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import DagRun
-from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel
 from airflow.models.operator import Operator
 from airflow.models.param import DagParam, ParamsDict
 from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances
@@ -200,18 +200,24 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
-def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, dict[str, int]]:
+def get_dataset_triggered_next_run_info(
+    dag_ids: list[str], *, session: Session
+) -> dict[str, dict[str, int | str]]:
     """
     Given a list of dag_ids, get string representing how close any that are dataset triggered are
     their next run, e.g. "1 of 2 datasets updated"
     """
     return {
         x.dag_id: {
+            "uri": x.uri,
             "ready": x.ready,
             "total": x.total,
         }
         for x in session.query(
             DagScheduleDatasetReference.dag_id,
+            # This is a dirty hack to work around group by requiring an aggregate, since grouping by dataset
+            # is not what we want to do here...but it works
+            case((func.count() == 1, func.max(DatasetModel.uri)), else_='').label("uri"),
             func.count().label("total"),
             func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
         )
@@ -223,7 +229,13 @@ def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session)
             ),
             isouter=True,
         )
-        .group_by(DagScheduleDatasetReference.dag_id)
+        .join(
+            DatasetModel,
+            DatasetModel.id == DagScheduleDatasetReference.dataset_id,
+        )
+        .group_by(
+            DagScheduleDatasetReference.dag_id,
+        )
         .filter(DagScheduleDatasetReference.dag_id.in_(dag_ids))
         .all()
     }
@@ -3419,7 +3431,7 @@ class DagModel(Base):
         )
 
     @provide_session
-    def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int] | None:
+    def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
         if self.schedule_interval != "Dataset":
             return None
         return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index 8bf69aa67f..f2c3ce39ee 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -39,6 +39,7 @@ const logsWithMetadataUrl = getMetaValue('logs_with_metadata_url');
 const externalLogUrl = getMetaValue('external_log_url');
 const extraLinksUrl = getMetaValue('extra_links_url');
 const pausedUrl = getMetaValue('paused_url');
+const datasetsUrl = getMetaValue('datasets_url');
 const nextRun = {
   createAfter: getMetaValue('next_dagrun_create_after'),
   intervalStart: getMetaValue('next_dagrun_data_interval_start'),
@@ -65,7 +66,10 @@ $(window).on('load', function onLoad() {
   $(`a[href*="${this.location.pathname}"]`).parent().addClass('active');
   $('.never_active').removeClass('active');
   const run = $('#next-dataset-tooltip');
-  getDatasetTooltipInfo(dagId, run, setNextDatasets);
+  const singleDatasetUri = $(run).data('uri');
+  if (!singleDatasetUri) {
+    getDatasetTooltipInfo(dagId, run, setNextDatasets);
+  }
 });
 
 const buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce((obj, elm) => {
@@ -411,6 +415,12 @@ $('#next-run').on('mouseover', () => {
 });
 
 $('.next-dataset-triggered').on('click', (e) => {
+  const run = $('#next-dataset-tooltip');
   const summary = $(e.target).data('summary');
-  openDatasetModal(dagId, summary, nextDatasets, nextDatasetsError);
+  const singleDatasetUri = $(run).data('uri');
+  if (!singleDatasetUri) {
+    openDatasetModal(dagId, summary, nextDatasets, nextDatasetsError);
+  } else {
+    window.location.href = `${datasetsUrl}?uri=${encodeURIComponent(singleDatasetUri)}`;
+  }
 });
diff --git a/airflow/www/static/js/dags.js b/airflow/www/static/js/dags.js
index db8f19b203..3acd8a4742 100644
--- a/airflow/www/static/js/dags.js
+++ b/airflow/www/static/js/dags.js
@@ -39,6 +39,7 @@ const lastDagRunsUrl = getMetaValue('last_dag_runs_url');
 const dagStatsUrl = getMetaValue('dag_stats_url');
 const taskStatsUrl = getMetaValue('task_stats_url');
 const gridUrl = getMetaValue('grid_url');
+const datasetsUrl = getMetaValue('datasets_url');
 
 const nextDatasets = {};
 let nextDatasetsError;
@@ -545,17 +546,30 @@ $('#auto_refresh').change(() => {
 $('.next-dataset-triggered').on('click', (e) => {
   const dagId = $(e.target).data('dag-id');
   const summary = $(e.target).data('summary');
-  if (dagId) openDatasetModal(dagId, summary, nextDatasets[dagId], nextDatasetsError);
+  const singleDatasetUri = $(e.target).data('uri');
+
+  // If there are multiple datasets, open a modal, otherwise link directly to the dataset
+  if (!singleDatasetUri) {
+    if (dagId) openDatasetModal(dagId, summary, nextDatasets[dagId], nextDatasetsError);
+  } else {
+    window.location.href = `${datasetsUrl}?uri=${encodeURIComponent(singleDatasetUri)}`;
+  }
 });
 
 $('.js-dataset-triggered').each((i, cell) => {
   $(cell).on('mouseover', () => {
     const run = $(cell).children();
     const dagId = $(run).data('dag-id');
+    const singleDatasetUri = $(run).data('uri');
+
     const setNextDatasets = (datasets, error) => {
       nextDatasets[dagId] = datasets;
       nextDatasetsError = error;
     };
-    getDatasetTooltipInfo(dagId, run, setNextDatasets);
+
+    // Only update the tooltip info if there are multiple datasets
+    if (!singleDatasetUri) {
+      getDatasetTooltipInfo(dagId, run, setNextDatasets);
+    }
   });
 });
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 45260a627e..f7eedb9e8f 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -137,21 +137,33 @@
         </p>
       {% endif %}
       {% if dag_model is defined and dag_model.schedule_interval is defined and dag_model.schedule_interval == 'Dataset' %}
-        <span
-          id="next-dataset-tooltip"
-          class="js-tooltip"
-          title="Click to see dataset details."
-          data-html="true"
-          data-placement="bottom"
-        >
-          <p
-            class="label label-default next-dataset-triggered"
-            style="margin-left: 5px;"
-            data-summary="{{ dag_model.get_dataset_triggered_next_run_info() }}"
+        {%- with ds_info = dag_model.get_dataset_triggered_next_run_info() -%}
+          <span
+            id="next-dataset-tooltip"
+            class="js-tooltip"
+            title="Click to see dataset details."
+            data-html="true"
+            data-placement="bottom"
+            data-uri="{{ ds_info.uri }}"
           >
-            Next Run: {{ dag_model.get_dataset_triggered_next_run_info() }}
-          </p>
-        </span>
+              <p
+                class="label label-default next-dataset-triggered"
+                style="margin-left: 5px;"
+                data-summary="
+                  {%- if ds_info.total == 1 -%}
+                  On {{ ds_info.uri }}
+                  {%- else -%}
+                  {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+                  {%- endif -%}"
+                >
+                {% if ds_info.total == 1 -%}
+                  On {{ ds_info.uri }}
+                {%- else -%}
+                {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+                {%- endif %}
+              </p>
+          </span>
+        {%- endwith -%}
       {% endif %}
     </h4>
   </div>
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index c675b4a05a..4639c17ace 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -297,22 +297,33 @@
             </td>
             <td class="text-nowrap {{ 'js-dataset-triggered' if dag.dag_id in dataset_triggered_next_run_info }}">
               {% if dag.dag_id in dataset_triggered_next_run_info %}
-                <span
-                  data-dag-id="{{ dag.dag_id }}"
-                  class="js-tooltip js-next-dataset-run-tooltip"
-                  title="Click to see dataset details."
-                  data-html="true"
-                >
-                  <div
-                    class="label label-default next-dataset-triggered"
+                {%- with ds_info = dataset_triggered_next_run_info[dag.dag_id] -%}
+                  <span
                     data-dag-id="{{ dag.dag_id }}"
-                    data-summary="{{ dataset_triggered_next_run_info[dag.dag_id] }}"
+                    class="js-tooltip js-next-dataset-run-tooltip"
+                    title="Click to see dataset details."
+                    data-html="true"
+                    data-uri="{{ ds_info.uri }}"
                   >
-                    {% with ds_info = dataset_triggered_next_run_info[dag.dag_id] %}
-                    {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
-                    {% endwith %}
-                  </div>
-                </span>
+                    <div
+                      class="label label-default next-dataset-triggered"
+                      data-dag-id="{{ dag.dag_id }}"
+                      data-uri="{{ ds_info.uri }}"
+                      data-summary="
+                      {%- if ds_info.total == 1 -%}
+                      On {{ ds_info.uri }}
+                      {%- else -%}
+                      {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+                      {%- endif -%}"
+                    >
+                      {% if ds_info.total == 1 -%}
+                        On {{ ds_info.uri[0:40] + '…' if ds_info.uri and ds_info.uri|length > 40 else ds_info.uri|default('', true) }}
+                      {%- else -%}
+                      {{ ds_info.ready }} of {{ ds_info.total }} datasets updated
+                      {%- endif %}
+                    </div>
+                  </span>
+                {%- endwith -%}
               {% endif %}
               {% if dag.next_dagrun is not none %}
                 <time datetime="{{ dag.next_dagrun }}">{{ dag.next_dagrun }}</time>
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 379fb05937..0e28868307 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -86,6 +86,13 @@ def clear_dags():
     clear_db_serialized_dags()
 
 
+@pytest.fixture
+def clear_datasets():
+    clear_db_datasets()
+    yield
+    clear_db_datasets()
+
+
 class TestDag:
     def setup_method(self) -> None:
         clear_db_runs()
@@ -3005,7 +3012,7 @@ def test__tags_length(tags: list[str], should_pass: bool):
 
 
 @pytest.mark.need_serialized_dag
-def test_get_dataset_triggered_next_run_info(dag_maker):
+def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
     dataset1 = Dataset(uri="ds1")
     dataset2 = Dataset(uri="ds2")
     dataset3 = Dataset(uri="ds3")
@@ -3031,10 +3038,13 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
     )
     session.flush()
 
+    datasets = session.query(DatasetModel.uri).order_by(DatasetModel.id).all()
+
     info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
     assert info[dag1.dag_id] == {
         "ready": 0,
         "total": 1,
+        "uri": datasets[0].uri,
     }
 
     # This time, check both dag2 and dag3 at the same time (tests filtering)
@@ -3042,10 +3052,12 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
     assert info[dag2.dag_id] == {
         "ready": 1,
         "total": 2,
+        "uri": "",
     }
     assert info[dag3.dag_id] == {
         "ready": 1,
         "total": 3,
+        "uri": "",
     }
 
 
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index d4c9c8603a..66ce41eff1 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -32,16 +32,18 @@ from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
 from airflow.utils.db_cleanup import _build_query, _cleanup_table, config_dict, run_cleanup
 from airflow.utils.session import create_session
-from tests.test_utils.db import clear_db_dags, clear_db_runs, drop_tables_with_prefix
+from tests.test_utils.db import clear_db_dags, clear_db_datasets, clear_db_runs, drop_tables_with_prefix
 
 
 @pytest.fixture(autouse=True)
 def clean_database():
     """Fixture that cleans the database before and after every test."""
     clear_db_runs()
+    clear_db_datasets()
     clear_db_dags()
     yield  # Test runs here
     clear_db_dags()
+    clear_db_datasets()
     clear_db_runs()