Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/16 13:23:56 UTC

[GitHub] [airflow] dstandish commented on a diff in pull request #25080: add dagRuns/DR-ID/upstream-dataset-events endpoint

dstandish commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r922679907


##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
@@ -1483,3 +1488,180 @@ def test_should_respond_404(self):
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 404
+
+
+def test__get_upstream_dataset_events_no_prior(configured_app):
+    """If no prior dag runs, return all events"""
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 5 events
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
+    session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
+    session.commit()
+
+    # create a single dag run, no prior dag runs
+    dr = DagRun(dag2.dag_id, run_id=unique_id, run_type=DagRunType.DATASET_TRIGGERED)
+    dr.dag = dag2
+    session.add(dr)
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+    assert len(events) == 5
+
+
+def test__get_upstream_dataset_events_with_prior(configured_app):
+    """
+    Events returned should be those that occurred after last DATASET_TRIGGERED
+    dag run and up to the exec date of current dag run.
+    """
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 2 events, then a dag run, then 3 events, then another dag run, then a final event
+    first_timestamp = pendulum.now('UTC')
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-1',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=1000),
+    )
+    dr.dag = dag2
+    session.add(dr)
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-2',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=4000),  # exact same time as 3rd event in window
+    )
+    dr.dag = dag2
+    session.add(dr)
+    dr_ignored = DagRun(  # this manual run should be ignored when finding the prior run
+        dag2.dag_id,
+        run_id=unique_id + '-3',
+        run_type=DagRunType.MANUAL,
+        execution_date=first_timestamp.add(microseconds=3000),
+    )
+    dr_ignored.dag = dag2
+    session.add(dr_ignored)
+    session.add_all(
+        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
+    )
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+
+    event_times = [x.created_at for x in events]
+    assert event_times == [
+        first_timestamp.add(microseconds=2000),
+        first_timestamp.add(microseconds=3000),
+        first_timestamp.add(microseconds=4000),
+    ]
+
+
+class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
+    @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
+    def test_should_respond_200(self, mock_get_events, session):
+        dagrun_model = DagRun(
+            dag_id="TEST_DAG_ID",
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.DATASET_TRIGGERED,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state=DagRunState.RUNNING,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        created_at = pendulum.now('UTC')
+        # make sure that whatever this function returns is what comes out in the response
+        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstream-dataset-events",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 200
+        expected_response = {
+            'dataset_events': [
+                {
+                    'created_at': str(created_at),
+                    'dataset_id': 1,
+                    'extra': None,
+                    'id': None,
+                    'source_dag_id': None,
+                    'source_map_index': None,
+                    'source_run_id': None,
+                    'source_task_id': None,
+                }
+            ],
+            'total_entries': 1,
+        }
+        assert response.json == expected_response
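
For context, the windowing behaviour these tests describe suggests the helper works roughly like the sketch below. This is reconstructed purely from the docstrings and assertions above, not taken from the PR itself; in particular the import paths, the `dag_run.dag.schedule_on` attribute access, and the exact query shape are assumptions.

    # Sketch only -- names and import paths are assumptions, not the PR's code.
    from airflow.models import DagRun
    from airflow.models.dataset import DatasetEvent  # assumed location
    from airflow.utils.types import DagRunType

    def _get_upstream_dataset_events(*, dag_run, session):
        """Events for the DAG's upstream datasets created after the previous
        DATASET_TRIGGERED run, up to and including this run's execution date."""
        dataset_ids = [d.id for d in dag_run.dag.schedule_on]  # assumed attribute
        # Most recent prior dataset-triggered run; MANUAL runs are skipped.
        prev_run = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_run.dag_id,
                DagRun.run_type == DagRunType.DATASET_TRIGGERED,
                DagRun.execution_date < dag_run.execution_date,
            )
            .order_by(DagRun.execution_date.desc())
            .first()
        )
        query = session.query(DatasetEvent).filter(DatasetEvent.dataset_id.in_(dataset_ids))
        if prev_run is not None:
            # Lower bound is exclusive: only events strictly after the prior run.
            query = query.filter(DatasetEvent.created_at > prev_run.execution_date)
        if dag_run.execution_date is not None:
            # Upper bound is inclusive: the second test expects the event created
            # at exactly the run's execution date to be returned.
            query = query.filter(DatasetEvent.created_at <= dag_run.execution_date)
        return query.order_by(DatasetEvent.created_at).all()

The behaviours the two tests pin down: with no prior run every event is returned, MANUAL runs never close the window, and the window is (previous run, current run] on DatasetEvent.created_at.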

Review Comment:
   My thought was that we don't need to assert that it is called, because it's implied by the fact that the response contains the event it returns.
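
For reference, the explicit alternative under discussion would be a direct call assertion on the mock (standard unittest.mock API):

    mock_get_events.assert_called_once()

Asserting on the response body covers the same ground: the canned DatasetEvent can only appear in the payload if the endpoint actually invoked the patched helper.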


