Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/15 00:53:44 UTC

[GitHub] [airflow] dstandish opened a new pull request, #25080: WIP - add dagRuns/dataset-event-triggers endpoint

dstandish opened a new pull request, #25080:
URL: https://github.com/apache/airflow/pull/25080

   Rough draft, just getting started
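   Rough usage sketch (not part of the PR): the endpoint is exercised in the tests below at `api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/upstream-dataset-events`; the host and credentials here are placeholders, not values from this change.

   ```python
   # Minimal client-side sketch; base URL and basic-auth credentials are assumed.
   import requests

   resp = requests.get(
       "http://localhost:8080/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstream-dataset-events",
       auth=("admin", "admin"),
   )
   resp.raise_for_status()
   # Keys mirror the DatasetEventCollection schema asserted in the tests.
   for event in resp.json()["dataset_events"]:
       print(event["dataset_id"], event["created_at"])
   ```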


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #25080: add dagRuns/DR-ID/upstream-dataset-events endpoint

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r922445052


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -86,6 +91,68 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
     return dagrun_schema.dump(dag_run)
 
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+    ],
+)
+@provide_session
+def get_upstream_dataset_events(
+    *, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get a DAG Run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun)
+        .filter(
+            DagRun.dag_id == dag_id,
+            DagRun.run_id == dag_run_id,
+        )
+        .one_or_none()
+    )
+    if dag_run is None:
+        raise NotFound(
+            "DAGRun not found",
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
+        )
+    events = _get_upstream_dataset_events(dag_run=dag_run, session=session)
+    return dataset_event_collection_schema.dump(
+        DatasetEventCollection(dataset_events=events, total_entries=len(events))
+    )
+
+
+def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session = NEW_SESSION) -> List["DagRun"]:

Review Comment:
   ```suggestion
   def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["DagRun"]:
   ```
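
   The convention here: `@provide_session` injects a session when the caller omits it, so the `NEW_SESSION` sentinel default only belongs on decorated functions; an undecorated private helper is always handed an explicit session. A minimal sketch of the pattern (function names are illustrative, not from this PR):

   ```python
   from sqlalchemy.orm import Session

   from airflow.utils.session import NEW_SESSION, provide_session


   @provide_session
   def public_endpoint(*, dag_id: str, session: Session = NEW_SESSION):
       # Decorated: callers may omit `session` and provide_session supplies one.
       return _private_helper(dag_id=dag_id, session=session)


   def _private_helper(*, dag_id: str, session: Session):
       # Undecorated internal helper: always called with an explicit session,
       # so a NEW_SESSION default would never take effect and is misleading.
       ...
   ```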



##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
@@ -1483,3 +1488,180 @@ def test_should_respond_404(self):
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 404
+
+
+def test__get_upstream_dataset_events_no_prior(configured_app):
+    """If no prior dag runs, return all events"""
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 5 events
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
+    session.commit()
+
+    # create a single dag run, no prior dag runs
+    dr = DagRun(dag2.dag_id, run_id=unique_id, run_type=DagRunType.DATASET_TRIGGERED)
+    dr.dag = dag2
+    session.add(dr)
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+    assert len(events) == 5
+
+
+def test__get_upstream_dataset_events_with_prior(configured_app):
+    """
+    Events returned should be those that occurred after the last DATASET_TRIGGERED
+    dag run and up to the execution date of the current dag run.
+    """
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 2 events, then a dag run, then 3 events, then another dag run, then another event
+    first_timestamp = pendulum.now('UTC')
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-1',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=1000),
+    )
+    dr.dag = dag2
+    session.add(dr)
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-2',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=4000),  # exact same time as 3rd event in window
+    )
+    dr.dag = dag2
+    session.add(dr)
+    dr = DagRun(  # this dag run should be ignored
+        dag2.dag_id,
+        run_id=unique_id + '-3',
+        run_type=DagRunType.MANUAL,
+        execution_date=first_timestamp.add(microseconds=3000),
+    )
+    dr.dag = dag2
+    session.add(dr)
+    session.add_all(
+        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
+    )
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+
+    event_times = [x.created_at for x in events]
+    assert event_times == [
+        first_timestamp.add(microseconds=2000),
+        first_timestamp.add(microseconds=3000),
+        first_timestamp.add(microseconds=4000),
+    ]
+
+
+class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
+    @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
+    def test_should_respond_200(self, mock_get_events, session):
+        dagrun_model = DagRun(
+            dag_id="TEST_DAG_ID",
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.DATASET_TRIGGERED,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state=DagRunState.RUNNING,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        created_at = pendulum.now('UTC')
+        # make sure whatever is returned by this func is what comes out in response.
+        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstream-dataset-events",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 200
+        expected_response = {
+            'dataset_events': [
+                {
+                    'created_at': str(created_at),
+                    'dataset_id': 1,
+                    'extra': None,
+                    'id': None,
+                    'source_dag_id': None,
+                    'source_map_index': None,
+                    'source_run_id': None,
+                    'source_task_id': None,
+                }
+            ],
+            'total_entries': 1,
+        }
+        assert response.json == expected_response

Review Comment:
   Should we check the mock was called?
   
   ```suggestion
           assert response.json == expected_response
           mock_get_events.assert_called_once()
   ```
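
   For readers following the tests quoted above: they pin down the helper's intended behaviour (return all matching events when there is no prior dataset-triggered run, otherwise only events after the previous DATASET_TRIGGERED run and up to, and including, the current run's execution date). A rough sketch of that logic follows; it is not the PR's implementation, and the DatasetEvent import path plus the way upstream dataset ids are obtained are assumptions:

   ```python
   # Sketch of the behaviour described by the tests -- not the code in this PR.
   from typing import List

   from sqlalchemy.orm import Session

   from airflow.models import DagRun
   from airflow.utils.types import DagRunType


   def _upstream_dataset_events_sketch(
       *, dag_run: DagRun, dataset_ids: List[int], session: Session
   ) -> list:
       # Import path is an assumption; dataset_ids would come from the DAG's
       # dataset schedule (looking them up is out of scope for this sketch).
       from airflow.models.dataset import DatasetEvent

       prev_run_date = (
           session.query(DagRun.execution_date)
           .filter(
               DagRun.dag_id == dag_run.dag_id,
               DagRun.run_type == DagRunType.DATASET_TRIGGERED,
               DagRun.execution_date < dag_run.execution_date,
           )
           .order_by(DagRun.execution_date.desc())
           .limit(1)
           .scalar()
       )
       query = session.query(DatasetEvent).filter(DatasetEvent.dataset_id.in_(dataset_ids))
       if prev_run_date is not None:
           # "with prior" case: only events after the previous dataset-triggered run.
           query = query.filter(DatasetEvent.created_at > prev_run_date)
       if dag_run.execution_date is not None:
           # Upper bound is inclusive: the test keeps the event created at the
           # exact execution date of the current run.
           query = query.filter(DatasetEvent.created_at <= dag_run.execution_date)
       return query.order_by(DatasetEvent.created_at).all()
   ```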





[GitHub] [airflow] jedcunningham commented on a diff in pull request #25080: add dagRuns/DR-ID/upstream-dataset-events endpoint

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r923448182


##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
   (same test diff hunk as quoted in the previous comment)

Review Comment:
   That's fair, I overlooked the fact that id, run_id, etc. were all None.





[GitHub] [airflow] dstandish commented on a diff in pull request #25080: add dagRuns/DR-ID/upstream-dataset-events endpoint

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r922679907


##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
   (same test diff hunk as quoted in the previous comment)

Review Comment:
   My thought was that we don't need to assert it was called, because that's implied by the response containing the event it returns.
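
   (For reference, if the explicit check were wanted anyway, `unittest.mock` also lets the test inspect what the endpoint passed to the helper; a small sketch, not part of the PR, to append at the end of test_should_respond_200:)

   ```python
   # Verify both that the helper was called and which dag run it received.
   mock_get_events.assert_called_once()
   _, kwargs = mock_get_events.call_args
   assert kwargs["dag_run"].run_id == "TEST_DAG_RUN_ID"
   ```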





[GitHub] [airflow] dstandish merged pull request #25080: add dagRuns/DR-ID/upstreamDatasetEvents endpoint

Posted by GitBox <gi...@apache.org>.
dstandish merged PR #25080:
URL: https://github.com/apache/airflow/pull/25080

