You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/15 00:04:31 UTC

[GitHub] [airflow] dstandish commented on a diff in pull request #25039: Add dataset events to dataset api

dstandish commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r921698877


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -59,3 +63,42 @@ def get_datasets(
     query = apply_sorting(query, order_by, {}, allowed_filter_attrs)
     datasets = query.offset(offset).limit(limit).all()
     return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, total_entries=total_entries))
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+@provide_session
+@format_parameters({'limit': check_limit})
+def get_dataset_events(

Review Comment:
   Wondering if we should also allow filtering on URI here. What do you think? Seems like we probably ought to.



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3461,6 +3491,57 @@ components:
                 $ref: '#/components/schemas/Dataset'
         - $ref: '#/components/schemas/CollectionInfo'
 
+    DatasetEvent:

Review Comment:
   Any reason not to include the key, i.e. `id`?



##########
tests/api_connexion/endpoints/test_dataset_endpoint.py:
##########
@@ -239,23 +230,240 @@ def test_should_respect_page_size_limit_default(self, session):
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets", environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 100
 
     @conf_vars({("api", "maximum_page_limit"): "150"})
     def test_should_return_conf_max_if_req_max_above_conf(self, session):
         datasets = [
             Dataset(
-                uri=f"s3://bucket/key/{i+1}",
+                uri=f"s3://bucket/key/{i}",
                 extra={"foo": "bar"},
                 created_at=timezone.parse(self.default_time),
                 updated_at=timezone.parse(self.default_time),
             )
-            for i in range(200)
+            for i in range(1, 200)
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets?limit=180", environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 150
+
+
+class TestGetDatasetEvents(TestDatasetEndpoint):
+    def test_should_respond_200(self, session):
+        self._create_dataset(session)
+        common = {
+            "dataset_id": 1,
+            "extra": "{'foo': 'bar'}",
+            "source_dag_id": "foo",
+            "source_task_id": "bar",
+            "source_run_id": "custom",
+            "source_map_index": -1,
+        }
+
+        events = [DatasetEvent(created_at=timezone.parse(self.default_time), **common) for i in [1, 2]]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get("/api/v1/datasets/events", environ_overrides={'REMOTE_USER': "test"})
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {"created_at": self.default_time, **common},
+                {"created_at": self.default_time, **common},
+            ],
+            "total_entries": 2,
+        }
+
+    @parameterized.expand(
+        [
+            ('dataset_id', '2'),
+            ('source_dag_id', 'dag2'),
+            ('source_task_id', 'task2'),
+            ('source_run_id', 'run2'),
+            ('source_map_index', '2'),
+        ]
+    )
+    @provide_session
+    def test_filtering(self, attr, value, session):
+        datasets = [
+            Dataset(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+                created_at=timezone.parse(self.default_time),
+                updated_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        events = [
+            DatasetEvent(
+                dataset_id=i,
+                source_dag_id=f"dag{i}",
+                source_task_id=f"task{i}",
+                source_run_id=f"run{i}",
+                source_map_index=i,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 3
+
+        response = self.client.get(
+            f"/api/v1/datasets/events?{attr}={value}", environ_overrides={'REMOTE_USER': "test"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {
+                    "dataset_id": 2,
+                    "extra": None,
+                    "source_dag_id": "dag2",
+                    "source_task_id": "task2",
+                    "source_run_id": "run2",
+                    "source_map_index": 2,
+                    "created_at": self.default_time,
+                }
+            ],
+            "total_entries": 1,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, session):
+        self._create_dataset(session)
+        events = [
+            DatasetEvent(
+                dataset_id=1,
+                extra="{'foo': 'bar'}",
+                source_dag_id="foo",
+                source_task_id="bar",
+                source_run_id="custom",
+                source_map_index=-1,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get(
+            "/api/v1/datasets/events?order_by=fake", environ_overrides={'REMOTE_USER': "test"}
+        )  # missing attr
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model"
+        assert response.json['detail'] == msg
+
+    def test_should_raises_401_unauthenticated(self, session):
+        response = self.client.get("/api/v1/datasets/events")
+        assert_401(response)
+
+
+class TestGetDatasetEvenetsEndpointPagination(TestDatasetEndpoint):

Review Comment:
   ```suggestion
   class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org