You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by js...@apache.org on 2023/12/29 20:19:13 UTC

(airflow-mssql-migration) 01/01: initial commit

This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow-mssql-migration.git

commit d6dbdb123ffe5ce2385463740425f0c63ef72938
Author: Jens Scheffler <je...@de.bosch.com>
AuthorDate: Fri Dec 29 21:18:57 2023 +0100

    initial commit
---
 README.md         |   5 ++
 migrate_script.py | 254 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..97abbb3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+# airflow-mssql-migration - Script to migrate Apache Airflow off MS SQL-Server
+
+This repository holds a script to migrate Apache Airflow meta database off from
+Microsoft SQL-Server into other database engines as support for SQL-Server
+ended in Airflow 2.8.0.
diff --git a/migrate_script.py b/migrate_script.py
new file mode 100644
index 0000000..6e073ae
--- /dev/null
+++ b/migrate_script.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Migration script to port an Apache Airflow metadata DB to another DB engine.
+
+Call it with `python migrate_script.py --extract` on the source environment
+to dump the configured airflow metadata database to a SQLite file called
+`migration.db`. Then you can copy this file to the target environment
+(or re-configure your database backend) and run `python migrate_script.py --restore`.
+
+Note that it is common sense probably that this script is assuming that schedulers,
+workers, triggerer and webserver are stopped while running this script. It is also
+advised to halt all running jobs and DAG runs.
+
+Database schema must match.
+
+Note that this script is made for Airflow 2.7.3 and is provided without any warranty.
+"""
+
+# TODO test 2.7.3
+# TODO docs
+from __future__ import annotations
+
+import argparse
+import logging
+import os
+import subprocess
+
+from sqlalchemy import create_engine, delete, select, text
+from sqlalchemy.orm import Session
+
+from airflow.auth.managers.fab.models import Action, Permission, RegisterUser, Resource, Role, User
+from airflow.jobs.job import Job
+from airflow.models.connection import Connection
+from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag
+from airflow.models.dagcode import DagCode
+from airflow.models.dagpickle import DagPickle
+from airflow.models.dagrun import DagRun, DagRunNote
+from airflow.models.dagwarning import DagWarning
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetDagRunQueue,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
+from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.errors import ImportError
+from airflow.models.log import Log
+from airflow.models.pool import Pool
+from airflow.models.renderedtifields import RenderedTaskInstanceFields
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.slamiss import SlaMiss
+from airflow.models.taskfail import TaskFail
+from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
+from airflow.models.tasklog import LogTemplate
+from airflow.models.taskmap import TaskMap
+from airflow.models.taskreschedule import TaskReschedule
+from airflow.models.trigger import Trigger
+from airflow.models.variable import Variable
+from airflow.models.xcom import BaseXCom, XCom
+from airflow.settings import engine
+
+# configuration variables
+airflow_db_url = engine.url
+temp_db_url = "sqlite:///migration.db"
+supported_db_versions = [
+    # see https://airflow.apache.org/docs/apache-airflow/stable/migrations-ref.html
+    "405de8318b3a"  # Airflow 2.7.3
+]
+
+# initialise logging
+logging.basicConfig(filename="migration.log", level=logging.DEBUG)
+
+
+def copy_airflow_tables(source_engine, target_engine):
+    objects_to_migrate = [
+        Action,
+        Resource,
+        Role,
+        User,
+        Permission,
+        RegisterUser,
+        Connection,
+        DagModel,
+        DagCode,
+        DagOwnerAttributes,
+        DagPickle,
+        DagTag,
+        Job,
+        LogTemplate,
+        DagRun,
+        DagRunNote,
+        DagWarning,
+        DatasetModel,
+        DagScheduleDatasetReference,
+        TaskOutletDatasetReference,
+        DatasetDagRunQueue,
+        DatasetEvent,
+        DbCallbackRequest,
+        ImportError,
+        Log,
+        Pool,
+        Trigger,
+        RenderedTaskInstanceFields,
+        SerializedDagModel,
+        SlaMiss,
+        TaskInstance,
+        TaskInstanceNote,
+        TaskFail,
+        TaskMap,
+        TaskReschedule,
+        Variable,
+        BaseXCom,
+        "ab_user_role",  # besides the ORM objects some table which demand cleaning are listed
+        "ab_permission_view",
+        "ab_permission_view_role",
+    ]
+    source_session = Session(bind=source_engine)
+    target_session = Session(bind=target_engine)
+    target_session.autoflush = False
+    dialect_name = target_session.bind.dialect.name
+    quote = "`" if dialect_name == "mysql" else '"'
+
+    # check that source DB is a supported version
+    db_version = target_session.scalar("SELECT * FROM alembic_version")
+    if db_version not in supported_db_versions:
+        raise ValueError(f"Unsupported Airflow Schema version {db_version}")
+    source_version = source_session.scalar("SELECT * FROM alembic_version")
+    if source_version != db_version:
+        raise ValueError(
+            f"Database schema must match. Source is {source_version}, destination is {db_version}."
+        )
+
+    # Deserialization fails, but we want to transfer the blob as original anyway, mock serialization away
+    def deserialize_mock(self: XCom):
+        return self.value
+
+    BaseXCom.orm_deserialize_value = deserialize_mock
+
+    # Step 1 - delete any leftovers, ensure all tables to be migrated are empty - use reverse order
+    for clz in reversed(objects_to_migrate):
+        if isinstance(clz, str):
+            logging.info("Cleaning table %s", clz)
+            target_session.execute(f"DELETE FROM {quote}{clz}{quote}")
+        else:
+            logging.info("Cleaning table %s", clz.__tablename__)
+            if clz == User:
+                # The user has a self-constraint, need to delete in batches not to violate cross-dependencies
+                continue_delete = True
+                while continue_delete:
+                    filter_uids = set()
+                    for uid in target_session.execute(
+                        text("SELECT changed_by_fk FROM ab_user WHERE changed_by_fk IS NOT NULL")
+                    ).fetchall():
+                        filter_uids.add(uid)
+                    for uid in target_session.execute(
+                        text("SELECT created_by_fk FROM ab_user WHERE created_by_fk IS NOT NULL")
+                    ).fetchall():
+                        filter_uids.add(uid)
+                    uid_list = ",".join(str(uid[0]) for uid in filter_uids)
+                    if uid_list:
+                        continue_delete = (
+                            target_session.execute(
+                                f"DELETE FROM ab_user WHERE id NOT IN ({uid_list})"
+                            ).rowcount
+                            > 0
+                        )
+                    else:
+                        continue_delete = target_session.execute(delete(clz)).rowcount > 0
+            else:
+                target_session.execute(delete(clz))
+    # Step 2 - copy all data over, use only ORM mapped tables
+    for clz in objects_to_migrate:
+        count = 0
+        if not isinstance(clz, str):
+            logging.info("Migration of %s started", clz.__tablename__)
+            for item in source_session.scalars(select(clz)).unique():
+                target_session.merge(item)
+                target_session.flush()
+                count += 1
+                if count % 100 == 0:
+                    logging.info("Migration of chunk finished, %i migrated", count)
+            logging.info("Migration of %s finished with %i rows", clz.__tablename__, count)
+            target_session.commit()
+    # Step 3 - update sequences to ensure new records continue with valid IDs auto-generated
+    for clz in objects_to_migrate:
+        count = 0
+        if not isinstance(clz, str) and "id" in clz.__dict__:
+            logging.info("Resetting sequence value for %s", clz.__tablename__)
+            max = target_session.scalar(f"SELECT MAX(id) FROM {quote}{clz.__tablename__}{quote}")
+            if max:
+                if dialect_name == "postgresql":
+                    target_session.execute(
+                        f"ALTER SEQUENCE {quote}{clz.__tablename__}_id_seq{quote} RESTART WITH {max+1}"
+                    )
+                elif dialect_name == "sqlite":
+                    pass  # nothing to be done for sqlite
+                elif dialect_name == "mysql":
+                    target_session.execute(
+                        f"ALTER TABLE {quote}{clz.__tablename__}{quote} AUTO_INCREMENT = {max+1}"
+                    )
+                else:  # e.g. "mssql"
+                    raise Exception(f"Database type {dialect_name} not supported")
+            target_session.commit()
+
+
+def main(extract: bool, restore: bool):
+    if extract == restore:
+        raise ValueError("Please specify what you want to do! Or use --help")
+
+    if extract:
+        logging.info("Creating migration database %s", temp_db_url)
+        envs = os.environ.copy()
+        envs["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = temp_db_url
+        subprocess.check_call(args=["airflow", "db", "reset", "--yes"], env=envs)
+
+    # source and target database
+    airflow_engine = create_engine(airflow_db_url)
+    temp_engine = create_engine(temp_db_url)
+    logging.info("Connection to databases established")
+
+    if extract:
+        copy_airflow_tables(airflow_engine, temp_engine)
+    else:
+        copy_airflow_tables(temp_engine, airflow_engine)
+    logging.info("Migration completed!")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--extract", help="Extracts the current Airflow database to a SQLite file", action="store_true"
+    )
+    parser.add_argument(
+        "--restore", help="Restores from a SQLite to the current Airflow database", action="store_true"
+    )
+    args = parser.parse_args()
+    main(args.extract, args.restore)