You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/27 05:56:57 UTC
[airflow] branch main updated: Use simple Json in Dataset & DatasetEvent extra field (#25321)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 285c23a2f9 Use simple Json in Dataset & DatasetEvent extra field (#25321)
285c23a2f9 is described below
commit 285c23a2f90f4c765053aedbd3f92c9f58a84d28
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 27 06:56:48 2022 +0100
Use simple Json in Dataset & DatasetEvent extra field (#25321)
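Replaces the ExtendedJSON column type with sqlalchemy_jsonfield.JSONField for the `extra` columns of Dataset and DatasetEvent, and makes those columns non-nullable with an empty-dict default.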
---
airflow/api_connexion/schemas/dataset_schema.py | 5 +++--
airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 8 ++++----
airflow/models/dataset.py | 8 +++++---
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 2 +-
tests/api_connexion/endpoints/test_dataset_endpoint.py | 2 +-
5 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 06c1dc866d..a2bc56f89d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -20,6 +20,7 @@ from typing import List, NamedTuple
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
+from airflow.api_connexion.schemas.common_schema import JsonObjectField
from airflow.models.dataset import Dataset, DatasetEvent
@@ -33,7 +34,7 @@ class DatasetSchema(SQLAlchemySchema):
id = auto_field()
uri = auto_field()
- extra = fields.Dict()
+ extra = JsonObjectField()
created_at = auto_field()
updated_at = auto_field()
@@ -67,7 +68,7 @@ class DatasetEventSchema(SQLAlchemySchema):
id = auto_field()
dataset_id = auto_field()
dataset_uri = fields.String(attribute='dataset.uri', dump_only=True)
- extra = fields.Dict()
+ extra = JsonObjectField()
source_task_id = auto_field()
source_dag_id = auto_field()
source_run_id = auto_field()
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index 8cc5d9dc2e..1903cdadf3 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -23,13 +23,13 @@ Revises: 44b7034f6bdc
Create Date: 2022-06-22 14:37:20.880672
"""
-
import sqlalchemy as sa
+import sqlalchemy_jsonfield
from alembic import op
from sqlalchemy import Integer, String, func
from airflow.migrations.db_types import TIMESTAMP, StringID
-from airflow.utils.sqlalchemy import ExtendedJSON
+from airflow.settings import json
revision = '0038cd0c28b4'
down_revision = '44b7034f6bdc'
@@ -55,7 +55,7 @@ def _create_dataset_table():
),
nullable=False,
),
- sa.Column('extra', ExtendedJSON, nullable=True),
+ sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
sa.Column('created_at', TIMESTAMP, nullable=False),
sa.Column('updated_at', TIMESTAMP, nullable=False),
sqlite_autoincrement=True, # ensures PK values not reused
@@ -123,7 +123,7 @@ def _create_dataset_event_table():
'dataset_event',
sa.Column('id', Integer, primary_key=True, autoincrement=True),
sa.Column('dataset_id', Integer, nullable=False),
- sa.Column('extra', ExtendedJSON, nullable=True),
+ sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
sa.Column('source_task_id', String(250), nullable=True),
sa.Column('source_dag_id', String(250), nullable=True),
sa.Column('source_run_id', String(250), nullable=True),
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 21373106b5..94b28b6bb0 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -17,12 +17,14 @@
# under the License.
from urllib.parse import urlparse
+import sqlalchemy_jsonfield
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, String, text
from sqlalchemy.orm import relationship
from airflow.models.base import ID_LEN, Base, StringID
+from airflow.settings import json
from airflow.utils import timezone
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime
class Dataset(Base):
@@ -46,7 +48,7 @@ class Dataset(Base):
),
nullable=False,
)
- extra = Column(ExtendedJSON, nullable=True)
+ extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
@@ -219,7 +221,7 @@ class DatasetEvent(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
dataset_id = Column(Integer, nullable=False)
- extra = Column(ExtendedJSON, nullable=True)
+ extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
source_task_id = Column(StringID(), nullable=True)
source_dag_id = Column(StringID(), nullable=True)
source_run_id = Column(StringID(), nullable=True)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3b65450b99..969fd5748d 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1624,7 +1624,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
'timestamp': str(created_at),
'dataset_id': 1,
'dataset_uri': d.dataset.uri,
- 'extra': None,
+ 'extra': {},
'id': None,
'source_dag_id': None,
'source_map_index': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 0d025d3c6c..76cb18dd98 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -348,7 +348,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
"id": 2,
"dataset_id": 2,
"dataset_uri": datasets[1].uri,
- "extra": None,
+ "extra": {},
"source_dag_id": "dag2",
"source_task_id": "task2",
"source_run_id": "run2",