You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/27 05:56:57 UTC

[airflow] branch main updated: Use simple Json in Dataset & DatasetEvent extra field (#25321)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 285c23a2f9 Use simple Json in Dataset & DatasetEvent extra field (#25321)
285c23a2f9 is described below

commit 285c23a2f90f4c765053aedbd3f92c9f58a84d28
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 27 06:56:48 2022 +0100

    Use simple Json in Dataset & DatasetEvent extra field (#25321)
    
    Replaces ExtendedJSON with sqlalchemy_jsonfield type
---
 airflow/api_connexion/schemas/dataset_schema.py             | 5 +++--
 airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 8 ++++----
 airflow/models/dataset.py                                   | 8 +++++---
 tests/api_connexion/endpoints/test_dag_run_endpoint.py      | 2 +-
 tests/api_connexion/endpoints/test_dataset_endpoint.py      | 2 +-
 5 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index 06c1dc866d..a2bc56f89d 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -20,6 +20,7 @@ from typing import List, NamedTuple
 from marshmallow import Schema, fields
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
 
+from airflow.api_connexion.schemas.common_schema import JsonObjectField
 from airflow.models.dataset import Dataset, DatasetEvent
 
 
@@ -33,7 +34,7 @@ class DatasetSchema(SQLAlchemySchema):
 
     id = auto_field()
     uri = auto_field()
-    extra = fields.Dict()
+    extra = JsonObjectField()
     created_at = auto_field()
     updated_at = auto_field()
 
@@ -67,7 +68,7 @@ class DatasetEventSchema(SQLAlchemySchema):
     id = auto_field()
     dataset_id = auto_field()
     dataset_uri = fields.String(attribute='dataset.uri', dump_only=True)
-    extra = fields.Dict()
+    extra = JsonObjectField()
     source_task_id = auto_field()
     source_dag_id = auto_field()
     source_run_id = auto_field()
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
index 8cc5d9dc2e..1903cdadf3 100644
--- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -23,13 +23,13 @@ Revises: 44b7034f6bdc
 Create Date: 2022-06-22 14:37:20.880672
 
 """
-
 import sqlalchemy as sa
+import sqlalchemy_jsonfield
 from alembic import op
 from sqlalchemy import Integer, String, func
 
 from airflow.migrations.db_types import TIMESTAMP, StringID
-from airflow.utils.sqlalchemy import ExtendedJSON
+from airflow.settings import json
 
 revision = '0038cd0c28b4'
 down_revision = '44b7034f6bdc'
@@ -55,7 +55,7 @@ def _create_dataset_table():
             ),
             nullable=False,
         ),
-        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
         sa.Column('created_at', TIMESTAMP, nullable=False),
         sa.Column('updated_at', TIMESTAMP, nullable=False),
         sqlite_autoincrement=True,  # ensures PK values not reused
@@ -123,7 +123,7 @@ def _create_dataset_event_table():
         'dataset_event',
         sa.Column('id', Integer, primary_key=True, autoincrement=True),
         sa.Column('dataset_id', Integer, nullable=False),
-        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('extra', sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
         sa.Column('source_task_id', String(250), nullable=True),
         sa.Column('source_dag_id', String(250), nullable=True),
         sa.Column('source_run_id', String(250), nullable=True),
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index 21373106b5..94b28b6bb0 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -17,12 +17,14 @@
 # under the License.
 from urllib.parse import urlparse
 
+import sqlalchemy_jsonfield
 from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, PrimaryKeyConstraint, String, text
 from sqlalchemy.orm import relationship
 
 from airflow.models.base import ID_LEN, Base, StringID
+from airflow.settings import json
 from airflow.utils import timezone
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime
 
 
 class Dataset(Base):
@@ -46,7 +48,7 @@ class Dataset(Base):
         ),
         nullable=False,
     )
-    extra = Column(ExtendedJSON, nullable=True)
+    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
@@ -219,7 +221,7 @@ class DatasetEvent(Base):
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     dataset_id = Column(Integer, nullable=False)
-    extra = Column(ExtendedJSON, nullable=True)
+    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
     source_task_id = Column(StringID(), nullable=True)
     source_dag_id = Column(StringID(), nullable=True)
     source_run_id = Column(StringID(), nullable=True)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3b65450b99..969fd5748d 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1624,7 +1624,7 @@ class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
                     'timestamp': str(created_at),
                     'dataset_id': 1,
                     'dataset_uri': d.dataset.uri,
-                    'extra': None,
+                    'extra': {},
                     'id': None,
                     'source_dag_id': None,
                     'source_map_index': None,
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 0d025d3c6c..76cb18dd98 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -348,7 +348,7 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
                     "id": 2,
                     "dataset_id": 2,
                     "dataset_uri": datasets[1].uri,
-                    "extra": None,
+                    "extra": {},
                     "source_dag_id": "dag2",
                     "source_task_id": "task2",
                     "source_run_id": "run2",