You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/06/27 20:39:56 UTC
[airflow] branch main updated: Add dataset model (#24613)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dee5ba3a51 Add dataset model (#24613)
dee5ba3a51 is described below
commit dee5ba3a51593a3483032558082586bc9fcf9cd2
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jun 27 13:39:44 2022 -0700
Add dataset model (#24613)
Add model for storing references to datasets, a fundamental component of AIP-48.
---
airflow/__init__.py | 1 +
.../versions/0114_2_4_0_add_dataset_model.py | 69 ++++++++++++++++++++
airflow/models/__init__.py | 2 +
airflow/models/dataset.py | 75 ++++++++++++++++++++++
docs/apache-airflow/migrations-ref.rst | 4 +-
tests/utils/test_db.py | 3 +
tests/utils/test_db_cleanup.py | 3 +-
7 files changed, 155 insertions(+), 2 deletions(-)
diff --git a/airflow/__init__.py b/airflow/__init__.py
index cbbb03dd1b..826ffa80f2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -57,6 +57,7 @@ PY310 = sys.version_info >= (3, 10)
# Things to lazy import in form 'name': 'path.to.module'
__lazy_imports = {
'DAG': 'airflow.models.dag',
+ 'Dataset': 'airflow.models.dataset',
'XComArg': 'airflow.models.xcom_arg',
'AirflowException': 'airflow.exceptions',
}
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
new file mode 100644
index 0000000000..838f8780b0
--- /dev/null
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add Dataset model
+
+Revision ID: 0038cd0c28b4
+Revises: 44b7034f6bdc
+Create Date: 2022-06-22 14:37:20.880672
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import Integer, String
+
+from airflow.migrations.db_types import TIMESTAMP
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Migration identifiers; `revision` / `down_revision` mirror the "Revision ID" /
+# "Revises" values in the module docstring above.
+revision = '0038cd0c28b4'
+down_revision = '44b7034f6bdc'
+branch_labels = None
+depends_on = None
+# Airflow release this migration is listed under in migrations-ref.rst
+airflow_version = '2.4.0'
+
+
+def upgrade():
+ """Apply Add Dataset model
+
+ Creates the ``dataset`` table with columns ``id``, ``uri``, ``extra``,
+ ``created_at`` and ``updated_at``, then adds the unique index
+ ``idx_uri_unique`` on ``uri``.
+ """
+ op.create_table(
+ 'dataset',
+ sa.Column('id', Integer, primary_key=True, autoincrement=True),
+ sa.Column(
+ 'uri',
+ # String(3000) in general; on the 'mysql' dialect the variant with an
+ # explicit collation below is used instead
+ String(length=3000).with_variant(
+ String(
+ length=3000,
+ # latin1 allows for more indexed length in mysql
+ # and this field should only be ascii chars
+ collation='latin1_general_cs',
+ ),
+ 'mysql',
+ ),
+ nullable=False,
+ ),
+ sa.Column('extra', ExtendedJSON, nullable=True),
+ sa.Column('created_at', TIMESTAMP, nullable=False),
+ sa.Column('updated_at', TIMESTAMP, nullable=False),
+ sqlite_autoincrement=True, # ensures PK values not reused
+ )
+ # unique index: each uri may appear at most once in the table
+ op.create_index('idx_uri_unique', 'dataset', ['uri'], unique=True)
+
+
+def downgrade():
+ """Unapply Add Dataset model by dropping the ``dataset`` table."""
+ # NOTE(review): the ``idx_uri_unique`` index is not dropped explicitly;
+ # presumably it is removed together with the table -- confirm per backend.
+ op.drop_table('dataset')
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 4cb19eecde..ecf14576ee 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -26,6 +26,7 @@ from airflow.models.dagbag import DagBag
from airflow.models.dagpickle import DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
+from airflow.models.dataset import Dataset
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ImportError
from airflow.models.log import Log
@@ -58,6 +59,7 @@ __all__ = [
"DagPickle",
"DagRun",
"DagTag",
+ "Dataset",
"DbCallbackRequest",
"ImportError",
"Log",
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
new file mode 100644
index 0000000000..49f5dfd1f6
--- /dev/null
+++ b/airflow/models/dataset.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from urllib.parse import urlparse
+
+from sqlalchemy import Column, Index, Integer, String
+
+from airflow.models.base import Base
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+
+
+class Dataset(Base):
+    """
+    A table to store datasets.
+
+    A dataset is identified by its ``uri``: a unique index allows one row per
+    URI, and equality and hashing of instances are based on ``uri`` alone.
+
+    :param uri: a string that uniquely identifies the dataset; it must be
+        ASCII-only and must not use the reserved ``airflow`` scheme
+    :param extra: JSON field for arbitrary extra info
+    :raises ValueError: if ``uri`` contains non-ASCII characters or its scheme
+        is ``airflow`` (compared case-insensitively)
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    uri = Column(
+        String(length=3000).with_variant(
+            String(
+                length=3000,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation='latin1_general_cs',
+            ),
+            'mysql',
+        ),
+        nullable=False,
+    )
+    extra = Column(ExtendedJSON, nullable=True)
+    # both timestamps default to "now"; updated_at is also refreshed on UPDATE
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset"
+    __table_args__ = (
+        Index('idx_uri_unique', uri, unique=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    def __init__(self, uri: str, **kwargs):
+        """Validate ``uri``, then pass it and any other column values to the base constructor."""
+        try:
+            uri.encode('ascii')
+        except UnicodeEncodeError:
+            raise ValueError('URI must be ascii')
+        parsed = urlparse(uri)
+        if parsed.scheme and parsed.scheme.lower() == 'airflow':
+            raise ValueError("Scheme `airflow` is reserved.")
+        super().__init__(uri=uri, **kwargs)
+
+    def __eq__(self, other):
+        """Return whether ``other`` is a ``Dataset`` with the same ``uri``."""
+        if isinstance(other, self.__class__):
+            return self.uri == other.uri
+        # Do not touch ``other.uri`` on foreign objects: returning
+        # NotImplemented lets ``dataset == None`` (or any unrelated value)
+        # evaluate to False instead of raising AttributeError.
+        return NotImplemented
+
+    def __hash__(self):
+        """Hash on ``uri`` so that instances equal under ``__eq__`` hash alike."""
+        return hash(self.uri)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index b45e6c2de7..e5a5b13fa6 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
-| ``44b7034f6bdc`` (head) | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB |
+| ``0038cd0c28b4`` (head) | ``44b7034f6bdc`` | ``2.4.0`` | Add Dataset model |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``44b7034f6bdc`` | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``424117c37d18`` | ``f5fcbda3e651`` | ``2.4.0`` | Add DagWarning model |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 60511f2428..b6fe56b2d7 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -76,7 +76,10 @@ class TestDb:
# Ignore flask-session table/index
lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
+ # sqlite sequence is used for autoincrementing columns created with `sqlite_autoincrement` option
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'sqlite_sequence'),
]
+
for ignore in ignores:
diff = [d for d in diff if not ignore(d)]
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 430e21215d..5c05545919 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -252,8 +252,9 @@ class TestDBCleanup:
all_models.update({class_.__tablename__: class_})
exclusion_list = {
'variable', # leave alone
+ 'dataset', # not good way to know if "stale"
'trigger', # self-maintaining
- 'task_map', # TODO: add datetime column to TaskMap so we can include it here
+ 'task_map', # keys to TI, so no need
'serialized_dag', # handled through FK to Dag
'log_template', # not a significant source of data; age not indicative of staleness
'dag_tag', # not a significant source of data; age not indicative of staleness,