You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/25 18:39:26 UTC
[airflow] branch main updated: Add URI to dataset event response (#25250)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3bc10e90e0 Add URI to dataset event response (#25250)
3bc10e90e0 is described below
commit 3bc10e90e087f3f4d84b29eef9eaa8883082a25a
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jul 25 11:39:15 2022 -0700
Add URI to dataset event response (#25250)
Also, change the `extra` fields of datasets and dataset events so that the REST API returns them as JSON objects rather than strings.
---
airflow/api_connexion/openapi/v1.yaml | 11 +++++---
airflow/api_connexion/schemas/dataset_schema.py | 5 ++--
airflow/models/dataset.py | 4 +++
airflow/www/static/js/datasets/Details.tsx | 2 +-
airflow/www/static/js/types/api-generated.ts | 8 +++---
.../endpoints/test_dag_run_endpoint.py | 5 +++-
.../endpoints/test_dataset_endpoint.py | 31 +++++++++++++++-------
tests/api_connexion/schemas/test_dataset_schema.py | 18 ++++++++-----
8 files changed, 57 insertions(+), 27 deletions(-)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index c6574f1a2d..7c9d45365e 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3494,7 +3494,7 @@ components:
description: The dataset uri
nullable: false
extra:
- type: string
+ type: object
description: The dataset extra
nullable: true
created_at:
@@ -3531,9 +3531,13 @@ components:
dataset_id:
type: integer
description: The dataset id
- extra:
+ dataset_uri:
type: string
- description: The dataset extra
+ description: The URI of the dataset
+ nullable: false
+ extra:
+ type: object
+ description: The dataset event extra
nullable: true
source_dag_id:
type: string
@@ -3556,7 +3560,6 @@ components:
description: The dataset event creation time
nullable: false
-
DatasetEventCollection:
description: |
A collection of dataset events.
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 5b2601cea7..e63f6ea7eb 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -33,7 +33,7 @@ class DatasetSchema(SQLAlchemySchema):
id = auto_field()
uri = auto_field()
- extra = auto_field()
+ extra = fields.Dict()
created_at = auto_field()
updated_at = auto_field()
@@ -66,7 +66,8 @@ class DatasetEventSchema(SQLAlchemySchema):
id = auto_field()
dataset_id = auto_field()
- extra = auto_field()
+ dataset_uri = fields.String(attribute='dataset.uri', dump_only=True)
+ extra = fields.Dict()
source_task_id = auto_field()
source_dag_id = auto_field()
source_run_id = auto_field()
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index e6db473a88..c16f157f8c 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -261,6 +261,10 @@ class DatasetEvent(Base):
uselist=False,
)
+ @property
+ def uri(self):
+ return self.dataset.uri
+
def __eq__(self, other) -> bool:
if isinstance(other, self.__class__):
return self.dataset_id == other.dataset_id and self.created_at == other.created_at
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index 110406f291..3bbee2a818 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -123,7 +123,7 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
{!!dataset.extra && (
<Flex>
<Text mr={1}>Extra:</Text>
- <Code>{dataset.extra}</Code>
+ <Code>{JSON.stringify(dataset.extra)}</Code>
</Flex>
)}
<Flex my={2}>
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 70cc7d9902..c798721eb1 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1464,7 +1464,7 @@ export interface components {
/** @description The dataset uri */
uri?: string;
/** @description The dataset extra */
- extra?: string | null;
+ extra?: { [key: string]: unknown } | null;
/** @description The dataset creation time */
created_at?: string;
/** @description The dataset update time */
@@ -1486,8 +1486,10 @@ export interface components {
DatasetEvent: {
/** @description The dataset id */
dataset_id?: number;
- /** @description The dataset extra */
- extra?: string | null;
+ /** @description The URI of the dataset */
+ dataset_uri?: string;
+ /** @description The dataset event extra */
+ extra?: { [key: string]: unknown } | null;
/** @description The DAG ID that updated the dataset. */
source_dag_id?: string | null;
/** @description The task ID that updated the dataset. */
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 454f87afd4..1a25475df1 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1612,7 +1612,9 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
assert len(result) == 1
created_at = pendulum.now('UTC')
# make sure whatever is returned by this func is what comes out in response.
- mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+ d = DatasetEvent(dataset_id=1, created_at=created_at)
+ d.dataset = Dataset(id=1, uri='hello', created_at=created_at, updated_at=created_at)
+ mock_get_events.return_value = [d]
response = self.client.get(
"api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents",
environ_overrides={'REMOTE_USER': "test"},
@@ -1623,6 +1625,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
{
'created_at': str(created_at),
'dataset_id': 1,
+ 'dataset_uri': d.dataset.uri,
'extra': None,
'id': None,
'source_dag_id': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 2696e60300..a06142dcbb 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -70,6 +70,7 @@ class TestDatasetEndpoint:
)
session.add(dataset_model)
session.commit()
+ return dataset_model
class TestGetDatasetEndpoint(TestDatasetEndpoint):
@@ -83,7 +84,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
assert response.json == {
"id": 1,
"uri": "s3://bucket/key",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
}
@@ -129,14 +130,14 @@ class TestGetDatasets(TestDatasetEndpoint):
{
"id": 1,
"uri": "s3://bucket/key/1",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
},
{
"id": 2,
"uri": "s3://bucket/key/2",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.default_time,
"updated_at": self.default_time,
},
@@ -258,10 +259,10 @@ class TestGetDatasetsEndpointPagination(TestDatasetEndpoint):
class TestGetDatasetEvents(TestDatasetEndpoint):
def test_should_respond_200(self, session):
- self._create_dataset(session)
+ d = self._create_dataset(session)
common = {
"dataset_id": 1,
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"source_dag_id": "foo",
"source_task_id": "bar",
"source_run_id": "custom",
@@ -279,20 +280,31 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
response_data = response.json
assert response_data == {
"dataset_events": [
- {"id": 1, "created_at": self.default_time, **common},
- {"id": 2, "created_at": self.default_time, **common},
+ {
+ "id": 1,
+ "created_at": self.default_time,
+ **common,
+ "dataset_uri": d.uri,
+ },
+ {
+ "id": 2,
+ "created_at": self.default_time,
+ **common,
+ "dataset_uri": d.uri,
+ },
],
"total_entries": 2,
}
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ 'attr, value',
[
('dataset_id', '2'),
('source_dag_id', 'dag2'),
('source_task_id', 'task2'),
('source_run_id', 'run2'),
('source_map_index', '2'),
- ]
+ ],
)
@provide_session
def test_filtering(self, attr, value, session):
@@ -335,6 +347,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
{
"id": 2,
"dataset_id": 2,
+ "dataset_uri": datasets[1].uri,
"extra": None,
"source_dag_id": "dag2",
"source_task_id": "task2",
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 1d33fbf813..f6ed25b85c 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -52,7 +52,7 @@ class TestDatasetSchema(TestDatasetSchemaBase):
assert serialized_data == {
"id": 1,
"uri": "s3://bucket/key",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
}
@@ -82,14 +82,14 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
{
"id": 1,
"uri": "s3://bucket/key/1",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
},
{
"id": 2,
"uri": "s3://bucket/key/2",
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"created_at": self.timestamp,
"updated_at": self.timestamp,
},
@@ -100,9 +100,12 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
class TestDatasetEventSchema(TestDatasetSchemaBase):
def test_serialize(self, session):
+ d = Dataset('s3://abc')
+ session.add(d)
+ session.commit()
event = DatasetEvent(
id=1,
- dataset_id=10,
+ dataset_id=d.id,
extra={"foo": "bar"},
source_dag_id="foo",
source_task_id="bar",
@@ -115,8 +118,9 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
serialized_data = dataset_event_schema.dump(event)
assert serialized_data == {
"id": 1,
- "dataset_id": 10,
- "extra": "{'foo': 'bar'}",
+ "dataset_id": d.id,
+ "dataset_uri": "s3://abc",
+ "extra": {'foo': 'bar'},
"source_dag_id": "foo",
"source_task_id": "bar",
"source_run_id": "custom",
@@ -129,7 +133,7 @@ class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
def test_serialize(self, session):
common = {
"dataset_id": 10,
- "extra": "{'foo': 'bar'}",
+ "extra": {'foo': 'bar'},
"source_dag_id": "foo",
"source_task_id": "bar",
"source_run_id": "custom",