You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/18 15:48:42 UTC
[airflow] branch main updated: Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8aa0b249e1 Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
8aa0b249e1 is described below
commit 8aa0b249e1fa0071f3a2458f13d9728564cbc8dd
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jul 18 08:48:37 2022 -0700
Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
Tells you, for a given dag run, the dataset events that are "part of" the dag run. I.e. they were part of the collection of dataset events that contributed to the triggering of the dag run. In practice we just query the events that occurred since the prev dag run. We may make the relationship tighter, see https://github.com/apache/airflow/pull/24969.
---
.../api_connexion/endpoints/dag_run_endpoint.py | 68 ++++++++
airflow/api_connexion/openapi/v1.yaml | 36 +++-
.../endpoints/test_dag_run_endpoint.py | 185 ++++++++++++++++++++-
3 files changed, 283 insertions(+), 6 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index f31b6a24ce..830f1dd925 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -41,12 +41,17 @@ from airflow.api_connexion.schemas.dag_run_schema import (
dagruns_batch_form_schema,
set_dagrun_state_form_schema,
)
+from airflow.api_connexion.schemas.dataset_schema import (
+ DatasetEventCollection,
+ dataset_event_collection_schema,
+)
from airflow.api_connexion.schemas.task_instance_schema import (
TaskInstanceReferenceCollection,
task_instance_reference_collection_schema,
)
from airflow.api_connexion.types import APIResponse
from airflow.models import DagModel, DagRun
+from airflow.models.dataset import DatasetDagRef, DatasetEvent
from airflow.security import permissions
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.session import NEW_SESSION, provide_session
@@ -86,6 +91,69 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
return dagrun_schema.dump(dag_run)
+@security.requires_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+ ],
+)
+@provide_session
+def get_upstream_dataset_events(
+ *, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get the upstream dataset events for a DAG run."""
+ dag_run: Optional[DagRun] = (
+ session.query(DagRun)
+ .filter(
+ DagRun.dag_id == dag_id,
+ DagRun.run_id == dag_run_id,
+ )
+ .one_or_none()
+ )
+ if dag_run is None:
+ raise NotFound(
+ "DAGRun not found",
+ detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
+ )
+ events = _get_upstream_dataset_events(dag_run=dag_run, session=session)
+ return dataset_event_collection_schema.dump(
+ DatasetEventCollection(dataset_events=events, total_entries=len(events))
+ )
+
+
+def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["DatasetEvent"]:
+ """If dag run is dataset-triggered, return the dataset events that triggered it."""
+ if not dag_run.run_type == DagRunType.DATASET_TRIGGERED:
+ return []
+
+ previous_dag_run = (
+ session.query(DagRun)
+ .filter(
+ DagRun.dag_id == dag_run.dag_id,
+ DagRun.execution_date < dag_run.execution_date,
+ DagRun.run_type == DagRunType.DATASET_TRIGGERED,
+ )
+ .order_by(DagRun.execution_date.desc())
+ .first()
+ )
+
+ dataset_event_filters = [
+ DatasetDagRef.dag_id == dag_run.dag_id,
+ DatasetEvent.created_at <= dag_run.execution_date,
+ ]
+ if previous_dag_run:
+ dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
+ dataset_events = (
+ session.query(DatasetEvent)
+ .join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
+ .filter(*dataset_event_filters)
+ .order_by(DatasetEvent.created_at)
+ .all()
+ )
+ return dataset_events
+
+
def _fetch_dag_runs(
query: Query,
*,
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 2dfef4780c..c013771edb 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -818,6 +818,34 @@ paths:
'404':
$ref: '#/components/responses/NotFound'
+ /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents:
+ parameters:
+ - $ref: '#/components/parameters/DAGID'
+ - $ref: '#/components/parameters/DAGRunID'
+ get:
+ summary: Get dataset events for a DAG run
+ description: |
+        Get the upstream dataset events for a DAG run.
+
+ *New in version 2.4.0*
+ x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+ operationId: get_upstream_dataset_events
+ tags: [DAGRun, Dataset]
+ responses:
+ '200':
+ description: Success.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DatasetEventCollection'
+ '401':
+ $ref: '#/components/responses/Unauthenticated'
+ '403':
+ $ref: '#/components/responses/PermissionDenied'
+ '404':
+ $ref: '#/components/responses/NotFound'
+
+
/eventLogs:
get:
summary: List log entries
@@ -3508,19 +3536,19 @@ components:
source_dag_id:
type: string
description: The DAG ID that updated the dataset.
- nullable: false
+ nullable: true
source_task_id:
type: string
description: The task ID that updated the dataset.
- nullable: false
+ nullable: true
source_run_id:
type: string
description: The DAG run ID that updated the dataset.
- nullable: false
+ nullable: true
source_map_index:
type: integer
description: The task map index that updated the dataset.
- nullable: false
+ nullable: true
created_at:
type: string
description: The dataset event creation time
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index dd5803564a..454f87afd4 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -16,18 +16,22 @@
# under the License.
from datetime import timedelta
from unittest import mock
+from uuid import uuid4
+import pendulum
import pytest
from freezegun import freeze_time
from parameterized import parameterized
+from airflow import settings
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun, Dataset
+from airflow.models.dataset import DatasetEvent
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
@@ -44,6 +48,7 @@ def configured_app(minimal_app_for_api):
role_name="Test",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
@@ -1483,3 +1488,179 @@ class TestClearDagRun(TestDagRunEndpoint):
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404
+
+
+def test__get_upstream_dataset_events_no_prior(configured_app):
+ """If no prior dag runs, return all events"""
+ from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+ # setup dags and datasets
+ unique_id = str(uuid4())
+ session = settings.Session()
+ dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+ dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+ dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+ DAG.bulk_write_to_db(dags=[dag2], session=session)
+ session.add_all([dataset1a, dataset1b])
+ session.commit()
+
+ # add 5 events
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
+ session.commit()
+
+ # create a single dag run, no prior dag runs
+ dr = DagRun(dag2.dag_id, run_id=unique_id, run_type=DagRunType.DATASET_TRIGGERED)
+ dr.dag = dag2
+ session.add(dr)
+ session.commit()
+ session.expunge_all()
+
+ # check result
+ events = _get_upstream_dataset_events(dag_run=dr, session=session)
+ assert len(events) == 5
+
+
+def test__get_upstream_dataset_events_with_prior(configured_app):
+ """
+ Events returned should be those that occurred after last DATASET_TRIGGERED
+ dag run and up to the exec date of current dag run.
+ """
+ from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+ # setup dags and datasets
+ unique_id = str(uuid4())
+ session = settings.Session()
+ dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+ dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+ dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+ DAG.bulk_write_to_db(dags=[dag2], session=session)
+ session.add_all([dataset1a, dataset1b])
+ session.commit()
+
+ # add 2 events, then a dag run, then 3 events, then another dag run then another event
+ first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
+ session.add_all(
+ [
+ DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
+ DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+ ]
+ )
+ dr1 = DagRun(
+ dag2.dag_id,
+ run_id=unique_id + '-1',
+ run_type=DagRunType.DATASET_TRIGGERED,
+ execution_date=first_timestamp.add(microseconds=1000),
+ )
+ dr1.dag = dag2
+ session.add(dr1)
+ session.add_all(
+ [
+ DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
+ DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
+ DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+ ]
+ )
+ dr2 = DagRun( # this dag run should be ignored
+ dag2.dag_id,
+ run_id=unique_id + '-3',
+ run_type=DagRunType.MANUAL,
+ execution_date=first_timestamp.add(microseconds=3000),
+ )
+ dr2.dag = dag2
+ session.add(dr2)
+ dr3 = DagRun(
+ dag2.dag_id,
+ run_id=unique_id + '-2',
+ run_type=DagRunType.DATASET_TRIGGERED,
+ execution_date=first_timestamp.add(microseconds=4000), # exact same time as 3rd event in window
+ )
+ dr3.dag = dag2
+ session.add(dr3)
+ session.add_all(
+ [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
+ )
+ session.commit()
+ session.expunge_all()
+
+ events = _get_upstream_dataset_events(dag_run=dr3, session=session)
+
+ event_times = [x.created_at for x in events]
+ assert event_times == [
+ first_timestamp.add(microseconds=2000),
+ first_timestamp.add(microseconds=3000),
+ first_timestamp.add(microseconds=4000),
+ ]
+
+
+class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
+ @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
+ def test_should_respond_200(self, mock_get_events, session):
+ dagrun_model = DagRun(
+ dag_id="TEST_DAG_ID",
+ run_id="TEST_DAG_RUN_ID",
+ run_type=DagRunType.DATASET_TRIGGERED,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ state=DagRunState.RUNNING,
+ )
+ session.add(dagrun_model)
+ session.commit()
+ result = session.query(DagRun).all()
+ assert len(result) == 1
+ created_at = pendulum.now('UTC')
+ # make sure whatever is returned by this func is what comes out in response.
+ mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+ response = self.client.get(
+ "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents",
+ environ_overrides={'REMOTE_USER': "test"},
+ )
+ assert response.status_code == 200
+ expected_response = {
+ 'dataset_events': [
+ {
+ 'created_at': str(created_at),
+ 'dataset_id': 1,
+ 'extra': None,
+ 'id': None,
+ 'source_dag_id': None,
+ 'source_map_index': None,
+ 'source_run_id': None,
+ 'source_task_id': None,
+ }
+ ],
+ 'total_entries': 1,
+ }
+ assert response.json == expected_response
+
+ def test_should_respond_404(self):
+ response = self.client.get(
+ "api/v1/dags/invalid-id/dagRuns/invalid-id/upstreamDatasetEvents",
+ environ_overrides={'REMOTE_USER': "test"},
+ )
+ assert response.status_code == 404
+ expected_resp = {
+ 'detail': "DAGRun with DAG ID: 'invalid-id' and DagRun ID: 'invalid-id' not found",
+ 'status': 404,
+ 'title': 'DAGRun not found',
+ 'type': EXCEPTIONS_LINK_MAP[404],
+ }
+ assert expected_resp == response.json
+
+ def test_should_raises_401_unauthenticated(self, session):
+ dagrun_model = DagRun(
+ dag_id="TEST_DAG_ID",
+ run_id="TEST_DAG_RUN_ID",
+ run_type=DagRunType.MANUAL,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+
+ response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents")
+
+ assert_401(response)