Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/13 23:04:36 UTC

[GitHub] [airflow] jedcunningham opened a new pull request, #25039: Alt: Add dataset events to dataset api

jedcunningham opened a new pull request, #25039:
URL: https://github.com/apache/airflow/pull/25039

   This is an alternative to #25011: here we have a single /datasets/events endpoint that can be used for various use cases, such as filtering by DAG, DAG run, task, or dataset.
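A minimal client-side sketch of querying the single endpoint. It assumes a webserver at http://localhost:8080 (an assumption) and uses the filter names exercised in this PR's tests (`dataset_id`, `source_dag_id`, `source_task_id`, `source_run_id`, `source_map_index`). `limit` and `order_by` also appear in the PR. `offset` is assumed by analogy with `get_datasets`. The helper name `dataset_events_url` is hypothetical.

```python
from urllib.parse import urlencode

# Assumption: a local webserver; adjust to your deployment.
BASE_URL = "http://localhost:8080/api/v1"

# Filter names as exercised by the tests in this PR.
FILTERS = ("dataset_id", "source_dag_id", "source_task_id", "source_run_id", "source_map_index")
# Paging/sorting params; `offset` is assumed by analogy with get_datasets.
PAGING = ("limit", "offset", "order_by")


def dataset_events_url(base_url: str = BASE_URL, **params) -> str:
    """Hypothetical helper: build a GET URL for /datasets/events, dropping None values."""
    unknown = set(params) - set(FILTERS) - set(PAGING)
    if unknown:
        raise ValueError(f"unsupported query parameters: {sorted(unknown)}")
    query = {k: v for k, v in params.items() if v is not None}
    url = f"{base_url}/datasets/events"
    return f"{url}?{urlencode(query)}" if query else url


if __name__ == "__main__":
    print(dataset_events_url(source_dag_id="dag2", limit=50))
    # -> http://localhost:8080/api/v1/datasets/events?source_dag_id=dag2&limit=50
```

The same URL shape covers every use case listed above. Only the query string changes, which is the point of having one endpoint rather than one per filter.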


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #25039: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r922216263


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -59,3 +63,42 @@ def get_datasets(
     query = apply_sorting(query, order_by, {}, allowed_filter_attrs)
     datasets = query.offset(offset).limit(limit).all()
     return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, total_entries=total_entries))
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+@provide_session
+@format_parameters({'limit': check_limit})
+def get_dataset_events(

Review Comment:
   Yeah, unless we denormalize it, I don't think we should.




[GitHub] [airflow] bbovenzi commented on pull request #25039: Alt: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #25039:
URL: https://github.com/apache/airflow/pull/25039#issuecomment-1184492373

   I like this one better. It'll be easier to show relevant dataset events anywhere with a single endpoint instead of needing to add more later on.



[GitHub] [airflow] jedcunningham merged pull request #25039: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #25039:
URL: https://github.com/apache/airflow/pull/25039



[GitHub] [airflow] jedcunningham commented on a diff in pull request #25039: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r922228264


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3461,6 +3491,57 @@ components:
                 $ref: '#/components/schemas/Dataset'
         - $ref: '#/components/schemas/CollectionInfo'
 
+    DatasetEvent:

Review Comment:
   I just didn't think it'd be very useful, but I can.




[GitHub] [airflow] dstandish commented on a diff in pull request #25039: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r922082706


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -59,3 +63,42 @@ def get_datasets(
     query = apply_sorting(query, order_by, {}, allowed_filter_attrs)
     datasets = query.offset(offset).limit(limit).all()
     return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, total_entries=total_entries))
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+@provide_session
+@format_parameters({'limit': check_limit})
+def get_dataset_events(

Review Comment:
   Actually, never mind.




[GitHub] [airflow] dstandish commented on a diff in pull request #25039: Add dataset events to dataset api

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r921698877


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -59,3 +63,42 @@ def get_datasets(
     query = apply_sorting(query, order_by, {}, allowed_filter_attrs)
     datasets = query.offset(offset).limit(limit).all()
     return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, total_entries=total_entries))
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+@provide_session
+@format_parameters({'limit': check_limit})
+def get_dataset_events(

Review Comment:
   Wondering if we should also allow filtering on URI here. What do you think? Seems like we probably ought to.
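The `DatasetEvent` rows built in this PR's tests carry only `dataset_id`, not the URI. A URI filter would therefore have to resolve the URI through the `Dataset` table, in effect a join, unless the URI were denormalized onto the event. The following is a minimal in-memory sketch of that resolve-then-filter step. The dataclasses are hypothetical stand-ins that mirror a subset of the test fields, not Airflow's ORM models.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Dataset:  # hypothetical stand-in for the ORM model
    id: int
    uri: str


@dataclass
class DatasetEvent:  # hypothetical stand-in; stores dataset_id, not the URI
    dataset_id: int
    source_dag_id: str
    created_at: datetime


def events_for_uri(uri: str, datasets: List[Dataset], events: List[DatasetEvent]) -> List[DatasetEvent]:
    """Resolve the URI to a dataset id, then keep events with that id (a join, in effect)."""
    dataset_id: Optional[int] = next((d.id for d in datasets if d.uri == uri), None)
    if dataset_id is None:
        return []  # unknown URI: no matching events
    return [e for e in events if e.dataset_id == dataset_id]
```

In the real endpoint this would presumably become a join from `DatasetEvent` to `Dataset` filtered on `Dataset.uri`. That extra join is the cost that denormalizing the URI onto the event row would avoid.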



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3461,6 +3491,57 @@ components:
                 $ref: '#/components/schemas/Dataset'
         - $ref: '#/components/schemas/CollectionInfo'
 
+    DatasetEvent:

Review Comment:
   Any reason not to include the key, i.e. `id`?



##########
tests/api_connexion/endpoints/test_dataset_endpoint.py:
##########
@@ -239,23 +230,240 @@ def test_should_respect_page_size_limit_default(self, session):
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets", environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 100
 
     @conf_vars({("api", "maximum_page_limit"): "150"})
     def test_should_return_conf_max_if_req_max_above_conf(self, session):
         datasets = [
             Dataset(
-                uri=f"s3://bucket/key/{i+1}",
+                uri=f"s3://bucket/key/{i}",
                 extra={"foo": "bar"},
                 created_at=timezone.parse(self.default_time),
                 updated_at=timezone.parse(self.default_time),
             )
-            for i in range(200)
+            for i in range(1, 200)
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets?limit=180", environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 150
+
+
+class TestGetDatasetEvents(TestDatasetEndpoint):
+    def test_should_respond_200(self, session):
+        self._create_dataset(session)
+        common = {
+            "dataset_id": 1,
+            "extra": "{'foo': 'bar'}",
+            "source_dag_id": "foo",
+            "source_task_id": "bar",
+            "source_run_id": "custom",
+            "source_map_index": -1,
+        }
+
+        events = [DatasetEvent(created_at=timezone.parse(self.default_time), **common) for i in [1, 2]]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get("/api/v1/datasets/events", environ_overrides={'REMOTE_USER': "test"})
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {"created_at": self.default_time, **common},
+                {"created_at": self.default_time, **common},
+            ],
+            "total_entries": 2,
+        }
+
+    @parameterized.expand(
+        [
+            ('dataset_id', '2'),
+            ('source_dag_id', 'dag2'),
+            ('source_task_id', 'task2'),
+            ('source_run_id', 'run2'),
+            ('source_map_index', '2'),
+        ]
+    )
+    @provide_session
+    def test_filtering(self, attr, value, session):
+        datasets = [
+            Dataset(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+                created_at=timezone.parse(self.default_time),
+                updated_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        events = [
+            DatasetEvent(
+                dataset_id=i,
+                source_dag_id=f"dag{i}",
+                source_task_id=f"task{i}",
+                source_run_id=f"run{i}",
+                source_map_index=i,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 3
+
+        response = self.client.get(
+            f"/api/v1/datasets/events?{attr}={value}", environ_overrides={'REMOTE_USER': "test"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {
+                    "dataset_id": 2,
+                    "extra": None,
+                    "source_dag_id": "dag2",
+                    "source_task_id": "task2",
+                    "source_run_id": "run2",
+                    "source_map_index": 2,
+                    "created_at": self.default_time,
+                }
+            ],
+            "total_entries": 1,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, session):
+        self._create_dataset(session)
+        events = [
+            DatasetEvent(
+                dataset_id=1,
+                extra="{'foo': 'bar'}",
+                source_dag_id="foo",
+                source_task_id="bar",
+                source_run_id="custom",
+                source_map_index=-1,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get(
+            "/api/v1/datasets/events?order_by=fake", environ_overrides={'REMOTE_USER': "test"}
+        )  # missing attr
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model"
+        assert response.json['detail'] == msg
+
+    def test_should_raises_401_unauthenticated(self, session):
+        response = self.client.get("/api/v1/datasets/events")
+        assert_401(response)
+
+
+class TestGetDatasetEvenetsEndpointPagination(TestDatasetEndpoint):

Review Comment:
   ```suggestion
   class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
   ```
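The tests in this diff pin down the paging and sorting behaviour the events endpoint shares with `/datasets`:

- A default page size of 100.
- A request for `limit=180` is capped at the configured `maximum_page_limit` of 150.
- An unknown `order_by` attribute yields a 400 with a fixed message.

A minimal in-memory sketch of that behaviour follows. `cap_limit` and `paginate` are hypothetical names, not Airflow's `check_limit`/`apply_sorting`. Raising `ValueError` stands in for the HTTP 400, and the allowed sort attributes are assumed.

```python
from typing import Any, Dict, List, Optional, Sequence

DEFAULT_PAGE_LIMIT = 100  # page size the default-limit test expects
MAXIMUM_PAGE_LIMIT = 150  # value the test sets via [api] maximum_page_limit


def cap_limit(limit: Optional[int], maximum: int = MAXIMUM_PAGE_LIMIT) -> int:
    """Hypothetical: use the default when unset, reject negatives, cap at the maximum."""
    if limit is None:
        return DEFAULT_PAGE_LIMIT
    if limit < 0:
        raise ValueError("Page limit must be a positive integer")
    return min(limit, maximum)


def paginate(
    rows: List[Dict[str, Any]],
    order_by: str = "id",
    offset: int = 0,
    limit: Optional[int] = None,
    allowed: Sequence[str] = ("id", "created_at"),  # assumed sortable attributes
) -> Dict[str, Any]:
    """Hypothetical: validate the sort key, sort, then slice one page."""
    if order_by not in allowed:
        # Same message the test asserts for the 400 response.
        raise ValueError(
            f"Ordering with '{order_by}' is disallowed or the attribute does not exist on the model"
        )
    ordered = sorted(rows, key=lambda r: r[order_by])
    page = ordered[offset : offset + cap_limit(limit)]
    # total_entries counts all matching rows, not just the returned page.
    return {"dataset_events": page, "total_entries": len(rows)}
```

Keeping `total_entries` independent of the page lets a client compute how many pages remain. That matches the `total_entries` field asserted in the tests above.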


