Posted to commits@airflow.apache.org by ds...@apache.org on 2022/06/27 20:39:56 UTC

[airflow] branch main updated: Add dataset model (#24613)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dee5ba3a51 Add dataset model (#24613)
dee5ba3a51 is described below

commit dee5ba3a51593a3483032558082586bc9fcf9cd2
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Jun 27 13:39:44 2022 -0700

    Add dataset model (#24613)
    
    Add model for storing references to datasets, a fundamental component of AIP-48.
---
 airflow/__init__.py                                |  1 +
 .../versions/0114_2_4_0_add_dataset_model.py       | 69 ++++++++++++++++++++
 airflow/models/__init__.py                         |  2 +
 airflow/models/dataset.py                          | 75 ++++++++++++++++++++++
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 tests/utils/test_db.py                             |  3 +
 tests/utils/test_db_cleanup.py                     |  3 +-
 7 files changed, 155 insertions(+), 2 deletions(-)

diff --git a/airflow/__init__.py b/airflow/__init__.py
index cbbb03dd1b..826ffa80f2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -57,6 +57,7 @@ PY310 = sys.version_info >= (3, 10)
 # Things to lazy import in form 'name': 'path.to.module'
 __lazy_imports = {
     'DAG': 'airflow.models.dag',
+    'Dataset': 'airflow.models.dataset',
     'XComArg': 'airflow.models.xcom_arg',
     'AirflowException': 'airflow.exceptions',
 }
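
For context, the `__lazy_imports` mapping above is consumed by a module-level `__getattr__` (PEP 562), so `from airflow import Dataset` only imports `airflow.models.dataset` on first access. A minimal sketch of that pattern, not the exact Airflow implementation:

    import importlib

    __lazy_imports = {'Dataset': 'airflow.models.dataset'}

    def __getattr__(name: str):
        path = __lazy_imports.get(name)
        if path is None:
            raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
        # Import the target module on first use and cache the attribute on this
        # module, so subsequent lookups bypass __getattr__ entirely.
        val = getattr(importlib.import_module(path), name)
        globals()[name] = val
        return val
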
diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
new file mode 100644
index 0000000000..838f8780b0
--- /dev/null
+++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add Dataset model
+
+Revision ID: 0038cd0c28b4
+Revises: 44b7034f6bdc
+Create Date: 2022-06-22 14:37:20.880672
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import Integer, String
+
+from airflow.migrations.db_types import TIMESTAMP
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+revision = '0038cd0c28b4'
+down_revision = '44b7034f6bdc'
+branch_labels = None
+depends_on = None
+airflow_version = '2.4.0'
+
+
+def upgrade():
+    """Apply Add Dataset model"""
+    op.create_table(
+        'dataset',
+        sa.Column('id', Integer, primary_key=True, autoincrement=True),
+        sa.Column(
+            'uri',
+            String(length=3000).with_variant(
+                String(
+                    length=3000,
+                    # latin1 allows for more indexed length in mysql
+                    # and this field should only be ascii chars
+                    collation='latin1_general_cs',
+                ),
+                'mysql',
+            ),
+            nullable=False,
+        ),
+        sa.Column('extra', ExtendedJSON, nullable=True),
+        sa.Column('created_at', TIMESTAMP, nullable=False),
+        sa.Column('updated_at', TIMESTAMP, nullable=False),
+        sqlite_autoincrement=True,  # ensures PK values not reused
+    )
+    op.create_index('idx_uri_unique', 'dataset', ['uri'], unique=True)
+
+
+def downgrade():
+    """Unapply Add Dataset model"""
+    op.drop_table('dataset')
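
A note on the MySQL variant above: InnoDB limits index keys to 3072 bytes, so with utf8mb4 (4 bytes per character) a unique index could cover at most 768 characters, while latin1 is one byte per character, letting the full 3000-character `uri` stay indexable; the column is restricted to ASCII anyway. To try the migration locally, `airflow db upgrade` applies `0038cd0c28b4`, and (assuming Airflow 2.3+, which added the command) `airflow db downgrade --to-revision 44b7034f6bdc` reverts it.
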
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 4cb19eecde..ecf14576ee 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -26,6 +26,7 @@ from airflow.models.dagbag import DagBag
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
+from airflow.models.dataset import Dataset
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ImportError
 from airflow.models.log import Log
@@ -58,6 +59,7 @@ __all__ = [
     "DagPickle",
     "DagRun",
     "DagTag",
+    "Dataset",
     "DbCallbackRequest",
     "ImportError",
     "Log",
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
new file mode 100644
index 0000000000..49f5dfd1f6
--- /dev/null
+++ b/airflow/models/dataset.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from urllib.parse import urlparse
+
+from sqlalchemy import Column, Index, Integer, String
+
+from airflow.models.base import Base
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+
+
+class Dataset(Base):
+    """
+    A table to store datasets.
+
+    :param uri: a string that uniquely identifies the dataset
+    :param extra: JSON field for arbitrary extra info
+    """
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    uri = Column(
+        String(length=3000).with_variant(
+            String(
+                length=3000,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation='latin1_general_cs',
+            ),
+            'mysql',
+        ),
+        nullable=False,
+    )
+    extra = Column(ExtendedJSON, nullable=True)
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
+
+    __tablename__ = "dataset"
+    __table_args__ = (
+        Index('idx_uri_unique', uri, unique=True),
+        {'sqlite_autoincrement': True},  # ensures PK values not reused
+    )
+
+    def __init__(self, uri: str, **kwargs):
+        try:
+            uri.encode('ascii')
+        except UnicodeEncodeError:
+            raise ValueError('URI must be ascii')
+        parsed = urlparse(uri)
+        if parsed.scheme and parsed.scheme.lower() == 'airflow':
+            raise ValueError("Scheme `airflow` is reserved.")
+        super().__init__(uri=uri, **kwargs)
+
+    def __eq__(self, other):
+        return self.uri == other.uri
+
+    def __hash__(self):
+        return hash(self.uri)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})"
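
To make the validation above concrete, a short usage sketch (the URIs are made up):

    from airflow.models.dataset import Dataset

    ds = Dataset(uri='s3://my-bucket/orders.parquet', extra={'owner': 'data-eng'})
    assert ds == Dataset(uri='s3://my-bucket/orders.parquet')  # equality is by uri only

    for bad_uri in ('café://menu', 'airflow://internal'):
        try:
            Dataset(uri=bad_uri)
        except ValueError as err:
            print(err)  # "URI must be ascii", then "Scheme `airflow` is reserved."
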
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index b45e6c2de7..e5a5b13fa6 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
 +=================================+===================+===================+==============================================================+
-| ``44b7034f6bdc`` (head)         | ``424117c37d18``  | ``2.4.0``         | compare types between ORM and DB                             |
+| ``0038cd0c28b4`` (head)         | ``44b7034f6bdc``  | ``2.4.0``         | Add Dataset model                                            |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``44b7034f6bdc``                | ``424117c37d18``  | ``2.4.0``         | compare types between ORM and DB                             |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``424117c37d18``                | ``f5fcbda3e651``  | ``2.4.0``         | Add DagWarning model                                         |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 60511f2428..b6fe56b2d7 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -76,7 +76,10 @@ class TestDb:
             # Ignore flask-session table/index
             lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
             lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
+            # sqlite sequence is used for autoincrementing columns created with `sqlite_autoincrement` option
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'sqlite_sequence'),
         ]
+
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
 
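
For context on the new ignore rule: creating any table with AUTOINCREMENT (which is what SQLAlchemy's `sqlite_autoincrement=True` renders) makes SQLite create an internal `sqlite_sequence` bookkeeping table, which would otherwise surface as a spurious schema diff. A quick demonstration with the standard sqlite3 module:

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE dataset (id INTEGER PRIMARY KEY AUTOINCREMENT, uri TEXT)')
    con.execute("INSERT INTO dataset (uri) VALUES ('s3://bucket/key')")
    # SQLite records the per-table high-water mark here, so ids are never reused:
    print(con.execute('SELECT * FROM sqlite_sequence').fetchall())  # [('dataset', 1)]
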
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 430e21215d..5c05545919 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -252,8 +252,9 @@ class TestDBCleanup:
                         all_models.update({class_.__tablename__: class_})
         exclusion_list = {
             'variable',  # leave alone
+            'dataset',  # not good way to know if "stale"
             'trigger',  # self-maintaining
-            'task_map',  # TODO: add datetime column to TaskMap so we can include it here
+            'task_map',  # keys to TI, so no need
             'serialized_dag',  # handled through FK to Dag
             'log_template',  # not a significant source of data; age not indicative of staleness
             'dag_tag',  # not a significant source of data; age not indicative of staleness,