You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/11 13:06:53 UTC

[GitHub] [airflow] aneesh-joseph commented on a change in pull request #9973: Improve compatibility with mssql

aneesh-joseph commented on a change in pull request #9973:
URL: https://github.com/apache/airflow/pull/9973#discussion_r611186097



##########
File path: airflow/migrations/versions/5ccc55a461b1_make_mssql_tables_in_sync_with_sqla_.py
##########
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""make mssql tables in sync with SQLA models
+
+Revision ID: 5ccc55a461b1
+Revises: e9304a3141f0
+Create Date: 2021-04-06 14:22:02.197726
+
+"""
+
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '5ccc55a461b1'
+down_revision = 'e9304a3141f0'
+branch_labels = None
+depends_on = None
+
+
+def __get_timestamp(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    if mssql_version not in ("2000", "2005"):
+        return mssql.DATETIME2(precision=6)
+    else:
+        return mssql.DATETIME
+
+
+def upgrade():
+    """Apply make mssql tables in sync with SQLA models"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        op.alter_column(
+            table_name="xcom", column_name="timestamp", type_=__get_timestamp(conn), nullable=False
+        )
+        with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+            task_reschedule_batch_op.alter_column(
+                column_name='end_date', type_=__get_timestamp(conn), nullable=False
+            )
+            task_reschedule_batch_op.alter_column(
+                column_name='reschedule_date', type_=__get_timestamp(conn), nullable=False
+            )
+            task_reschedule_batch_op.alter_column(
+                column_name='start_date', type_=__get_timestamp(conn), nullable=False
+            )
+        with op.batch_alter_table('task_fail') as task_fail_batch_op:
+            task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+            task_fail_batch_op.alter_column(
+                column_name="execution_date", type_=__get_timestamp(conn), nullable=False
+            )
+            task_fail_batch_op.create_index(
+                'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 'execution_date'], unique=False
+            )
+        with op.batch_alter_table('task_instance') as task_instance_batch_op:
+            task_instance_batch_op.drop_index('ti_state_lkp')
+            task_instance_batch_op.create_index(
+                'ti_state_lkp', ['dag_id', 'task_id', 'execution_date', 'state'], unique=False
+            )
+
+
+def downgrade():
+    """Unapply make mssql tables in sync with SQLA models"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        op.alter_column(
+            table_name="xcom", column_name="timestamp", type_=__get_timestamp(conn), nullable=True
+        )
+        with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+            task_reschedule_batch_op.alter_column(
+                column_name='end_date', type_=__get_timestamp(conn), nullable=True
+            )
+            task_reschedule_batch_op.alter_column(
+                column_name='reschedule_date', type_=__get_timestamp(conn), nullable=True
+            )
+            task_reschedule_batch_op.alter_column(
+                column_name='start_date', type_=__get_timestamp(conn), nullable=True
+            )
+        with op.batch_alter_table('task_fail') as task_fail_batch_op:
+            task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+            task_fail_batch_op.alter_column(
+                column_name="execution_date", type_=__get_timestamp(conn), nullable=False
+            )
+            task_fail_batch_op.create_index(
+                'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 'execution_date'], unique=False
+            )
+        with op.batch_alter_table('task_instance') as task_instance_batch_op:
+            task_instance_batch_op.drop_index('ti_state_lkp')
+            task_instance_batch_op.create_index(
+                'ti_state_lkp', ['dag_id', 'task_id', 'execution_date'], unique=False
+            )

Review comment:
       done

##########
File path: airflow/migrations/versions/83f031fd9f1c_change_ts_columns_to_datetime_on_mssql.py
##########
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""change ts/datetime columns to datetime/datetime2 on mssql
+
+Revision ID: 83f031fd9f1c
+Revises: 90d1635d7b86
+Create Date: 2021-04-06 12:22:02.197726
+
+"""
+
+from collections import defaultdict
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '83f031fd9f1c'
+down_revision = '90d1635d7b86'
+branch_labels = None
+depends_on = None
+
+
+def is_table_empty(conn, table_name):
+    """
+    This function checks if the mssql table is empty
+    :param conn: sql connection object
+    :param table_name: table name
+    :return: Booelan indicating if the table is present
+    """
+    return conn.execute(f'select TOP 1 * from {table_name}').first() is None
+
+
+def get_table_constraints(conn, table_name):
+    """
+    This function return primary and unique constraint
+    along with column name. some tables like task_instance
+    is missing primary key constraint name and the name is
+    auto-generated by sql server. so this function helps to
+    retrieve any primary or unique constraint name.
+
+    :param conn: sql connection object
+    :param table_name: table name
+    :return: a dictionary of ((constraint name, constraint type), column name) of table
+    :rtype: defaultdict(list)
+    """
+    query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
+     FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
+     JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+     WHERE tc.TABLE_NAME = '{table_name}' AND
+     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
+    """.format(
+        table_name=table_name
+    )
+    result = conn.execute(query).fetchall()
+    constraint_dict = defaultdict(list)
+    for constraint, constraint_type, column in result:
+        constraint_dict[(constraint, constraint_type)].append(column)
+    return constraint_dict
+
+
+def drop_column_constraints(operator, column_name, constraint_dict):
+    """
+    Drop a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.drop_constraint(constraint[0], type_='primary')
+            elif constraint[1].lower().startswith("unique"):
+                operator.drop_constraint(constraint[0], type_='unique')
+
+
+def create_constraints(operator, column_name, constraint_dict):
+    """
+    Create a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.create_primary_key(constraint_name=constraint[0], columns=columns)
+            elif constraint[1].lower().startswith("unique"):
+                operator.create_unique_constraint(constraint_name=constraint[0], columns=columns)
+
+
+def __use_date_time2(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    return mssql_version not in ("2000", "2005")
+
+
+def __is_timestamp(conn, table_name, column_name):
+    query = f"""SELECT
+    TYPE_NAME(C.USER_TYPE_ID) AS DATA_TYPE
+    FROM SYS.COLUMNS C
+    JOIN SYS.TYPES T
+    ON C.USER_TYPE_ID=T.USER_TYPE_ID
+    WHERE C.OBJECT_ID=OBJECT_ID('{table_name}') and C.NAME='{column_name}';
+    """
+    column_type = conn.execute(query).fetchone()[0]
+    return column_type == "timestamp"
+
+
+def recreate_mssql_ts_column(conn, op, table_name, column_name):
+    """
+    Drop the timestamp column and recreate it as
+    datetime or datetime2(6)
+    """
+    if __is_timestamp(conn, table_name, column_name) and is_table_empty(conn, table_name):
+        with op.batch_alter_table(table_name) as batch_op:
+            constraint_dict = get_table_constraints(conn, table_name)
+            drop_column_constraints(batch_op, column_name, constraint_dict)
+            batch_op.drop_column(column_name=column_name)
+            if __use_date_time2(conn):
+                batch_op.add_column(sa.Column(column_name, mssql.DATETIME2(precision=6), nullable=False))
+            else:
+                batch_op.add_column(sa.Column(column_name, mssql.DATETIME, nullable=False))
+            create_constraints(batch_op, column_name, constraint_dict)
+
+
+def alter_mssql_datetime_column(conn, op, table_name, column_name, nullable):
+    """Update the datetime column to datetime2(6)"""
+    if __use_date_time2(conn):
+        op.alter_column(
+            table_name=table_name,
+            column_name=column_name,
+            type_=mssql.DATETIME2(precision=6),
+            nullable=nullable,
+        )
+
+
+def alter_mssql_datetime2_column(conn, op, table_name, column_name, nullable):
+    """Update the datetime2(6) column to datetime"""
+    if __use_date_time2(conn):
+        op.alter_column(
+            table_name=table_name, column_name=column_name, type_=mssql.DATETIME, nullable=nullable
+        )
+
+
+def upgrade():
+    """Change timestamp and datetime to datetime2/datetime when using MSSQL as backend"""
+    conn = op.get_bind()
+    if conn.dialect.name == 'mssql':

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org