You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/19 19:42:06 UTC
[GitHub] Fokko closed pull request #3769: [AIRFLOW-2918] Fix a lot of Flake8 violations
Fokko closed pull request #3769: [AIRFLOW-2918] Fix a lot of Flake8 violations
URL: https://github.com/apache/incubator-airflow/pull/3769
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff once it is merged, so the diff is reproduced below for
the sake of provenance:
diff --git a/airflow/__init__.py b/airflow/__init__.py
index bc6a7bbe19..d010fe4c74 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -32,8 +32,8 @@
import sys
-from airflow import configuration as conf
-from airflow import settings
+# flake8: noqa: F401
+from airflow import settings, configuration as conf
from airflow.models import DAG
from flask_admin import BaseView
from importlib import import_module
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 641b81e46d..08fa0d7929 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -19,18 +19,14 @@
import flask_login
# Need to expose these downstream
-# pylint: disable=unused-import
-from flask_login import (current_user,
- logout_user,
- login_required,
- login_user)
-# pylint: enable=unused-import
+# flake8: noqa: F401
+from flask_login import current_user, logout_user, login_required, login_user
from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
-from airflow import models, configuration, settings
+from airflow import models, configuration
from airflow.configuration import AirflowConfigException
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index e41934b926..08b29e383a 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -19,18 +19,14 @@
import flask_login
# Need to expose these downstream
-# pylint: disable=unused-import
-from flask_login import (current_user,
- logout_user,
- login_required,
- login_user)
-# pylint: enable=unused-import
+# flake8: noqa: F401
+from flask_login import current_user, logout_user, login_required, login_user
from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
-from airflow import models, configuration, settings
+from airflow import models, configuration
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index fdb6204967..4a019eb131 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -21,8 +21,7 @@
import flask_login
from flask_login import current_user
from flask import flash
-from wtforms import (
- Form, PasswordField, StringField)
+from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired
# pykerberos should be used as it verifies the KDC, the "kerberos" module does not do so
@@ -32,7 +31,6 @@
from flask import url_for, redirect
-from airflow import settings
from airflow import models
from airflow import configuration
from airflow.utils.db import provide_session
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 516e121c9b..a949e89a77 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -19,13 +19,12 @@
from future.utils import native
import flask_login
-from flask_login import login_required, current_user, logout_user
+from flask_login import login_required, current_user, logout_user # noqa: F401
from flask import flash
-from wtforms import (
- Form, PasswordField, StringField)
+from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired
-from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE, BASE
+from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE
import ssl
from flask import url_for, redirect
diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py
index 5dac0a44a9..def793c1be 100644
--- a/airflow/contrib/operators/mlengine_prediction_summary.py
+++ b/airflow/contrib/operators/mlengine_prediction_summary.py
@@ -1,3 +1,4 @@
+# flake8: noqa: F841
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
diff --git a/airflow/default_login.py b/airflow/default_login.py
index d44dbf39ea..bf87bbc47f 100644
--- a/airflow/default_login.py
+++ b/airflow/default_login.py
@@ -25,11 +25,11 @@
"""
import flask_login
-from flask_login import login_required, current_user, logout_user
+from flask_login import login_required, current_user, logout_user # noqa: F401
from flask import url_for, redirect
-from airflow import settings
+from airflow import settings # noqa: F401
from airflow import models
from airflow.utils.db import provide_session
@@ -64,9 +64,6 @@ def is_superuser(self):
"""Access all the things"""
return True
-# models.User = User # hack!
-# del User
-
@login_manager.user_loader
@provide_session
diff --git a/airflow/jobs.py b/airflow/jobs.py
index d51e7c2537..2662f073be 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -658,7 +658,7 @@ def manage_slas(self, dag, session=None):
slas = (
session
.query(SlaMiss)
- .filter(SlaMiss.notification_sent == False)
+ .filter(SlaMiss.notification_sent is False)
.filter(SlaMiss.dag_id == dag.dag_id)
.all()
)
@@ -708,16 +708,13 @@ def manage_slas(self, dag, session=None):
Blocking tasks:
<pre><code>{blocking_task_list}\n{bug}<code></pre>
""".format(bug=asciiart.bug, **locals())
- emails = []
- for t in dag.tasks:
- if t.email:
- if isinstance(t.email, basestring):
- l = [t.email]
- elif isinstance(t.email, (list, tuple)):
- l = t.email
- for email in l:
- if email not in emails:
- emails.append(email)
+ emails = set()
+ for task in dag.tasks:
+ if task.email:
+ if isinstance(task.email, basestring):
+ emails.add(task.email)
+ elif isinstance(task.email, (list, tuple)):
+ emails = emails | set(task.email)
if emails and len(slas):
try:
send_email(
@@ -818,7 +815,7 @@ def create_dag_run(self, dag, session=None):
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
- DagRun.external_trigger == False,
+ DagRun.external_trigger is False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX + '%')
))
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 97ebe4257f..4e1977635a 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -85,6 +85,7 @@ def run_migrations_online():
with context.begin_transaction():
context.run_migrations()
+
if context.is_offline_mode():
run_migrations_offline()
else:
diff --git a/airflow/migrations/versions/05f30312d566_merge_heads.py b/airflow/migrations/versions/05f30312d566_merge_heads.py
index 78d5652679..f869cb8c08 100644
--- a/airflow/migrations/versions/05f30312d566_merge_heads.py
+++ b/airflow/migrations/versions/05f30312d566_merge_heads.py
@@ -31,9 +31,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
pass
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
index 64ee41c44d..567172f9bf 100644
--- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -25,16 +25,16 @@
"""
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = '0e2a74e0fc9f'
down_revision = 'd2ae31099d61'
branch_labels = None
depends_on = None
-from alembic import op
-from sqlalchemy.dialects import mysql
-import sqlalchemy as sa
-
def upgrade():
conn = op.get_bind()
@@ -69,14 +69,16 @@ def upgrade():
op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
- op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
- op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
@@ -117,14 +119,16 @@ def upgrade():
op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
- op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
+ nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
- op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
+ nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
@@ -161,14 +165,16 @@ def downgrade():
op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
+ nullable=False)
op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+ op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
+ nullable=False)
op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
index 6ee50aa94d..288a0b60aa 100644
--- a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -23,6 +23,7 @@
Create Date: 2017-01-25 11:43:51.635667
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '127d2bf2dfa7'
@@ -30,8 +31,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
def upgrade():
op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
diff --git a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
index 3db0d41190..fe84254c38 100644
--- a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
+++ b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
@@ -24,6 +24,9 @@
Create Date: 2015-08-18 18:57:51.927315
"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.engine.reflection import Inspector
# revision identifiers, used by Alembic.
revision = '1507a7289a2f'
@@ -31,10 +34,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.engine.reflection import Inspector
-
connectionhelper = sa.Table(
'connection',
sa.MetaData(),
diff --git a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
index aaf938a035..16ab349563 100644
--- a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
+++ b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
@@ -24,6 +24,8 @@
Create Date: 2016-02-02 17:20:55.692295
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1968acfc09e3'
@@ -31,12 +33,9 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
- op.add_column('variable', sa.Column('is_encrypted', sa.Boolean,default=False))
+ op.add_column('variable', sa.Column('is_encrypted', sa.Boolean, default=False))
def downgrade():
diff --git a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
index 79fcff4541..50d53652c4 100644
--- a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
+++ b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
@@ -25,28 +25,27 @@
"""
+from alembic import op
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = '1b38cef5b76e'
down_revision = '502898887f84'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_table('dag_run',
- sa.Column('id', sa.Integer(), nullable=False),
- sa.Column('dag_id', sa.String(length=250), nullable=True),
- sa.Column('execution_date', sa.DateTime(), nullable=True),
- sa.Column('state', sa.String(length=50), nullable=True),
- sa.Column('run_id', sa.String(length=250), nullable=True),
- sa.Column('external_trigger', sa.Boolean(), nullable=True),
- sa.PrimaryKeyConstraint('id'),
- sa.UniqueConstraint('dag_id', 'execution_date'),
- sa.UniqueConstraint('dag_id', 'run_id'),
- )
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('dag_id', sa.String(length=250), nullable=True),
+ sa.Column('execution_date', sa.DateTime(), nullable=True),
+ sa.Column('state', sa.String(length=50), nullable=True),
+ sa.Column('run_id', sa.String(length=250), nullable=True),
+ sa.Column('external_trigger', sa.Boolean(), nullable=True),
+ sa.PrimaryKeyConstraint('id'),
+ sa.UniqueConstraint('dag_id', 'execution_date'),
+ sa.UniqueConstraint('dag_id', 'run_id'))
def downgrade():
diff --git a/airflow/migrations/versions/211e584da130_add_ti_state_index.py b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
index afc600d58a..b17f390e0b 100644
--- a/airflow/migrations/versions/211e584da130_add_ti_state_index.py
+++ b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
@@ -24,6 +24,7 @@
Create Date: 2016-06-30 10:54:24.323588
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '211e584da130'
@@ -31,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_index('ti_state', 'task_instance', ['state'], unique=False)
diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
index 27a9f593b5..a757d27709 100644
--- a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
+++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
@@ -26,18 +26,16 @@
"""
+from alembic import op
+import sqlalchemy as sa
+import dill
+
# revision identifiers, used by Alembic.
revision = '27c6a30d7c24'
down_revision = '33ae817a1ff4'
branch_labels = None
depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-import dill
-
-
TASK_INSTANCE_TABLE = "task_instance"
NEW_COLUMN = "executor_config"
@@ -48,4 +46,3 @@ def upgrade():
def downgrade():
op.drop_column(TASK_INSTANCE_TABLE, NEW_COLUMN)
-
diff --git a/airflow/migrations/versions/2e541a1dcfed_task_duration.py b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
index 6b24ef66e4..595a5774a6 100644
--- a/airflow/migrations/versions/2e541a1dcfed_task_duration.py
+++ b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
@@ -25,16 +25,16 @@
"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
# revision identifiers, used by Alembic.
revision = '2e541a1dcfed'
down_revision = '1b38cef5b76e'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-
def upgrade():
# use batch_alter_table to support SQLite workaround
diff --git a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
index 75db27cdb3..fc8a1aab20 100644
--- a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
+++ b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
@@ -24,6 +24,7 @@
Create Date: 2016-04-02 19:28:15.211915
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '2e82aab8ef20'
@@ -31,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.rename_table('user', 'users')
diff --git a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
index 4f1364b971..473f76778b 100644
--- a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
+++ b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
@@ -17,13 +17,17 @@
# specific language governing permissions and limitations
# under the License.
-"""More logging into task_isntance
+"""More logging into task_instance
Revision ID: 338e90f54d61
Revises: 13eb55f81627
Create Date: 2015-08-25 06:09:20.460147
"""
+# flake8: noqa: E266
+
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '338e90f54d61'
@@ -31,9 +35,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
### commands auto generated by Alembic - please adjust! ###
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index c489c05f7e..925bf26df0 100644
--- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -25,6 +25,8 @@
Create Date: 2017-09-11 15:26:47.598494
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '33ae817a1ff4'
@@ -32,10 +34,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
-
RESOURCE_TABLE = "kube_resource_version"
@@ -53,4 +51,3 @@ def upgrade():
def downgrade():
op.drop_table(RESOURCE_TABLE)
-
diff --git a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
index 02ea51501c..3da4d5f543 100644
--- a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
+++ b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
@@ -24,6 +24,8 @@
Create Date: 2015-10-29 08:36:31.726728
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '40e67319e3a9'
@@ -31,9 +33,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.add_column('dag_run', sa.Column('conf', sa.PickleType(), nullable=True))
diff --git a/airflow/migrations/versions/4446e08588_dagrun_start_end.py b/airflow/migrations/versions/4446e08588_dagrun_start_end.py
index 101f2ad905..29932c9206 100644
--- a/airflow/migrations/versions/4446e08588_dagrun_start_end.py
+++ b/airflow/migrations/versions/4446e08588_dagrun_start_end.py
@@ -25,15 +25,15 @@
"""
+from alembic import op
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = '4446e08588'
down_revision = '561833c1c74b'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.add_column('dag_run', sa.Column('end_date', sa.DateTime(), nullable=True))
diff --git a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
index c3898e9596..655ff61042 100644
--- a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
+++ b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
@@ -24,93 +24,147 @@
"""
+from alembic import op
+from sqlalchemy.dialects import mysql
+from alembic import context
+
# revision identifiers, used by Alembic.
revision = '4addfa1236f1'
down_revision = 'f2ca10b85618'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-from alembic import context
-
def upgrade():
if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
- op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
- op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
- op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
-
- op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
- op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_scheduler_run',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_pickled',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag', column_name='last_expired',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='dag_run', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='start_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='dag_run', column_name='end_date',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='import_error', column_name='timestamp',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='job', column_name='start_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='job', column_name='end_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='job', column_name='latest_heartbeat',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='known_event', column_name='start_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='known_event', column_name='end_date',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='log', column_name='dttm',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='log', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6),
+ nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='timestamp',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='task_fail', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='start_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_fail', column_name='end_date',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='task_instance', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6),
+ nullable=False)
+ op.alter_column(table_name='task_instance', column_name='start_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='end_date',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='task_instance', column_name='queued_dttm',
+ type_=mysql.DATETIME(fsp=6))
+
+ op.alter_column(table_name='xcom', column_name='timestamp',
+ type_=mysql.DATETIME(fsp=6))
+ op.alter_column(table_name='xcom', column_name='execution_date',
+ type_=mysql.DATETIME(fsp=6))
def downgrade():
if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
- op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME())
- op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME())
- op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME())
-
- op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME())
-
- op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME())
- op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME())
- op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME())
-
- op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME())
-
- op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME())
- op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME())
- op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME())
-
- op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME())
- op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME())
-
- op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME())
- op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME())
-
- op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
- op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME())
-
- op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME())
- op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME())
- op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME())
-
- op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
- op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME())
- op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME())
- op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME())
-
- op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME())
- op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME())
+ op.alter_column(table_name='dag', column_name='last_scheduler_run',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='dag', column_name='last_pickled',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='dag', column_name='last_expired',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='dag_pickle', column_name='created_dttm',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='dag_run', column_name='execution_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='dag_run', column_name='start_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='dag_run', column_name='end_date',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='import_error', column_name='timestamp',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='job', column_name='start_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='job', column_name='end_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='job', column_name='latest_heartbeat',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='known_event', column_name='start_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='known_event', column_name='end_date',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='log', column_name='dttm',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='log', column_name='execution_date',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='sla_miss', column_name='execution_date',
+ type_=mysql.DATETIME(), nullable=False)
+ op.alter_column(table_name='sla_miss', column_name='timestamp',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='task_fail', column_name='execution_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='task_fail', column_name='start_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='task_fail', column_name='end_date',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='task_instance', column_name='execution_date',
+ type_=mysql.DATETIME(),
+ nullable=False)
+ op.alter_column(table_name='task_instance', column_name='start_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='task_instance', column_name='end_date',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='task_instance', column_name='queued_dttm',
+ type_=mysql.DATETIME())
+
+ op.alter_column(table_name='xcom', column_name='timestamp',
+ type_=mysql.DATETIME())
+ op.alter_column(table_name='xcom', column_name='execution_date',
+ type_=mysql.DATETIME())
diff --git a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
index 333b18e8d2..632720a4e2 100644
--- a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
+++ b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
@@ -24,6 +24,8 @@
Create Date: 2015-11-03 22:50:49.794097
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '502898887f84'
@@ -31,9 +33,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.add_column('log', sa.Column('extra', sa.Text(), nullable=True))
diff --git a/airflow/migrations/versions/52d714495f0_job_id_indices.py b/airflow/migrations/versions/52d714495f0_job_id_indices.py
index 13f561cffb..94374cb68a 100644
--- a/airflow/migrations/versions/52d714495f0_job_id_indices.py
+++ b/airflow/migrations/versions/52d714495f0_job_id_indices.py
@@ -24,6 +24,7 @@
Create Date: 2015-10-20 03:17:01.962542
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '52d714495f0'
@@ -31,12 +32,10 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
- op.create_index('idx_job_state_heartbeat', 'job', ['state', 'latest_heartbeat'], unique=False)
+ op.create_index('idx_job_state_heartbeat', 'job',
+ ['state', 'latest_heartbeat'], unique=False)
def downgrade():
diff --git a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
index d92db9f891..a26a105ac8 100644
--- a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
+++ b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
@@ -24,6 +24,8 @@
Create Date: 2015-11-30 06:51:25.872557
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '561833c1c74b'
@@ -31,9 +33,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.add_column('user', sa.Column('password', sa.String(255)))
diff --git a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
index e23a0a8b2f..77a35db48f 100644
--- a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
+++ b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
@@ -24,15 +24,15 @@
"""
+from alembic import op
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = '5e7d17757c7a'
down_revision = '8504051e801b'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.add_column('task_instance', sa.Column('pid', sa.Integer))
diff --git a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
index 6dda5df43b..2def57e904 100644
--- a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
+++ b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
@@ -23,6 +23,8 @@
Create Date: 2016-08-03 14:02:59.203021
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '64de9cddf6c9'
@@ -30,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_table(
@@ -47,5 +46,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
)
+
def downgrade():
op.drop_table('task_fail')
diff --git a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
index d6a4514ae2..fdcbc59df8 100644
--- a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
+++ b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
@@ -25,18 +25,18 @@
"""
+from alembic import op
+
# revision identifiers, used by Alembic.
revision = '8504051e801b'
down_revision = '4addfa1236f1'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
- op.create_index('idx_xcom_dag_task_date', 'xcom', ['dag_id', 'task_id', 'execution_date'], unique=False)
+ op.create_index('idx_xcom_dag_task_date', 'xcom',
+ ['dag_id', 'task_id', 'execution_date'], unique=False)
def downgrade():
diff --git a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
index 5b11dc7860..52a817081b 100644
--- a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
+++ b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
@@ -25,15 +25,15 @@
"""
+from alembic import op
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = '856955da8476'
down_revision = 'f23433877c24'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
conn = op.get_bind()
@@ -45,41 +45,39 @@ def upgrade():
#
# Use batch_alter_table to support SQLite workaround.
chart_table = sa.Table('chart',
- sa.MetaData(),
- sa.Column('id', sa.Integer(), nullable=False),
- sa.Column('label', sa.String(length=200), nullable=True),
- sa.Column('conn_id', sa.String(length=250), nullable=False),
- sa.Column('user_id', sa.Integer(), nullable=True),
- sa.Column('chart_type', sa.String(length=100), nullable=True),
- sa.Column('sql_layout', sa.String(length=50), nullable=True),
- sa.Column('sql', sa.Text(), nullable=True),
- sa.Column('y_log_scale', sa.Boolean(), nullable=True),
- sa.Column('show_datatable', sa.Boolean(), nullable=True),
- sa.Column('show_sql', sa.Boolean(), nullable=True),
- sa.Column('height', sa.Integer(), nullable=True),
- sa.Column('default_params', sa.String(length=5000), nullable=True),
- sa.Column('x_is_date', sa.Boolean(), nullable=True),
- sa.Column('iteration_no', sa.Integer(), nullable=True),
- sa.Column('last_modified', sa.DateTime(), nullable=True),
- sa.PrimaryKeyConstraint('id')
- )
+ sa.MetaData(),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('label', sa.String(length=200), nullable=True),
+ sa.Column('conn_id', sa.String(length=250), nullable=False),
+ sa.Column('user_id', sa.Integer(), nullable=True),
+ sa.Column('chart_type', sa.String(length=100), nullable=True),
+ sa.Column('sql_layout', sa.String(length=50), nullable=True),
+ sa.Column('sql', sa.Text(), nullable=True),
+ sa.Column('y_log_scale', sa.Boolean(), nullable=True),
+ sa.Column('show_datatable', sa.Boolean(), nullable=True),
+ sa.Column('show_sql', sa.Boolean(), nullable=True),
+ sa.Column('height', sa.Integer(), nullable=True),
+ sa.Column('default_params', sa.String(length=5000), nullable=True),
+ sa.Column('x_is_date', sa.Boolean(), nullable=True),
+ sa.Column('iteration_no', sa.Integer(), nullable=True),
+ sa.Column('last_modified', sa.DateTime(), nullable=True),
+ sa.PrimaryKeyConstraint('id'))
with op.batch_alter_table('chart', copy_from=chart_table) as batch_op:
batch_op.create_foreign_key('chart_user_id_fkey', 'users',
['user_id'], ['id'])
known_event_table = sa.Table('known_event',
- sa.MetaData(),
- sa.Column('id', sa.Integer(), nullable=False),
- sa.Column('label', sa.String(length=200), nullable=True),
- sa.Column('start_date', sa.DateTime(), nullable=True),
- sa.Column('end_date', sa.DateTime(), nullable=True),
- sa.Column('user_id', sa.Integer(), nullable=True),
- sa.Column('known_event_type_id', sa.Integer(), nullable=True),
- sa.Column('description', sa.Text(), nullable=True),
- sa.ForeignKeyConstraint(['known_event_type_id'],
- ['known_event_type.id'], ),
- sa.PrimaryKeyConstraint('id')
- )
+ sa.MetaData(),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('label', sa.String(length=200), nullable=True),
+ sa.Column('start_date', sa.DateTime(), nullable=True),
+ sa.Column('end_date', sa.DateTime(), nullable=True),
+ sa.Column('user_id', sa.Integer(), nullable=True),
+ sa.Column('known_event_type_id', sa.Integer(), nullable=True),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.ForeignKeyConstraint(['known_event_type_id'],
+ ['known_event_type.id'], ),
+ sa.PrimaryKeyConstraint('id'))
with op.batch_alter_table('chart', copy_from=known_event_table) as batch_op:
batch_op.create_foreign_key('known_event_user_id_fkey', 'users',
['user_id'], ['id'])
diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
index 5c921c6a98..ace7845965 100644
--- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
+++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -25,6 +25,8 @@
Create Date: 2018-04-03 15:31:20.814328
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '86770d1215c0'
@@ -32,10 +34,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
-
RESOURCE_TABLE = "kube_worker_uuid"
diff --git a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
index b821cacedc..6ff41baa28 100644
--- a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
+++ b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
@@ -24,6 +24,7 @@
Create Date: 2017-08-15 15:12:13.845074
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '947454bf1dff'
@@ -31,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False)
diff --git a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
index da69846233..6b21c3474a 100644
--- a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
+++ b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
@@ -24,6 +24,7 @@
Create Date: 2018-06-17 21:40:01.963540
"""
+from alembic import op
# revision identifiers, used by Alembic.
revision = '9635ae0956e7'
@@ -31,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_index('idx_task_fail_dag_task_date', 'task_fail', ['dag_id', 'task_id', 'execution_date'], unique=False)
@@ -41,4 +39,3 @@ def upgrade():
def downgrade():
op.drop_index('idx_task_fail_dag_task_date', table_name='task_fail')
-
diff --git a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
index a19eb58eb5..503cd0b6f0 100644
--- a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
+++ b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
@@ -25,18 +25,19 @@
"""
+from alembic import op
+import sqlalchemy as sa
+
# revision identifiers, used by Alembic.
revision = 'bba5a7cfc896'
down_revision = 'bbc73705a13e'
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
- op.add_column('connection', sa.Column('is_extra_encrypted', sa.Boolean,default=False))
+ op.add_column('connection',
+ sa.Column('is_extra_encrypted', sa.Boolean, default=False))
def downgrade():
diff --git a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
index 04cdc22d6c..9855a6d4da 100644
--- a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
+++ b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
@@ -24,6 +24,8 @@
Create Date: 2016-01-14 18:05:54.871682
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'bbc73705a13e'
@@ -31,12 +33,9 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
- op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean,default=False))
+ op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean, default=False))
def downgrade():
diff --git a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
index a02eea5519..a1a5270c7b 100644
--- a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
+++ b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
@@ -23,6 +23,9 @@
Create Date: 2017-08-14 16:06:31.568971
"""
+from alembic import op
+import dill
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'bdaa763e6c56'
@@ -30,15 +33,10 @@
branch_labels = None
depends_on = None
-from alembic import op
-import dill
-import sqlalchemy as sa
-
def upgrade():
# There can be data truncation here as LargeBinary can be smaller than the pickle
# type.
-
# use batch_alter_table to support SQLite workaround
with op.batch_alter_table("xcom") as batch_op:
batch_op.alter_column('value', type_=sa.LargeBinary())
diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
index 68228f7219..6155a40c81 100644
--- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
+++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -25,23 +25,22 @@
"""
-# revision identifiers, used by Alembic.
-revision = 'cc1e65623dc7'
-down_revision = '127d2bf2dfa7'
-branch_labels = None
-depends_on = None
-
from alembic import op
import sqlalchemy as sa
from airflow import settings
from airflow.models import DagBag
from airflow.utils.sqlalchemy import UtcDateTime
-from sqlalchemy import (
- Column, Integer, String)
+from sqlalchemy import Column, Integer, String
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.ext.declarative import declarative_base
+# revision identifiers, used by Alembic.
+revision = 'cc1e65623dc7'
+down_revision = '127d2bf2dfa7'
+branch_labels = None
+depends_on = None
+
Base = declarative_base()
BATCH_SIZE = 5000
ID_LEN = 250
@@ -58,8 +57,7 @@ class TaskInstance(Base):
def upgrade():
- op.add_column('task_instance', sa.Column('max_tries', sa.Integer,
- server_default="-1"))
+ op.add_column('task_instance', sa.Column('max_tries', sa.Integer, server_default="-1"))
# Check if table task_instance exist before data migration. This check is
# needed for database that does not create table until migration finishes.
# Checking task_instance table exists prevent the error of querying
@@ -129,7 +127,7 @@ def downgrade():
# max number of self retry (task.retries) minus number of
# times left for task instance to try the task.
ti.try_number = max(0, task.retries - (ti.max_tries -
- ti.try_number))
+ ti.try_number))
ti.max_tries = -1
session.merge(ti)
session.commit()
diff --git a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
index 5ecb0d5c72..db5afaf023 100644
--- a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
+++ b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
@@ -23,6 +23,9 @@
Create Date: 2017-08-18 17:07:16.686130
"""
+from alembic import op
+from sqlalchemy.dialects import mysql
+from alembic import context
# revision identifiers, used by Alembic.
revision = 'd2ae31099d61'
@@ -30,11 +33,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-from alembic import context
-
def upgrade():
if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
diff --git a/airflow/migrations/versions/e3a246e0dc1_current_schema.py b/airflow/migrations/versions/e3a246e0dc1_current_schema.py
index cfa4147dd9..cbf9897645 100644
--- a/airflow/migrations/versions/e3a246e0dc1_current_schema.py
+++ b/airflow/migrations/versions/e3a246e0dc1_current_schema.py
@@ -25,17 +25,17 @@
"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy import func
+from sqlalchemy.engine.reflection import Inspector
+
# revision identifiers, used by Alembic.
revision = 'e3a246e0dc1'
down_revision = None
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy import func
-from sqlalchemy.engine.reflection import Inspector
-
def upgrade():
conn = op.get_bind()
diff --git a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
index 44edeef069..3e643f629d 100644
--- a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
+++ b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
@@ -24,6 +24,8 @@
Create Date: 2018-06-17 10:16:31.412131
"""
+from alembic import op
+from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'f23433877c24'
@@ -31,9 +33,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
def upgrade():
conn = op.get_bind()
@@ -51,4 +50,3 @@ def downgrade():
op.alter_column('xcom', 'timestamp', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
op.alter_column('xcom', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
op.alter_column('task_fail', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
-
diff --git a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
index 3e3d40abc4..e14b4b8025 100644
--- a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
+++ b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
@@ -23,6 +23,8 @@
Create Date: 2016-07-20 15:08:28.247537
"""
+from alembic import op
+import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f2ca10b85618'
@@ -30,9 +32,6 @@
branch_labels = None
depends_on = None
-from alembic import op
-import sqlalchemy as sa
-
def upgrade():
op.create_table('dag_stats',
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index c8cc981e93..b152198af1 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -19,7 +19,6 @@
import sys
import os
-from airflow.models import BaseOperator
# ------------------------------------------------------------------------
#
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 216b2ed626..3393de0311 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -24,12 +24,11 @@
from airflow.utils import timezone
from datetime import datetime, timedelta
-from dateutil.relativedelta import relativedelta # for doctest
+from dateutil.relativedelta import relativedelta # flake8: noqa: F401 for doctest
import six
from croniter import croniter
-
cron_presets = {
'@hourly': '0 * * * *',
'@daily': '0 0 * * *',
@@ -39,11 +38,7 @@
}
-def date_range(
- start_date,
- end_date=None,
- num=None,
- delta=None):
+def date_range(start_date, end_date=None, num=None, delta=None):
"""
Get a set of dates as a list based on a start, end and delta, delta
can be something that can be added to ``datetime.datetime``
@@ -181,8 +176,8 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
# Check if start_date + (lower + 1)*delta or
# start_date + lower*delta is closer to dt and return the solution
if (
- (start_date + (lower + 1) * delta) - dt <=
- dt - (start_date + lower * delta)):
+ (start_date + (lower + 1) * delta) - dt <=
+ dt - (start_date + lower * delta)):
return start_date + (lower + 1) * delta
else:
return start_date + lower * delta
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index f1f0ea9a42..4bc8e0debd 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -100,5 +100,6 @@ def wrapper(*args, **kwargs):
return wrapper
if 'BUILDING_AIRFLOW_DOCS' in os.environ:
+ # flake8: noqa: F811
# Monkey patch hook to get good function headers while building docs
apply_defaults = lambda x: x
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 08347a515c..f4038715ac 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -22,7 +22,6 @@
from __future__ import print_function
from __future__ import unicode_literals
-from builtins import str
from past.builtins import basestring
import importlib
@@ -62,13 +61,13 @@ def send_email_smtp(to, subject, html_content, files=None,
>>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
"""
- SMTP_MAIL_FROM = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
+ smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
to = get_email_address_list(to)
msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
- msg['From'] = SMTP_MAIL_FROM
+ msg['From'] = smtp_mail_from
msg['To'] = ", ".join(to)
recipients = to
if cc:
@@ -96,7 +95,7 @@ def send_email_smtp(to, subject, html_content, files=None,
part['Content-ID'] = '<%s>' % basename
msg.attach(part)
- send_MIME_email(SMTP_MAIL_FROM, recipients, msg, dryrun)
+ send_MIME_email(smtp_mail_from, recipients, msg, dryrun)
def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 45d0217e23..d6b1d93b38 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -32,7 +32,6 @@
import os
import re
import signal
-import subprocess
import sys
import warnings
@@ -226,6 +225,7 @@ def reap_process_group(pid, log, sig=signal.SIGTERM,
:param sig: signal type
:param timeout: how much time a process has to terminate
"""
+
def on_terminate(p):
log.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index d74aabfbae..16372c0600 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
-# Using `from elasticsearch import *` would break elasticseach mocking used in unit test.
+# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
from elasticsearch_dsl import Search
diff --git a/airflow/www/app.py b/airflow/www/app.py
index f7976b0dd5..0e6a2a0c23 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -18,7 +18,6 @@
# under the License.
#
import six
-import os
from flask import Flask
from flask_admin import Admin, base
@@ -64,8 +63,8 @@ def create_app(config=None, testing=False):
api.load_auth()
api.api_auth.init_app(app)
- cache = Cache(
- app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
+ # flake8: noqa: F841
+ cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
app.register_blueprint(routes)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 0c4f4b05d6..9ce114d5ed 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -17,11 +17,11 @@
# specific language governing permissions and limitations
# under the License.
#
+# flake8: noqa: E402
import inspect
from future import standard_library
standard_library.install_aliases()
-from builtins import str
-from builtins import object
+from builtins import str, object
from cgi import escape
from io import BytesIO as IO
@@ -29,13 +29,13 @@
import gzip
import json
import time
+import wtforms
+from wtforms.compat import text_type
from flask import after_this_request, request, Response
-from flask_admin.contrib.sqla.filters import FilterConverter
from flask_admin.model import filters
+from flask_admin.contrib.sqla.filters import FilterConverter
from flask_login import current_user
-import wtforms
-from wtforms.compat import text_type
from airflow import configuration, models, settings
from airflow.utils.db import create_session
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0c0dcff801..63354a22e7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -22,7 +22,6 @@
import codecs
import copy
import datetime as dt
-import inspect
import itertools
import json
import logging
@@ -365,6 +364,7 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
'dr_state': dr_state,
}
+
class Airflow(BaseView):
def is_visible(self):
return False
@@ -481,8 +481,6 @@ def chart_data(self):
else:
if chart.sql_layout == 'series':
# User provides columns (series, x, y)
- xaxis_label = df.columns[1]
- yaxis_label = df.columns[2]
df[df.columns[2]] = df[df.columns[2]].astype(np.float)
df = df.pivot_table(
index=df.columns[1],
@@ -490,8 +488,6 @@ def chart_data(self):
values=df.columns[2], aggfunc=np.sum)
else:
# User provides columns (x, y, metric1, metric2, ...)
- xaxis_label = df.columns[0]
- yaxis_label = 'y'
df.index = df[df.columns[0]]
df = df.sort(df.columns[0])
del df[df.columns[0]]
@@ -599,8 +595,8 @@ def task_stats(self, session=None):
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING)
- .filter(Dag.is_active == True)
- .filter(Dag.is_subdag == False)
+ .filter(Dag.is_active is True)
+ .filter(Dag.is_subdag is False)
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
)
@@ -608,8 +604,8 @@ def task_stats(self, session=None):
session.query(DagRun.dag_id, DagRun.execution_date)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING)
- .filter(Dag.is_active == True)
- .filter(Dag.is_subdag == False)
+ .filter(Dag.is_active is True)
+ .filter(Dag.is_subdag is False)
.subquery('running_dag_run')
)
@@ -883,7 +879,7 @@ def task(self):
for attr_name in dir(ti):
if not attr_name.startswith('_'):
attr = getattr(ti, attr_name)
- if type(attr) != type(self.task):
+ if type(attr) != type(self.task): # noqa: E721
ti_attrs.append((attr_name, str(attr)))
task_attrs = []
@@ -891,7 +887,7 @@ def task(self):
if not attr_name.startswith('_'):
attr = getattr(task, attr_name)
if type(attr) != type(self.task) and \
- attr_name not in attr_renderer:
+ attr_name not in attr_renderer: # noqa: E721
task_attrs.append((attr_name, str(attr)))
# Color coding the special attributes that are code
@@ -1166,7 +1162,6 @@ def clear(self):
@wwwutils.notify_owner
def dagrun_clear(self):
dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
origin = request.args.get('origin')
execution_date = request.args.get('execution_date')
confirmed = request.args.get('confirmed') == "true"
diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py
index b319426aa9..321185ee9b 100644
--- a/airflow/www_rbac/app.py
+++ b/airflow/www_rbac/app.py
@@ -19,7 +19,6 @@
#
import socket
import six
-import os
from flask import Flask
from flask_appbuilder import AppBuilder, SQLA
@@ -59,7 +58,8 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
api.load_auth()
api.api_auth.init_app(app)
- cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'}) # noqa
+ # flake8: noqa: F841
+ cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
from airflow.www_rbac.blueprints import routes
app.register_blueprint(routes)
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 629f488fc7..7a2a2d3124 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -35,7 +35,7 @@
import pendulum
import sqlalchemy as sqla
from flask import (
- g, redirect, request, Markup, Response, render_template,
+ redirect, request, Markup, Response, render_template,
make_response, flash, jsonify)
from flask._compat import PY2
from flask_appbuilder import BaseView, ModelView, expose, has_access
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services