You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that did not survive the plain-text conversion.)
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/12 19:36:11 UTC

[GitHub] potiuk closed pull request #4488: [AIRFLOW-3678] Fixes 'airflow resetdb' for initialized Postgres DB

potiuk closed pull request #4488: [AIRFLOW-3678] Fixes 'airflow resetdb' for initialized Postgres DB
URL: https://github.com/apache/airflow/pull/4488
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it after the merge:

diff --git a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
index 643a1ca81b..b467b4bc96 100644
--- a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
+++ b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
@@ -26,6 +26,8 @@
 """
 
 # revision identifiers, used by Alembic.
+from sqlalchemy.engine.reflection import Inspector
+
 revision = '0a2a5b66e19d'
 down_revision = '9635ae0956e7'
 branch_labels = None
@@ -61,22 +63,28 @@ def upgrade():
     else:
         timestamp = sa_timestamp
 
-    op.create_table(
-        TABLE_NAME,
-        sa.Column('id', sa.Integer(), nullable=False),
-        sa.Column('task_id', sa.String(length=250), nullable=False),
-        sa.Column('dag_id', sa.String(length=250), nullable=False),
-        # use explicit server_default=None otherwise mysql implies defaults for first timestamp column
-        sa.Column('execution_date', timestamp(), nullable=False, server_default=None),
-        sa.Column('try_number', sa.Integer(), nullable=False),
-        sa.Column('start_date', timestamp(), nullable=False),
-        sa.Column('end_date', timestamp(), nullable=False),
-        sa.Column('duration', sa.Integer(), nullable=False),
-        sa.Column('reschedule_date', timestamp(), nullable=False),
-        sa.PrimaryKeyConstraint('id'),
-        sa.ForeignKeyConstraint(['task_id', 'dag_id', 'execution_date'],
-                                ['task_instance.task_id', 'task_instance.dag_id','task_instance.execution_date'],
-                                name='task_reschedule_dag_task_date_fkey')
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+    if TABLE_NAME not in tables:
+        op.create_table(
+            TABLE_NAME,
+            sa.Column('id', sa.Integer(), nullable=False),
+            sa.Column('task_id', sa.String(length=250), nullable=False),
+            sa.Column('dag_id', sa.String(length=250), nullable=False),
+            # use explicit server_default=None otherwise mysql implies defaults for first
+            # timestamp column
+            sa.Column('execution_date', timestamp(), nullable=False, server_default=None),
+            sa.Column('try_number', sa.Integer(), nullable=False),
+            sa.Column('start_date', timestamp(), nullable=False),
+            sa.Column('end_date', timestamp(), nullable=False),
+            sa.Column('duration', sa.Integer(), nullable=False),
+            sa.Column('reschedule_date', timestamp(), nullable=False),
+            sa.PrimaryKeyConstraint('id'),
+            sa.ForeignKeyConstraint(['task_id', 'dag_id', 'execution_date'],
+                                    ['task_instance.task_id', 'task_instance.dag_id',
+                                     'task_instance.execution_date'],
+                                    name='task_reschedule_dag_task_date_fkey')
     )
     op.create_index(
         INDEX_NAME,
diff --git a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
index 50d53652c4..20aaa2e8fd 100644
--- a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
+++ b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
@@ -29,6 +29,8 @@
 import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
+from sqlalchemy.engine.reflection import Inspector
+
 revision = '1b38cef5b76e'
 down_revision = '502898887f84'
 branch_labels = None
@@ -36,16 +38,20 @@
 
 
 def upgrade():
-    op.create_table('dag_run',
-                    sa.Column('id', sa.Integer(), nullable=False),
-                    sa.Column('dag_id', sa.String(length=250), nullable=True),
-                    sa.Column('execution_date', sa.DateTime(), nullable=True),
-                    sa.Column('state', sa.String(length=50), nullable=True),
-                    sa.Column('run_id', sa.String(length=250), nullable=True),
-                    sa.Column('external_trigger', sa.Boolean(), nullable=True),
-                    sa.PrimaryKeyConstraint('id'),
-                    sa.UniqueConstraint('dag_id', 'execution_date'),
-                    sa.UniqueConstraint('dag_id', 'run_id'))
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+    if 'dag_run' not in tables:
+        op.create_table('dag_run',
+                        sa.Column('id', sa.Integer(), nullable=False),
+                        sa.Column('dag_id', sa.String(length=250), nullable=True),
+                        sa.Column('execution_date', sa.DateTime(), nullable=True),
+                        sa.Column('state', sa.String(length=50), nullable=True),
+                        sa.Column('run_id', sa.String(length=250), nullable=True),
+                        sa.Column('external_trigger', sa.Boolean(), nullable=True),
+                        sa.PrimaryKeyConstraint('id'),
+                        sa.UniqueConstraint('dag_id', 'execution_date'),
+                        sa.UniqueConstraint('dag_id', 'run_id'))
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index 88a4199d32..f7fa0aa6b4 100644
--- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -29,6 +29,8 @@
 import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
+from sqlalchemy.engine.reflection import Inspector
+
 revision = '33ae817a1ff4'
 down_revision = 'd2ae31099d61'
 branch_labels = None
@@ -50,10 +52,14 @@ def upgrade():
     if conn.dialect.name not in ('mssql'):
         columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))
 
-    table = op.create_table(
-        RESOURCE_TABLE,
-        *columns_and_constraints
-    )
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+    if RESOURCE_TABLE not in tables:
+        table = op.create_table(
+            RESOURCE_TABLE,
+            *columns_and_constraints
+        )
 
     op.bulk_insert(table, [
         {"resource_version": ""}
diff --git a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
index 2def57e904..7b4b99ace3 100644
--- a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
+++ b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
@@ -27,6 +27,8 @@
 import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
+from sqlalchemy.engine.reflection import Inspector
+
 revision = '64de9cddf6c9'
 down_revision = '211e584da130'
 branch_labels = None
@@ -34,17 +36,22 @@
 
 
 def upgrade():
-    op.create_table(
-        'task_fail',
-        sa.Column('id', sa.Integer(), nullable=False),
-        sa.Column('task_id', sa.String(length=250), nullable=False),
-        sa.Column('dag_id', sa.String(length=250), nullable=False),
-        sa.Column('execution_date', sa.DateTime(), nullable=False),
-        sa.Column('start_date', sa.DateTime(), nullable=True),
-        sa.Column('end_date', sa.DateTime(), nullable=True),
-        sa.Column('duration', sa.Integer(), nullable=True),
-        sa.PrimaryKeyConstraint('id'),
-    )
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+
+    if 'task_fail' not in tables:
+        op.create_table(
+            'task_fail',
+            sa.Column('id', sa.Integer(), nullable=False),
+            sa.Column('task_id', sa.String(length=250), nullable=False),
+            sa.Column('dag_id', sa.String(length=250), nullable=False),
+            sa.Column('execution_date', sa.DateTime(), nullable=False),
+            sa.Column('start_date', sa.DateTime(), nullable=True),
+            sa.Column('end_date', sa.DateTime(), nullable=True),
+            sa.Column('duration', sa.Integer(), nullable=True),
+            sa.PrimaryKeyConstraint('id'),
+        )
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
index 633201b03b..cae1df00fd 100644
--- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
+++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -29,6 +29,8 @@
 import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
+from sqlalchemy.engine.reflection import Inspector
+
 revision = '86770d1215c0'
 down_revision = '27c6a30d7c24'
 branch_labels = None
@@ -50,10 +52,14 @@ def upgrade():
     if conn.dialect.name not in ('mssql'):
         columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id"))
 
-    table = op.create_table(
-        RESOURCE_TABLE,
-        *columns_and_constraints
-    )
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+    if RESOURCE_TABLE not in tables:
+        table = op.create_table(
+            RESOURCE_TABLE,
+            *columns_and_constraints
+        )
 
     op.bulk_insert(table, [
         {"worker_uuid": ""}
diff --git a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
index e14b4b8025..0d0fd09595 100644
--- a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
+++ b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
@@ -25,6 +25,7 @@
 """
 from alembic import op
 import sqlalchemy as sa
+from sqlalchemy.engine.reflection import Inspector
 
 # revision identifiers, used by Alembic.
 revision = 'f2ca10b85618'
@@ -34,12 +35,16 @@
 
 
 def upgrade():
-    op.create_table('dag_stats',
-                    sa.Column('dag_id', sa.String(length=250), nullable=False),
-                    sa.Column('state', sa.String(length=50), nullable=False),
-                    sa.Column('count', sa.Integer(), nullable=False, default=0),
-                    sa.Column('dirty', sa.Boolean(), nullable=False, default=False),
-                    sa.PrimaryKeyConstraint('dag_id', 'state'))
+    conn = op.get_bind()
+    inspector = Inspector.from_engine(conn)
+    tables = inspector.get_table_names()
+    if 'dag_stats' not in tables:
+        op.create_table('dag_stats',
+                        sa.Column('dag_id', sa.String(length=250), nullable=False),
+                        sa.Column('state', sa.String(length=50), nullable=False),
+                        sa.Column('count', sa.Integer(), nullable=False, default=0),
+                        sa.Column('dirty', sa.Boolean(), nullable=False, default=False),
+                        sa.PrimaryKeyConstraint('dag_id', 'state'))
 
 
 def downgrade():


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services