You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/29 17:17:45 UTC

[GitHub] ashb closed pull request #3964: [AIRFLOW-3079] Improve migration scripts to support MSSQL Server

ashb closed pull request #3964: [AIRFLOW-3079] Improve migration scripts to support MSSQL Server
URL: https://github.com/apache/incubator-airflow/pull/3964
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
index 6eef6a9437..643a1ca81b 100644
--- a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
+++ b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
@@ -39,6 +39,12 @@
 TABLE_NAME = 'task_reschedule'
 INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'
 
+# For Microsoft SQL Server, TIMESTAMP is a row-id type,
+# having nothing to do with date-time.  DateTime() will
+# be sufficient.
+def mssql_timestamp():
+    return sa.DateTime()
+
 def mysql_timestamp():
     return mysql.TIMESTAMP(fsp=6)
 
@@ -50,6 +56,8 @@ def upgrade():
     conn = op.get_bind()
     if conn.dialect.name == 'mysql':
         timestamp = mysql_timestamp
+    elif conn.dialect.name == 'mssql':
+        timestamp = mssql_timestamp
     else:
         timestamp = sa_timestamp
 
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
index 567172f9bf..ac7824fd9e 100644
--- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -86,8 +86,8 @@ def upgrade():
         op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
     else:
-        # sqlite datetime is fine as is not converting
-        if conn.dialect.name == 'sqlite':
+        # sqlite and mssql datetime are fine as is.  Therefore, not converting
+        if conn.dialect.name in ('sqlite', 'mssql'):
             return
 
         # we try to be database agnostic, but not every db (e.g. sqlserver)
@@ -182,7 +182,7 @@ def downgrade():
         op.alter_column(table_name='xcom', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
     else:
-        if conn.dialect.name == 'sqlite':
+        if conn.dialect.name in ('sqlite', 'mssql'):
             return
 
         # we try to be database agnostic, but not every db (e.g. sqlserver)
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index 925bf26df0..88a4199d32 100644
--- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -38,12 +38,23 @@
 
 
 def upgrade():
+
+    columns_and_constraints = [
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
+        sa.Column("resource_version", sa.String(255))
+    ]
+
+    conn = op.get_bind()
+
+    # alembic creates an invalid SQL for mssql dialect
+    if conn.dialect.name not in ('mssql'):
+        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))
+
     table = op.create_table(
         RESOURCE_TABLE,
-        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
-        sa.Column("resource_version", sa.String(255)),
-        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
+        *columns_and_constraints
     )
+
     op.bulk_insert(table, [
         {"resource_version": ""}
     ])
diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
index ace7845965..633201b03b 100644
--- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
+++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -38,12 +38,23 @@
 
 
 def upgrade():
+
+    columns_and_constraints = [
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
+        sa.Column("worker_uuid", sa.String(255))
+    ]
+
+    conn = op.get_bind()
+
+    # alembic creates an invalid SQL for mssql dialect
+    if conn.dialect.name not in ('mssql'):
+        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id"))
+
     table = op.create_table(
         RESOURCE_TABLE,
-        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
-        sa.Column("worker_uuid", sa.String(255)),
-        sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id")
+        *columns_and_constraints
     )
+
     op.bulk_insert(table, [
         {"worker_uuid": ""}
     ])


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services