Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/18 15:48:42 UTC

[airflow] branch main updated: Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8aa0b249e1 Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
8aa0b249e1 is described below

commit 8aa0b249e1fa0071f3a2458f13d9728564cbc8dd
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jul 18 08:48:37 2022 -0700

    Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
    
    Returns, for a given dag run, the dataset events that are "part of" that dag run, i.e. the events that contributed to triggering it.  In practice we just query the events that occurred since the previous dataset-triggered dag run.  We may tighten this relationship later; see https://github.com/apache/airflow/pull/24969.
---
 .../api_connexion/endpoints/dag_run_endpoint.py    |  68 ++++++++
 airflow/api_connexion/openapi/v1.yaml              |  36 +++-
 .../endpoints/test_dag_run_endpoint.py             | 185 ++++++++++++++++++++-
 3 files changed, 283 insertions(+), 6 deletions(-)
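
For illustration, here is a minimal sketch of calling the new endpoint from
Python with the `requests` library. The webserver URL, basic-auth credentials,
and the dag/run IDs are hypothetical placeholders, not part of this commit;
only the path and response shape come from the spec and tests below.

    import requests

    # Hypothetical placeholders -- substitute your own webserver URL,
    # credentials, dag_id, and dag_run_id.
    BASE_URL = "http://localhost:8080/api/v1"
    resp = requests.get(
        f"{BASE_URL}/dags/my_dataset_consumer/dagRuns/my_run_id/upstreamDatasetEvents",
        auth=("admin", "admin"),
    )
    resp.raise_for_status()
    payload = resp.json()

    # The response is a DatasetEventCollection: a list of events plus a count.
    for event in payload["dataset_events"]:
        print(event["dataset_id"], event["created_at"])
    print("total:", payload["total_entries"])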

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index f31b6a24ce..830f1dd925 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -41,12 +41,17 @@ from airflow.api_connexion.schemas.dag_run_schema import (
     dagruns_batch_form_schema,
     set_dagrun_state_form_schema,
 )
+from airflow.api_connexion.schemas.dataset_schema import (
+    DatasetEventCollection,
+    dataset_event_collection_schema,
+)
 from airflow.api_connexion.schemas.task_instance_schema import (
     TaskInstanceReferenceCollection,
     task_instance_reference_collection_schema,
 )
 from airflow.api_connexion.types import APIResponse
 from airflow.models import DagModel, DagRun
+from airflow.models.dataset import DatasetDagRef, DatasetEvent
 from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -86,6 +91,69 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
     return dagrun_schema.dump(dag_run)
 
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+    ],
+)
+@provide_session
+def get_upstream_dataset_events(
+    *, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get a DAG Run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun)
+        .filter(
+            DagRun.dag_id == dag_id,
+            DagRun.run_id == dag_run_id,
+        )
+        .one_or_none()
+    )
+    if dag_run is None:
+        raise NotFound(
+            "DAGRun not found",
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
+        )
+    events = _get_upstream_dataset_events(dag_run=dag_run, session=session)
+    return dataset_event_collection_schema.dump(
+        DatasetEventCollection(dataset_events=events, total_entries=len(events))
+    )
+
+
+def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["DatasetEvent"]:
+    """If dag run is dataset-triggered, return the dataset events that triggered it."""
+    if dag_run.run_type != DagRunType.DATASET_TRIGGERED:
+        return []
+
+    previous_dag_run = (
+        session.query(DagRun)
+        .filter(
+            DagRun.dag_id == dag_run.dag_id,
+            DagRun.execution_date < dag_run.execution_date,
+            DagRun.run_type == DagRunType.DATASET_TRIGGERED,
+        )
+        .order_by(DagRun.execution_date.desc())
+        .first()
+    )
+
+    dataset_event_filters = [
+        DatasetDagRef.dag_id == dag_run.dag_id,
+        DatasetEvent.created_at <= dag_run.execution_date,
+    ]
+    if previous_dag_run:
+        dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
+    dataset_events = (
+        session.query(DatasetEvent)
+        .join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
+        .filter(*dataset_event_filters)
+        .order_by(DatasetEvent.created_at)
+        .all()
+    )
+    return dataset_events
+
+
 def _fetch_dag_runs(
     query: Query,
     *,
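
The window selection in _get_upstream_dataset_events above boils down to a
simple predicate over event timestamps. The following standalone sketch (plain
Python with a stand-in dataclass instead of the ORM models; the names here are
hypothetical) restates the boundary conditions: strictly after the previous
dataset-triggered run, up to and including the current run's execution date.

    from dataclasses import dataclass
    from datetime import datetime
    from typing import List, Optional

    @dataclass
    class FakeEvent:  # stand-in for DatasetEvent
        created_at: datetime

    def select_window(
        events: List[FakeEvent],
        current_execution_date: datetime,
        previous_execution_date: Optional[datetime],  # None if no prior dataset-triggered run
    ) -> List[FakeEvent]:
        """Mirror the endpoint's filters: created_at <= the current run's
        execution_date and, when a prior dataset-triggered run exists,
        created_at strictly greater than that run's execution_date."""
        selected = [e for e in events if e.created_at <= current_execution_date]
        if previous_execution_date is not None:
            selected = [e for e in selected if e.created_at > previous_execution_date]
        return sorted(selected, key=lambda e: e.created_at)

Note the asymmetry: the lower bound is exclusive while the upper bound is
inclusive, which is why the test below expects an event created at exactly the
run's execution date to be returned.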
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 2dfef4780c..c013771edb 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -818,6 +818,34 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+    get:
+      summary: Get dataset events for a DAG run
+      description: |
+        Get dataset events for a dag run.
+
+        *New in version 2.4.0*
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: get_upstream_dataset_events
+      tags: [DAGRun, Dataset]
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DatasetEventCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
+        '403':
+          $ref: '#/components/responses/PermissionDenied'
+        '404':
+          $ref: '#/components/responses/NotFound'
+
+
   /eventLogs:
     get:
       summary: List log entries
@@ -3508,19 +3536,19 @@ components:
         source_dag_id:
           type: string
           description: The DAG ID that updated the dataset.
-          nullable: false
+          nullable: true
         source_task_id:
           type: string
           description: The task ID that updated the dataset.
-          nullable: false
+          nullable: true
         source_run_id:
           type: string
           description: The DAG run ID that updated the dataset.
-          nullable: false
+          nullable: true
         source_map_index:
           type: integer
           description: The task map index that updated the dataset.
-          nullable: false
+          nullable: true
         created_at:
           type: string
           description: The dataset event creation time
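
For reference, a hand-written sample of the DatasetEventCollection payload the
endpoint returns, expressed as the equivalent Python literal; the values are
illustrative, not captured from a real deployment.

    # Illustrative only; field names follow the schema above.
    sample_response = {
        "dataset_events": [
            {
                "id": 7,
                "dataset_id": 1,
                "extra": None,
                # The four source_* fields are nullable as of this commit
                # (see the schema change above), so clients must handle None.
                "source_dag_id": "upstream_dag",
                "source_task_id": "produce_dataset",
                "source_run_id": "scheduled__2022-07-18T00:00:00+00:00",
                "source_map_index": -1,
                "created_at": "2022-07-18T15:48:37+00:00",
            }
        ],
        "total_entries": 1,
    }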
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index dd5803564a..454f87afd4 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -16,18 +16,22 @@
 # under the License.
 from datetime import timedelta
 from unittest import mock
+from uuid import uuid4
 
+import pendulum
 import pytest
 from freezegun import freeze_time
 from parameterized import parameterized
 
+from airflow import settings
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun, Dataset
+from airflow.models.dataset import DatasetEvent
 from airflow.operators.empty import EmptyOperator
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
@@ -44,6 +48,7 @@ def configured_app(minimal_app_for_api):
         role_name="Test",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
@@ -1483,3 +1488,179 @@ class TestClearDagRun(TestDagRunEndpoint):
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 404
+
+
+def test__get_upstream_dataset_events_no_prior(configured_app):
+    """If no prior dag runs, return all events"""
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 5 events
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
+    session.commit()
+
+    # create a single dag run, no prior dag runs
+    dr = DagRun(dag2.dag_id, run_id=unique_id, run_type=DagRunType.DATASET_TRIGGERED)
+    dr.dag = dag2
+    session.add(dr)
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+    assert len(events) == 5
+
+
+def test__get_upstream_dataset_events_with_prior(configured_app):
+    """
+    Events returned should be those that occurred after the last DATASET_TRIGGERED
+    dag run, up to and including the execution date of the current dag run.
+    """
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 2 events, then a dag run, then 3 events, then another dag run then another event
+    first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+        ]
+    )
+    dr1 = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-1',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=1000),
+    )
+    dr1.dag = dag2
+    session.add(dr1)
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+        ]
+    )
+    dr2 = DagRun(  # this dag run should be ignored
+        dag2.dag_id,
+        run_id=unique_id + '-3',
+        run_type=DagRunType.MANUAL,
+        execution_date=first_timestamp.add(microseconds=3000),
+    )
+    dr2.dag = dag2
+    session.add(dr2)
+    dr3 = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-2',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=4000),  # exact same time as 3rd event in window
+    )
+    dr3.dag = dag2
+    session.add(dr3)
+    session.add_all(
+        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
+    )
+    session.commit()
+    session.expunge_all()
+
+    events = _get_upstream_dataset_events(dag_run=dr3, session=session)
+
+    event_times = [x.created_at for x in events]
+    assert event_times == [
+        first_timestamp.add(microseconds=2000),
+        first_timestamp.add(microseconds=3000),
+        first_timestamp.add(microseconds=4000),
+    ]
+
+
+class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
+    @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
+    def test_should_respond_200(self, mock_get_events, session):
+        dagrun_model = DagRun(
+            dag_id="TEST_DAG_ID",
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.DATASET_TRIGGERED,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state=DagRunState.RUNNING,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        created_at = pendulum.now('UTC')
+        # make sure whatever is returned by this func is what comes out in response.
+        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 200
+        expected_response = {
+            'dataset_events': [
+                {
+                    'created_at': str(created_at),
+                    'dataset_id': 1,
+                    'extra': None,
+                    'id': None,
+                    'source_dag_id': None,
+                    'source_map_index': None,
+                    'source_run_id': None,
+                    'source_task_id': None,
+                }
+            ],
+            'total_entries': 1,
+        }
+        assert response.json == expected_response
+
+    def test_should_respond_404(self):
+        response = self.client.get(
+            "api/v1/dags/invalid-id/dagRuns/invalid-id/upstreamDatasetEvents",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 404
+        expected_resp = {
+            'detail': "DAGRun with DAG ID: 'invalid-id' and DagRun ID: 'invalid-id' not found",
+            'status': 404,
+            'title': 'DAGRun not found',
+            'type': EXCEPTIONS_LINK_MAP[404],
+        }
+        assert expected_resp == response.json
+
+    def test_should_raises_401_unauthenticated(self, session):
+        dagrun_model = DagRun(
+            dag_id="TEST_DAG_ID",
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.MANUAL,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents")
+
+        assert_401(response)