You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/25 18:39:26 UTC

[airflow] branch main updated: Add URI to dataset event response (#25250)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bc10e90e0 Add URI to dataset event response (#25250)
3bc10e90e0 is described below

commit 3bc10e90e087f3f4d84b29eef9eaa8883082a25a
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jul 25 11:39:15 2022 -0700

    Add URI to dataset event response (#25250)
    
    Also, update the `extra` fields so that the REST API returns them as JSON objects instead of strings.
---
 airflow/api_connexion/openapi/v1.yaml              | 11 +++++---
 airflow/api_connexion/schemas/dataset_schema.py    |  5 ++--
 airflow/models/dataset.py                          |  4 +++
 airflow/www/static/js/datasets/Details.tsx         |  2 +-
 airflow/www/static/js/types/api-generated.ts       |  8 +++---
 .../endpoints/test_dag_run_endpoint.py             |  5 +++-
 .../endpoints/test_dataset_endpoint.py             | 31 +++++++++++++++-------
 tests/api_connexion/schemas/test_dataset_schema.py | 18 ++++++++-----
 8 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index c6574f1a2d..7c9d45365e 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3494,7 +3494,7 @@ components:
           description: The dataset uri
           nullable: false
         extra:
-          type: string
+          type: object
           description: The dataset extra
           nullable: true
         created_at:
@@ -3531,9 +3531,13 @@ components:
         dataset_id:
           type: integer
           description: The dataset id
-        extra:
+        dataset_uri:
           type: string
-          description: The dataset extra
+          description: The URI of the dataset
+          nullable: false
+        extra:
+          type: object
+          description: The dataset event extra
           nullable: true
         source_dag_id:
           type: string
@@ -3556,7 +3560,6 @@ components:
           description: The dataset event creation time
           nullable: false
 
-
     DatasetEventCollection:
       description: |
         A collection of dataset events.
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 5b2601cea7..e63f6ea7eb 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -33,7 +33,7 @@ class DatasetSchema(SQLAlchemySchema):
 
     id = auto_field()
     uri = auto_field()
-    extra = auto_field()
+    extra = fields.Dict()
     created_at = auto_field()
     updated_at = auto_field()
 
@@ -66,7 +66,8 @@ class DatasetEventSchema(SQLAlchemySchema):
 
     id = auto_field()
     dataset_id = auto_field()
-    extra = auto_field()
+    dataset_uri = fields.String(attribute='dataset.uri', dump_only=True)
+    extra = fields.Dict()
     source_task_id = auto_field()
     source_dag_id = auto_field()
     source_run_id = auto_field()
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index e6db473a88..c16f157f8c 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -261,6 +261,10 @@ class DatasetEvent(Base):
         uselist=False,
     )
 
+    @property
+    def uri(self):
+        return self.dataset.uri
+
     def __eq__(self, other) -> bool:
         if isinstance(other, self.__class__):
             return self.dataset_id == other.dataset_id and self.created_at == other.created_at
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index 110406f291..3bbee2a818 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -123,7 +123,7 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
             {!!dataset.extra && (
               <Flex>
                 <Text mr={1}>Extra:</Text>
-                <Code>{dataset.extra}</Code>
+                <Code>{JSON.stringify(dataset.extra)}</Code>
               </Flex>
             )}
             <Flex my={2}>
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 70cc7d9902..c798721eb1 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1464,7 +1464,7 @@ export interface components {
       /** @description The dataset uri */
       uri?: string;
       /** @description The dataset extra */
-      extra?: string | null;
+      extra?: { [key: string]: unknown } | null;
       /** @description The dataset creation time */
       created_at?: string;
       /** @description The dataset update time */
@@ -1486,8 +1486,10 @@ export interface components {
     DatasetEvent: {
       /** @description The dataset id */
       dataset_id?: number;
-      /** @description The dataset extra */
-      extra?: string | null;
+      /** @description The URI of the dataset */
+      dataset_uri?: string;
+      /** @description The dataset event extra */
+      extra?: { [key: string]: unknown } | null;
       /** @description The DAG ID that updated the dataset. */
       source_dag_id?: string | null;
       /** @description The task ID that updated the dataset. */
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 454f87afd4..1a25475df1 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1612,7 +1612,9 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
         assert len(result) == 1
         created_at = pendulum.now('UTC')
         # make sure whatever is returned by this func is what comes out in response.
-        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+        d = DatasetEvent(dataset_id=1, created_at=created_at)
+        d.dataset = Dataset(id=1, uri='hello', created_at=created_at, updated_at=created_at)
+        mock_get_events.return_value = [d]
         response = self.client.get(
             "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents",
             environ_overrides={'REMOTE_USER': "test"},
@@ -1623,6 +1625,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
                 {
                     'created_at': str(created_at),
                     'dataset_id': 1,
+                    'dataset_uri': d.dataset.uri,
                     'extra': None,
                     'id': None,
                     'source_dag_id': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 2696e60300..a06142dcbb 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -70,6 +70,7 @@ class TestDatasetEndpoint:
         )
         session.add(dataset_model)
         session.commit()
+        return dataset_model
 
 
 class TestGetDatasetEndpoint(TestDatasetEndpoint):
@@ -83,7 +84,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
         assert response.json == {
             "id": 1,
             "uri": "s3://bucket/key",
-            "extra": "{'foo': 'bar'}",
+            "extra": {'foo': 'bar'},
             "created_at": self.default_time,
             "updated_at": self.default_time,
         }
@@ -129,14 +130,14 @@ class TestGetDatasets(TestDatasetEndpoint):
                 {
                     "id": 1,
                     "uri": "s3://bucket/key/1",
-                    "extra": "{'foo': 'bar'}",
+                    "extra": {'foo': 'bar'},
                     "created_at": self.default_time,
                     "updated_at": self.default_time,
                 },
                 {
                     "id": 2,
                     "uri": "s3://bucket/key/2",
-                    "extra": "{'foo': 'bar'}",
+                    "extra": {'foo': 'bar'},
                     "created_at": self.default_time,
                     "updated_at": self.default_time,
                 },
@@ -258,10 +259,10 @@ class TestGetDatasetsEndpointPagination(TestDatasetEndpoint):
 
 class TestGetDatasetEvents(TestDatasetEndpoint):
     def test_should_respond_200(self, session):
-        self._create_dataset(session)
+        d = self._create_dataset(session)
         common = {
             "dataset_id": 1,
-            "extra": "{'foo': 'bar'}",
+            "extra": {'foo': 'bar'},
             "source_dag_id": "foo",
             "source_task_id": "bar",
             "source_run_id": "custom",
@@ -279,20 +280,31 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
         response_data = response.json
         assert response_data == {
             "dataset_events": [
-                {"id": 1, "created_at": self.default_time, **common},
-                {"id": 2, "created_at": self.default_time, **common},
+                {
+                    "id": 1,
+                    "created_at": self.default_time,
+                    **common,
+                    "dataset_uri": d.uri,
+                },
+                {
+                    "id": 2,
+                    "created_at": self.default_time,
+                    **common,
+                    "dataset_uri": d.uri,
+                },
             ],
             "total_entries": 2,
         }
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        'attr, value',
         [
             ('dataset_id', '2'),
             ('source_dag_id', 'dag2'),
             ('source_task_id', 'task2'),
             ('source_run_id', 'run2'),
             ('source_map_index', '2'),
-        ]
+        ],
     )
     @provide_session
     def test_filtering(self, attr, value, session):
@@ -335,6 +347,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                 {
                     "id": 2,
                     "dataset_id": 2,
+                    "dataset_uri": datasets[1].uri,
                     "extra": None,
                     "source_dag_id": "dag2",
                     "source_task_id": "task2",
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 1d33fbf813..f6ed25b85c 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -52,7 +52,7 @@ class TestDatasetSchema(TestDatasetSchemaBase):
         assert serialized_data == {
             "id": 1,
             "uri": "s3://bucket/key",
-            "extra": "{'foo': 'bar'}",
+            "extra": {'foo': 'bar'},
             "created_at": self.timestamp,
             "updated_at": self.timestamp,
         }
@@ -82,14 +82,14 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
                 {
                     "id": 1,
                     "uri": "s3://bucket/key/1",
-                    "extra": "{'foo': 'bar'}",
+                    "extra": {'foo': 'bar'},
                     "created_at": self.timestamp,
                     "updated_at": self.timestamp,
                 },
                 {
                     "id": 2,
                     "uri": "s3://bucket/key/2",
-                    "extra": "{'foo': 'bar'}",
+                    "extra": {'foo': 'bar'},
                     "created_at": self.timestamp,
                     "updated_at": self.timestamp,
                 },
@@ -100,9 +100,12 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
 
 class TestDatasetEventSchema(TestDatasetSchemaBase):
     def test_serialize(self, session):
+        d = Dataset('s3://abc')
+        session.add(d)
+        session.commit()
         event = DatasetEvent(
             id=1,
-            dataset_id=10,
+            dataset_id=d.id,
             extra={"foo": "bar"},
             source_dag_id="foo",
             source_task_id="bar",
@@ -115,8 +118,9 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
         serialized_data = dataset_event_schema.dump(event)
         assert serialized_data == {
             "id": 1,
-            "dataset_id": 10,
-            "extra": "{'foo': 'bar'}",
+            "dataset_id": d.id,
+            "dataset_uri": "s3://abc",
+            "extra": {'foo': 'bar'},
             "source_dag_id": "foo",
             "source_task_id": "bar",
             "source_run_id": "custom",
@@ -129,7 +133,7 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
     def test_serialize(self, session):
         common = {
             "dataset_id": 10,
-            "extra": "{'foo': 'bar'}",
+            "extra": {'foo': 'bar'},
             "source_dag_id": "foo",
             "source_task_id": "bar",
             "source_run_id": "custom",