Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/20 22:44:39 UTC

[GitHub] kaxil closed pull request #3772: [AIRFLOW-2918] Fix Flake8 violations

URL: https://github.com/apache/incubator-airflow/pull/3772

This is a pull request merged from a forked repository. Because GitHub hides the original diff once a foreign (fork) pull request is merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/__init__.py b/airflow/__init__.py
index bc6a7bbe19..d010fe4c74 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -32,8 +32,8 @@
 
 import sys
 
-from airflow import configuration as conf
-from airflow import settings
+# flake8: noqa: F401
+from airflow import settings, configuration as conf
 from airflow.models import DAG
 from flask_admin import BaseView
 from importlib import import_module
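
The change above leans on flake8's suppression comments: the imports in airflow/__init__.py are intentional re-exports, so F401 ("imported but unused") has to be waived rather than fixed. Two scopes exist and they behave quite differently; a minimal sketch, with illustrative module names:

    # Per-line: only this import is exempt from F401.
    from mypackage import settings  # noqa: F401

    # File-level: flake8 treats a bare "# flake8: noqa" as "skip this whole
    # file"; historically any code list appended to it (": F401") is ignored,
    # so the directive is broader than it looks.
    # flake8: noqa: F401
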
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 641b81e46d..08fa0d7929 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -19,18 +19,14 @@
 import flask_login
 
 # Need to expose these downstream
-# pylint: disable=unused-import
-from flask_login import (current_user,
-                         logout_user,
-                         login_required,
-                         login_user)
-# pylint: enable=unused-import
+# flake8: noqa: F401
+from flask_login import current_user, logout_user, login_required, login_user
 
 from flask import url_for, redirect, request
 
 from flask_oauthlib.client import OAuth
 
-from airflow import models, configuration, settings
+from airflow import models, configuration
 from airflow.configuration import AirflowConfigException
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index e41934b926..08b29e383a 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -19,18 +19,14 @@
 import flask_login
 
 # Need to expose these downstream
-# pylint: disable=unused-import
-from flask_login import (current_user,
-                         logout_user,
-                         login_required,
-                         login_user)
-# pylint: enable=unused-import
+# flake8: noqa: F401
+from flask_login import current_user, logout_user, login_required, login_user
 
 from flask import url_for, redirect, request
 
 from flask_oauthlib.client import OAuth
 
-from airflow import models, configuration, settings
+from airflow import models, configuration
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
 
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index fdb6204967..4a019eb131 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -21,8 +21,7 @@
 import flask_login
 from flask_login import current_user
 from flask import flash
-from wtforms import (
-    Form, PasswordField, StringField)
+from wtforms import Form, PasswordField, StringField
 from wtforms.validators import InputRequired
 
 # pykerberos should be used as it verifies the KDC, the "kerberos" module does not do so
@@ -32,7 +31,6 @@
 
 from flask import url_for, redirect
 
-from airflow import settings
 from airflow import models
 from airflow import configuration
 from airflow.utils.db import provide_session
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 516e121c9b..a949e89a77 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -19,13 +19,12 @@
 from future.utils import native
 
 import flask_login
-from flask_login import login_required, current_user, logout_user
+from flask_login import login_required, current_user, logout_user  # noqa: F401
 from flask import flash
-from wtforms import (
-    Form, PasswordField, StringField)
+from wtforms import Form, PasswordField, StringField
 from wtforms.validators import InputRequired
 
-from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE, BASE
+from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE
 import ssl
 
 from flask import url_for, redirect
diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py
index 5dac0a44a9..def793c1be 100644
--- a/airflow/contrib/operators/mlengine_prediction_summary.py
+++ b/airflow/contrib/operators/mlengine_prediction_summary.py
@@ -1,3 +1,4 @@
+# flake8: noqa: F841
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
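
The file-level directive added here silences F841 ("local variable is assigned to but never used") for the entire module rather than line by line. A small illustration of what F841 flags and of the narrower per-line waiver, with made-up names:

    def summarize(rows):
        total = sum(rows)   # F841: 'total' is assigned but never read
        scratch = object()  # noqa: F841 -- waived for this line only
        return len(rows)
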
diff --git a/airflow/default_login.py b/airflow/default_login.py
index d44dbf39ea..bf87bbc47f 100644
--- a/airflow/default_login.py
+++ b/airflow/default_login.py
@@ -25,11 +25,11 @@
 """
 
 import flask_login
-from flask_login import login_required, current_user, logout_user
+from flask_login import login_required, current_user, logout_user  # noqa: F401
 
 from flask import url_for, redirect
 
-from airflow import settings
+from airflow import settings  # noqa: F401
 from airflow import models
 from airflow.utils.db import provide_session
 
@@ -64,9 +64,6 @@ def is_superuser(self):
         """Access all the things"""
         return True
 
-# models.User = User  # hack!
-# del User
-
 
 @login_manager.user_loader
 @provide_session
diff --git a/airflow/jobs.py b/airflow/jobs.py
index d51e7c2537..5bcd3e8407 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -658,7 +658,7 @@ def manage_slas(self, dag, session=None):
         slas = (
             session
             .query(SlaMiss)
-            .filter(SlaMiss.notification_sent == False)
+            .filter(SlaMiss.notification_sent == False)  # noqa: E712
             .filter(SlaMiss.dag_id == dag.dag_id)
             .all()
         )
@@ -708,16 +708,13 @@ def manage_slas(self, dag, session=None):
             Blocking tasks:
             <pre><code>{blocking_task_list}\n{bug}<code></pre>
             """.format(bug=asciiart.bug, **locals())
-            emails = []
-            for t in dag.tasks:
-                if t.email:
-                    if isinstance(t.email, basestring):
-                        l = [t.email]
-                    elif isinstance(t.email, (list, tuple)):
-                        l = t.email
-                    for email in l:
-                        if email not in emails:
-                            emails.append(email)
+            emails = set()
+            for task in dag.tasks:
+                if task.email:
+                    if isinstance(task.email, basestring):
+                        emails.add(task.email)
+                    elif isinstance(task.email, (list, tuple)):
+                        emails |= set(task.email)
             if emails and len(slas):
                 try:
                     send_email(
@@ -818,7 +815,7 @@ def create_dag_run(self, dag, session=None):
                 session.query(func.max(DagRun.execution_date))
                 .filter_by(dag_id=dag.dag_id)
                 .filter(or_(
-                    DagRun.external_trigger == False,
+                    DagRun.external_trigger == False,  # noqa: E712
                     # add % as a wildcard for the like query
                     DagRun.run_id.like(DagRun.ID_PREFIX + '%')
                 ))
@@ -1089,14 +1086,16 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
                 DR,
                 and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa E711
+            .filter(or_(DR.run_id == None,  # noqa: E711
                     not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
             .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id == None,  # noqa E711
+            .filter(or_(DM.dag_id == None,  # noqa: E711
                     not_(DM.is_paused)))
         )
         if None in states:
-            ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(states))) # noqa E711
+            ti_query = ti_query.filter(
+                or_(TI.state == None, TI.state.in_(states))  # noqa: E711
+            )
         else:
             ti_query = ti_query.filter(TI.state.in_(states))
 
@@ -1184,8 +1183,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
                 )
                 if current_task_concurrency >= task_concurrency_limit:
                     self.log.info(
-                        "Not executing %s since the number of tasks running or queued from DAG %s"
-                        " is >= to the DAG's task concurrency limit of %s",
+                        "Not executing %s since the number of tasks running or queued "
+                        "from DAG %s is >= to the DAG's task concurrency limit of %s",
                         task_instance, dag_id, task_concurrency_limit
                     )
                     continue
@@ -1261,7 +1260,7 @@ def _change_state_for_executable_task_instances(self, task_instances,
 
         if None in acceptable_states:
             ti_query = ti_query.filter(
-                or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa E711
+                or_(TI.state == None, TI.state.in_(acceptable_states))  # noqa: E711
             )
         else:
             ti_query = ti_query.filter(TI.state.in_(acceptable_states))
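
Two of the jobs.py hunks deserve a note. The `== None` / `== False` comparisons are not style slips: SQLAlchemy overloads `==` on column objects to build SQL (`IS NULL`, `= false`), so the Pythonic `is None` / `is False` would compare object identity in Python and never reach the database; the `# noqa: E711` / `# noqa: E712` comments therefore waive the pep8 warnings instead of "fixing" the comparison. The email-collection loop is likewise rewritten to accumulate into a set, which deduplicates addresses in one pass. A self-contained sketch of a linter-clean alternative using SQLAlchemy's explicit comparison API (the model here is illustrative, not Airflow's):

    import sqlalchemy as sa
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class SlaMiss(Base):
        __tablename__ = 'sla_miss'
        id = sa.Column(sa.Integer, primary_key=True)
        notification_sent = sa.Column(sa.Boolean, default=False)

    engine = sa.create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    # `== False` builds SQL but needs "# noqa: E712" to satisfy the linter...
    flagged = session.query(SlaMiss).filter(SlaMiss.notification_sent == False)  # noqa: E712
    # ...while .is_() expresses the same SQL without tripping E712 at all.
    clean = session.query(SlaMiss).filter(SlaMiss.notification_sent.is_(False))
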
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 97ebe4257f..4e1977635a 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -85,6 +85,7 @@ def run_migrations_online():
         with context.begin_transaction():
             context.run_migrations()
 
+
 if context.is_offline_mode():
     run_migrations_offline()
 else:
diff --git a/airflow/migrations/versions/05f30312d566_merge_heads.py b/airflow/migrations/versions/05f30312d566_merge_heads.py
index 78d5652679..f869cb8c08 100644
--- a/airflow/migrations/versions/05f30312d566_merge_heads.py
+++ b/airflow/migrations/versions/05f30312d566_merge_heads.py
@@ -31,9 +31,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     pass
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
index 64ee41c44d..567172f9bf 100644
--- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -25,16 +25,16 @@
 
 """
 
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = '0e2a74e0fc9f'
 down_revision = 'd2ae31099d61'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-from sqlalchemy.dialects import mysql
-import sqlalchemy as sa
-
 
 def upgrade():
     conn = op.get_bind()
@@ -69,14 +69,16 @@ def upgrade():
         op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
 
-        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                        nullable=False)
         op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
 
         op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
 
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                        nullable=False)
         op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
@@ -117,14 +119,16 @@ def upgrade():
         op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
         op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
 
-        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
+                        nullable=False)
         op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
 
         op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
         op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
         op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
 
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True),
+                        nullable=False)
         op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
         op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
         op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
@@ -161,14 +165,16 @@ def downgrade():
         op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
 
-        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
+                        nullable=False)
         op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6))
 
         op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
 
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6),
+                        nullable=False)
         op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
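
Nearly every migration file in this PR repeats one structural fix: the generated Alembic template placed the revision-identifier assignments above the imports, which trips E402 ("module level import not at top of file"), and several files imported `op` or `sa` without using them. Moving the imports up and dropping the unused ones satisfies flake8 without changing what Alembic executes. The resulting layout, sketched for a hypothetical revision:

    """Add example flag column

    Revision ID: 0123abcd
    Revises: fedc3210
    """
    from alembic import op
    import sqlalchemy as sa

    # revision identifiers, used by Alembic.
    revision = '0123abcd'
    down_revision = 'fedc3210'
    branch_labels = None
    depends_on = None


    def upgrade():
        op.add_column('example', sa.Column('flag', sa.Boolean(), default=False))


    def downgrade():
        op.drop_column('example', 'flag')
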
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
index 6ee50aa94d..288a0b60aa 100644
--- a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -23,6 +23,7 @@
 Create Date: 2017-01-25 11:43:51.635667
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '127d2bf2dfa7'
@@ -30,8 +31,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
 
 def upgrade():
     op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
diff --git a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
index 3db0d41190..fe84254c38 100644
--- a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
+++ b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py
@@ -24,6 +24,9 @@
 Create Date: 2015-08-18 18:57:51.927315
 
 """
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.engine.reflection import Inspector
 
 # revision identifiers, used by Alembic.
 revision = '1507a7289a2f'
@@ -31,10 +34,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.engine.reflection import Inspector
-
 connectionhelper = sa.Table(
     'connection',
     sa.MetaData(),
diff --git a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
index aaf938a035..16ab349563 100644
--- a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
+++ b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py
@@ -24,6 +24,8 @@
 Create Date: 2016-02-02 17:20:55.692295
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '1968acfc09e3'
@@ -31,12 +33,9 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
-    op.add_column('variable', sa.Column('is_encrypted', sa.Boolean,default=False))
+    op.add_column('variable', sa.Column('is_encrypted', sa.Boolean, default=False))
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
index 79fcff4541..50d53652c4 100644
--- a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
+++ b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
@@ -25,28 +25,27 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = '1b38cef5b76e'
 down_revision = '502898887f84'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_table('dag_run',
-        sa.Column('id', sa.Integer(), nullable=False),
-        sa.Column('dag_id', sa.String(length=250), nullable=True),
-        sa.Column('execution_date', sa.DateTime(), nullable=True),
-        sa.Column('state', sa.String(length=50), nullable=True),
-        sa.Column('run_id', sa.String(length=250), nullable=True),
-        sa.Column('external_trigger', sa.Boolean(), nullable=True),
-        sa.PrimaryKeyConstraint('id'),
-        sa.UniqueConstraint('dag_id', 'execution_date'),
-        sa.UniqueConstraint('dag_id', 'run_id'),
-    )
+                    sa.Column('id', sa.Integer(), nullable=False),
+                    sa.Column('dag_id', sa.String(length=250), nullable=True),
+                    sa.Column('execution_date', sa.DateTime(), nullable=True),
+                    sa.Column('state', sa.String(length=50), nullable=True),
+                    sa.Column('run_id', sa.String(length=250), nullable=True),
+                    sa.Column('external_trigger', sa.Boolean(), nullable=True),
+                    sa.PrimaryKeyConstraint('id'),
+                    sa.UniqueConstraint('dag_id', 'execution_date'),
+                    sa.UniqueConstraint('dag_id', 'run_id'))
 
 
 def downgrade():
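
The re-indented `op.create_table` call above shows the continuation-line style pep8 enforces (the E12x family): wrapped arguments are either aligned with the opening delimiter, as done throughout this PR, or given a uniform hanging indent. Both accepted shapes, with an illustrative table:

    import sqlalchemy as sa

    # Aligned with the opening parenthesis (the style chosen in this PR):
    t1 = sa.Table('example', sa.MetaData(),
                  sa.Column('id', sa.Integer(), nullable=False),
                  sa.Column('name', sa.String(length=50)))

    # Hanging indent, equally acceptable to the checker:
    t2 = sa.Table(
        'example_two', sa.MetaData(),
        sa.Column('id', sa.Integer(), nullable=False),
    )
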
diff --git a/airflow/migrations/versions/211e584da130_add_ti_state_index.py b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
index afc600d58a..b17f390e0b 100644
--- a/airflow/migrations/versions/211e584da130_add_ti_state_index.py
+++ b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
@@ -24,6 +24,7 @@
 Create Date: 2016-06-30 10:54:24.323588
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '211e584da130'
@@ -31,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_index('ti_state', 'task_instance', ['state'], unique=False)
diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
index 27a9f593b5..a757d27709 100644
--- a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
+++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
@@ -26,18 +26,16 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+import dill
+
 # revision identifiers, used by Alembic.
 revision = '27c6a30d7c24'
 down_revision = '33ae817a1ff4'
 branch_labels = None
 depends_on = None
 
-
-from alembic import op
-import sqlalchemy as sa
-import dill
-
-
 TASK_INSTANCE_TABLE = "task_instance"
 NEW_COLUMN = "executor_config"
 
@@ -48,4 +46,3 @@ def upgrade():
 
 def downgrade():
     op.drop_column(TASK_INSTANCE_TABLE, NEW_COLUMN)
-
diff --git a/airflow/migrations/versions/2e541a1dcfed_task_duration.py b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
index 6b24ef66e4..595a5774a6 100644
--- a/airflow/migrations/versions/2e541a1dcfed_task_duration.py
+++ b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
@@ -25,16 +25,16 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
 # revision identifiers, used by Alembic.
 revision = '2e541a1dcfed'
 down_revision = '1b38cef5b76e'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-
 
 def upgrade():
     # use batch_alter_table to support SQLite workaround
diff --git a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
index 75db27cdb3..fc8a1aab20 100644
--- a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
+++ b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
@@ -24,6 +24,7 @@
 Create Date: 2016-04-02 19:28:15.211915
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '2e82aab8ef20'
@@ -31,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.rename_table('user', 'users')
diff --git a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
index 4f1364b971..473f76778b 100644
--- a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
+++ b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py
@@ -17,13 +17,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""More logging into task_isntance
+"""More logging into task_instance
 
 Revision ID: 338e90f54d61
 Revises: 13eb55f81627
 Create Date: 2015-08-25 06:09:20.460147
 
 """
+# flake8: noqa: E266
+
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '338e90f54d61'
@@ -31,9 +35,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     ### commands auto generated by Alembic - please adjust! ###
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index c489c05f7e..925bf26df0 100644
--- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -25,6 +25,8 @@
 Create Date: 2017-09-11 15:26:47.598494
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '33ae817a1ff4'
@@ -32,10 +34,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
-
 RESOURCE_TABLE = "kube_resource_version"
 
 
@@ -53,4 +51,3 @@ def upgrade():
 
 def downgrade():
     op.drop_table(RESOURCE_TABLE)
-
diff --git a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
index 02ea51501c..3da4d5f543 100644
--- a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
+++ b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
@@ -24,6 +24,8 @@
 Create Date: 2015-10-29 08:36:31.726728
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '40e67319e3a9'
@@ -31,9 +33,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.add_column('dag_run', sa.Column('conf', sa.PickleType(), nullable=True))
diff --git a/airflow/migrations/versions/4446e08588_dagrun_start_end.py b/airflow/migrations/versions/4446e08588_dagrun_start_end.py
index 101f2ad905..29932c9206 100644
--- a/airflow/migrations/versions/4446e08588_dagrun_start_end.py
+++ b/airflow/migrations/versions/4446e08588_dagrun_start_end.py
@@ -25,15 +25,15 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = '4446e08588'
 down_revision = '561833c1c74b'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.add_column('dag_run', sa.Column('end_date', sa.DateTime(), nullable=True))
diff --git a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
index c3898e9596..655ff61042 100644
--- a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
+++ b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py
@@ -24,93 +24,147 @@
 
 """
 
+from alembic import op
+from sqlalchemy.dialects import mysql
+from alembic import context
+
 # revision identifiers, used by Alembic.
 revision = '4addfa1236f1'
 down_revision = 'f2ca10b85618'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-from alembic import context
-
 
 def upgrade():
     if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
-        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
-        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
-        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
-
-        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
-        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_scheduler_run',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6),
+                        nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6),
+                        nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm',
+                        type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp',
+                        type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date',
+                        type_=mysql.DATETIME(fsp=6))
 
 
 def downgrade():
     if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
-        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME())
-        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME())
-        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME())
-        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
-        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(), nullable=False)
-        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME())
-        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME())
-
-        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME())
-        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME())
+        op.alter_column(table_name='dag', column_name='last_scheduler_run',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='dag', column_name='last_pickled',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='dag', column_name='last_expired',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='dag_run', column_name='execution_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='dag_run', column_name='start_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='dag_run', column_name='end_date',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='import_error', column_name='timestamp',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='job', column_name='start_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='job', column_name='end_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='job', column_name='latest_heartbeat',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='known_event', column_name='start_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='known_event', column_name='end_date',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='log', column_name='dttm',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='log', column_name='execution_date',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date',
+                        type_=mysql.DATETIME(), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='task_fail', column_name='execution_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='task_fail', column_name='start_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='task_fail', column_name='end_date',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='task_instance', column_name='execution_date',
+                        type_=mysql.DATETIME(),
+                        nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='task_instance', column_name='end_date',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='task_instance', column_name='queued_dttm',
+                        type_=mysql.DATETIME())
+
+        op.alter_column(table_name='xcom', column_name='timestamp',
+                        type_=mysql.DATETIME())
+        op.alter_column(table_name='xcom', column_name='execution_date',
+                        type_=mysql.DATETIME())
diff --git a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
index 333b18e8d2..632720a4e2 100644
--- a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
+++ b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py
@@ -24,6 +24,8 @@
 Create Date: 2015-11-03 22:50:49.794097
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '502898887f84'
@@ -31,9 +33,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.add_column('log', sa.Column('extra', sa.Text(), nullable=True))
diff --git a/airflow/migrations/versions/52d714495f0_job_id_indices.py b/airflow/migrations/versions/52d714495f0_job_id_indices.py
index 13f561cffb..94374cb68a 100644
--- a/airflow/migrations/versions/52d714495f0_job_id_indices.py
+++ b/airflow/migrations/versions/52d714495f0_job_id_indices.py
@@ -24,6 +24,7 @@
 Create Date: 2015-10-20 03:17:01.962542
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '52d714495f0'
@@ -31,12 +32,10 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
-    op.create_index('idx_job_state_heartbeat', 'job', ['state', 'latest_heartbeat'], unique=False)
+    op.create_index('idx_job_state_heartbeat', 'job',
+                    ['state', 'latest_heartbeat'], unique=False)
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
index d92db9f891..a26a105ac8 100644
--- a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
+++ b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py
@@ -24,6 +24,8 @@
 Create Date: 2015-11-30 06:51:25.872557
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '561833c1c74b'
@@ -31,9 +33,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.add_column('user', sa.Column('password', sa.String(255)))
diff --git a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
index e23a0a8b2f..77a35db48f 100644
--- a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
+++ b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
@@ -24,15 +24,15 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = '5e7d17757c7a'
 down_revision = '8504051e801b'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.add_column('task_instance', sa.Column('pid', sa.Integer))
diff --git a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
index 6dda5df43b..2def57e904 100644
--- a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
+++ b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py
@@ -23,6 +23,8 @@
 Create Date: 2016-08-03 14:02:59.203021
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '64de9cddf6c9'
@@ -30,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_table(
@@ -47,5 +46,6 @@ def upgrade():
         sa.PrimaryKeyConstraint('id'),
     )
 
+
 def downgrade():
     op.drop_table('task_fail')
diff --git a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
index d6a4514ae2..fdcbc59df8 100644
--- a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
+++ b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py
@@ -25,18 +25,18 @@
 
 """
 
+from alembic import op
+
 # revision identifiers, used by Alembic.
 revision = '8504051e801b'
 down_revision = '4addfa1236f1'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
-    op.create_index('idx_xcom_dag_task_date', 'xcom', ['dag_id', 'task_id', 'execution_date'], unique=False)
+    op.create_index('idx_xcom_dag_task_date', 'xcom',
+                    ['dag_id', 'task_id', 'execution_date'], unique=False)
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
index 5b11dc7860..52a817081b 100644
--- a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
+++ b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py
@@ -25,15 +25,15 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = '856955da8476'
 down_revision = 'f23433877c24'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     conn = op.get_bind()
@@ -45,41 +45,39 @@ def upgrade():
         #
         # Use batch_alter_table to support SQLite workaround.
         chart_table = sa.Table('chart',
-            sa.MetaData(),
-            sa.Column('id', sa.Integer(), nullable=False),
-            sa.Column('label', sa.String(length=200), nullable=True),
-            sa.Column('conn_id', sa.String(length=250), nullable=False),
-            sa.Column('user_id', sa.Integer(), nullable=True),
-            sa.Column('chart_type', sa.String(length=100), nullable=True),
-            sa.Column('sql_layout', sa.String(length=50), nullable=True),
-            sa.Column('sql', sa.Text(), nullable=True),
-            sa.Column('y_log_scale', sa.Boolean(), nullable=True),
-            sa.Column('show_datatable', sa.Boolean(), nullable=True),
-            sa.Column('show_sql', sa.Boolean(), nullable=True),
-            sa.Column('height', sa.Integer(), nullable=True),
-            sa.Column('default_params', sa.String(length=5000), nullable=True),
-            sa.Column('x_is_date', sa.Boolean(), nullable=True),
-            sa.Column('iteration_no', sa.Integer(), nullable=True),
-            sa.Column('last_modified', sa.DateTime(), nullable=True),
-            sa.PrimaryKeyConstraint('id')
-        )
+                               sa.MetaData(),
+                               sa.Column('id', sa.Integer(), nullable=False),
+                               sa.Column('label', sa.String(length=200), nullable=True),
+                               sa.Column('conn_id', sa.String(length=250), nullable=False),
+                               sa.Column('user_id', sa.Integer(), nullable=True),
+                               sa.Column('chart_type', sa.String(length=100), nullable=True),
+                               sa.Column('sql_layout', sa.String(length=50), nullable=True),
+                               sa.Column('sql', sa.Text(), nullable=True),
+                               sa.Column('y_log_scale', sa.Boolean(), nullable=True),
+                               sa.Column('show_datatable', sa.Boolean(), nullable=True),
+                               sa.Column('show_sql', sa.Boolean(), nullable=True),
+                               sa.Column('height', sa.Integer(), nullable=True),
+                               sa.Column('default_params', sa.String(length=5000), nullable=True),
+                               sa.Column('x_is_date', sa.Boolean(), nullable=True),
+                               sa.Column('iteration_no', sa.Integer(), nullable=True),
+                               sa.Column('last_modified', sa.DateTime(), nullable=True),
+                               sa.PrimaryKeyConstraint('id'))
         with op.batch_alter_table('chart', copy_from=chart_table) as batch_op:
             batch_op.create_foreign_key('chart_user_id_fkey', 'users',
                                         ['user_id'], ['id'])
 
         known_event_table = sa.Table('known_event',
-            sa.MetaData(),
-            sa.Column('id', sa.Integer(), nullable=False),
-            sa.Column('label', sa.String(length=200), nullable=True),
-            sa.Column('start_date', sa.DateTime(), nullable=True),
-            sa.Column('end_date', sa.DateTime(), nullable=True),
-            sa.Column('user_id', sa.Integer(), nullable=True),
-            sa.Column('known_event_type_id', sa.Integer(), nullable=True),
-            sa.Column('description', sa.Text(), nullable=True),
-            sa.ForeignKeyConstraint(['known_event_type_id'],
-                                    ['known_event_type.id'], ),
-            sa.PrimaryKeyConstraint('id')
-        )
+                                     sa.MetaData(),
+                                     sa.Column('id', sa.Integer(), nullable=False),
+                                     sa.Column('label', sa.String(length=200), nullable=True),
+                                     sa.Column('start_date', sa.DateTime(), nullable=True),
+                                     sa.Column('end_date', sa.DateTime(), nullable=True),
+                                     sa.Column('user_id', sa.Integer(), nullable=True),
+                                     sa.Column('known_event_type_id', sa.Integer(), nullable=True),
+                                     sa.Column('description', sa.Text(), nullable=True),
+                                     sa.ForeignKeyConstraint(['known_event_type_id'],
+                                                             ['known_event_type.id'], ),
+                                     sa.PrimaryKeyConstraint('id'))
         with op.batch_alter_table('chart', copy_from=known_event_table) as batch_op:
             batch_op.create_foreign_key('known_event_user_id_fkey', 'users',
                                         ['user_id'], ['id'])
diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
index 5c921c6a98..ace7845965 100644
--- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
+++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -25,6 +25,8 @@
 Create Date: 2018-04-03 15:31:20.814328
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = '86770d1215c0'
@@ -32,10 +34,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
-
 RESOURCE_TABLE = "kube_worker_uuid"
 
 
diff --git a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
index b821cacedc..6ff41baa28 100644
--- a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
+++ b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py
@@ -24,6 +24,7 @@
 Create Date: 2017-08-15 15:12:13.845074
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '947454bf1dff'
@@ -31,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False)
diff --git a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
index da69846233..6b21c3474a 100644
--- a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
+++ b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py
@@ -24,6 +24,7 @@
 Create Date: 2018-06-17 21:40:01.963540
 
 """
+from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = '9635ae0956e7'
@@ -31,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_index('idx_task_fail_dag_task_date', 'task_fail', ['dag_id', 'task_id', 'execution_date'], unique=False)
@@ -41,4 +39,3 @@ def upgrade():
 
 def downgrade():
     op.drop_index('idx_task_fail_dag_task_date', table_name='task_fail')
-
diff --git a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
index a19eb58eb5..503cd0b6f0 100644
--- a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
+++ b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py
@@ -25,18 +25,19 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+
 # revision identifiers, used by Alembic.
 revision = 'bba5a7cfc896'
 down_revision = 'bbc73705a13e'
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
-    op.add_column('connection', sa.Column('is_extra_encrypted', sa.Boolean,default=False))
+    op.add_column('connection',
+                  sa.Column('is_extra_encrypted', sa.Boolean, default=False))
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
index 04cdc22d6c..9855a6d4da 100644
--- a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
+++ b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py
@@ -24,6 +24,8 @@
 Create Date: 2016-01-14 18:05:54.871682
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = 'bbc73705a13e'
@@ -31,12 +33,9 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
-    op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean,default=False))
+    op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean, default=False))
 
 
 def downgrade():
diff --git a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
index a02eea5519..a1a5270c7b 100644
--- a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
+++ b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
@@ -23,6 +23,9 @@
 Create Date: 2017-08-14 16:06:31.568971
 
 """
+from alembic import op
+import dill
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = 'bdaa763e6c56'
@@ -30,15 +33,10 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import dill
-import sqlalchemy as sa
-
 
 def upgrade():
     # There can be data truncation here as LargeBinary can be smaller than the pickle
     # type.
-
     # use batch_alter_table to support SQLite workaround
     with op.batch_alter_table("xcom") as batch_op:
         batch_op.alter_column('value', type_=sa.LargeBinary())
diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
index 68228f7219..6155a40c81 100644
--- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
+++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -25,23 +25,22 @@
 
 """
 
-# revision identifiers, used by Alembic.
-revision = 'cc1e65623dc7'
-down_revision = '127d2bf2dfa7'
-branch_labels = None
-depends_on = None
-
 from alembic import op
 import sqlalchemy as sa
 from airflow import settings
 from airflow.models import DagBag
 from airflow.utils.sqlalchemy import UtcDateTime
 
-from sqlalchemy import (
-    Column, Integer, String)
+from sqlalchemy import Column, Integer, String
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.ext.declarative import declarative_base
 
+# revision identifiers, used by Alembic.
+revision = 'cc1e65623dc7'
+down_revision = '127d2bf2dfa7'
+branch_labels = None
+depends_on = None
+
 Base = declarative_base()
 BATCH_SIZE = 5000
 ID_LEN = 250
@@ -58,8 +57,7 @@ class TaskInstance(Base):
 
 
 def upgrade():
-    op.add_column('task_instance', sa.Column('max_tries', sa.Integer,
-        server_default="-1"))
+    op.add_column('task_instance', sa.Column('max_tries', sa.Integer, server_default="-1"))
     # Check if table task_instance exist before data migration. This check is
     # needed for database that does not create table until migration finishes.
     # Checking task_instance table exists prevent the error of querying
@@ -129,7 +127,7 @@ def downgrade():
                     # max number of self retry (task.retries) minus number of
                     # times left for task instance to try the task.
                     ti.try_number = max(0, task.retries - (ti.max_tries -
-                        ti.try_number))
+                                                           ti.try_number))
                 ti.max_tries = -1
                 session.merge(ti)
             session.commit()
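
The re-indented max() call above is the E128 fix in this hunk: once an argument follows an opening parenthesis on the same line, pycodestyle expects every continuation line to align under that opening delimiter. A runnable sketch with hypothetical values:

    retries, max_tries, try_number = 3, 5, 4  # hypothetical values

    # E128-clean: the continuation aligns under the inner opening "("
    try_number = max(0, retries - (max_tries -
                                   try_number))
    print(try_number)  # prints 2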
diff --git a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
index 5ecb0d5c72..db5afaf023 100644
--- a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
+++ b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py
@@ -23,6 +23,9 @@
 Create Date: 2017-08-18 17:07:16.686130
 
 """
+from alembic import op
+from sqlalchemy.dialects import mysql
+from alembic import context
 
 # revision identifiers, used by Alembic.
 revision = 'd2ae31099d61'
@@ -30,11 +33,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
-from alembic import context
-
 
 def upgrade():
     if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
diff --git a/airflow/migrations/versions/e3a246e0dc1_current_schema.py b/airflow/migrations/versions/e3a246e0dc1_current_schema.py
index cfa4147dd9..cbf9897645 100644
--- a/airflow/migrations/versions/e3a246e0dc1_current_schema.py
+++ b/airflow/migrations/versions/e3a246e0dc1_current_schema.py
@@ -25,17 +25,17 @@
 
 """
 
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy import func
+from sqlalchemy.engine.reflection import Inspector
+
 # revision identifiers, used by Alembic.
 revision = 'e3a246e0dc1'
 down_revision = None
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy import func
-from sqlalchemy.engine.reflection import Inspector
-
 
 def upgrade():
     conn = op.get_bind()
diff --git a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
index 44edeef069..3e643f629d 100644
--- a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
+++ b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py
@@ -24,6 +24,8 @@
 Create Date: 2018-06-17 10:16:31.412131
 
 """
+from alembic import op
+from sqlalchemy.dialects import mysql
 
 # revision identifiers, used by Alembic.
 revision = 'f23433877c24'
@@ -31,9 +33,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
 
 def upgrade():
     conn = op.get_bind()
@@ -51,4 +50,3 @@ def downgrade():
         op.alter_column('xcom', 'timestamp', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
         op.alter_column('xcom', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
         op.alter_column('task_fail', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True)
-
diff --git a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
index 3e3d40abc4..e14b4b8025 100644
--- a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
+++ b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
@@ -23,6 +23,8 @@
 Create Date: 2016-07-20 15:08:28.247537
 
 """
+from alembic import op
+import sqlalchemy as sa
 
 # revision identifiers, used by Alembic.
 revision = 'f2ca10b85618'
@@ -30,9 +32,6 @@
 branch_labels = None
 depends_on = None
 
-from alembic import op
-import sqlalchemy as sa
-
 
 def upgrade():
     op.create_table('dag_stats',
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index c8cc981e93..4e2060baae 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -19,7 +19,7 @@
 
 import sys
 import os
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator  # noqa: F401
 
 # ------------------------------------------------------------------------
 #
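
The trailing `# noqa: F401` here is the narrow form of the waiver: it silences the unused-import warning on that one line only, keeping BaseOperator importable from airflow.operators while every other check stays active on the file. A sketch with hypothetical module names:

    # mypackage/__init__.py
    # Re-export a symbol as part of the package's public API; without
    # the marker, flake8 reports F401 (imported but unused).
    from mypackage.core import CoreThing  # noqa: F401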
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 216b2ed626..3393de0311 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -24,12 +24,11 @@
 
 from airflow.utils import timezone
 from datetime import datetime, timedelta
-from dateutil.relativedelta import relativedelta  # for doctest
+from dateutil.relativedelta import relativedelta  # flake8: noqa: F401 for doctest
 import six
 
 from croniter import croniter
 
-
 cron_presets = {
     '@hourly': '0 * * * *',
     '@daily': '0 0 * * *',
@@ -39,11 +38,7 @@
 }
 
 
-def date_range(
-        start_date,
-        end_date=None,
-        num=None,
-        delta=None):
+def date_range(start_date, end_date=None, num=None, delta=None):
     """
     Get a set of dates as a list based on a start, end and delta, delta
     can be something that can be added to ``datetime.datetime``
@@ -181,8 +176,8 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
             # Check if start_date + (lower + 1)*delta or
             # start_date + lower*delta is closer to dt and return the solution
             if (
-                    (start_date + (lower + 1) * delta) - dt <=
-                    dt - (start_date + lower * delta)):
+                (start_date + (lower + 1) * delta) - dt <=
+                dt - (start_date + lower * delta)):
                 return start_date + (lower + 1) * delta
             else:
                 return start_date + lower * delta
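
The re-indented condition is the core of round_time: it returns whichever neighboring multiple of delta is closer to dt, with ties rounding up. A usage sketch (illustrative values, naive datetimes):

    from datetime import datetime, timedelta
    from airflow.utils.dates import round_time

    dt = datetime(2018, 8, 20, 22, 44)
    rounded = round_time(dt, timedelta(hours=1),
                         start_date=datetime(2018, 8, 20))
    # 22:44 is 16 minutes from 23:00 but 44 minutes from 22:00,
    # so rounded == datetime(2018, 8, 20, 23, 0)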
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index f1f0ea9a42..4bc8e0debd 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -100,5 +100,6 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 if 'BUILDING_AIRFLOW_DOCS' in os.environ:
+    # flake8: noqa: F811
     # Monkey patch hook to get good function headers while building docs
     apply_defaults = lambda x: x
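
The `# flake8: noqa: F811` comment above uses the file-level directive form; the waiver covers the deliberate redefinition of apply_defaults, which is swapped for an identity function during docs builds so Sphinx can document real operator signatures rather than the wrapper's. A line-scoped alternative would pin the waiver to the redefinition itself (a sketch, not the patch's choice):

    import os

    def apply_defaults(func):
        return func  # stand-in for the real decorator body

    if 'BUILDING_AIRFLOW_DOCS' in os.environ:
        # intentional redefinition, waived on this line only
        apply_defaults = lambda x: x  # noqa: F811,E731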
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 08347a515c..f4038715ac 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -22,7 +22,6 @@
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from builtins import str
 from past.builtins import basestring
 
 import importlib
@@ -62,13 +61,13 @@ def send_email_smtp(to, subject, html_content, files=None,
 
     >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
     """
-    SMTP_MAIL_FROM = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
+    smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
 
     to = get_email_address_list(to)
 
     msg = MIMEMultipart(mime_subtype)
     msg['Subject'] = subject
-    msg['From'] = SMTP_MAIL_FROM
+    msg['From'] = smtp_mail_from
     msg['To'] = ", ".join(to)
     recipients = to
     if cc:
@@ -96,7 +95,7 @@ def send_email_smtp(to, subject, html_content, files=None,
             part['Content-ID'] = '<%s>' % basename
             msg.attach(part)
 
-    send_MIME_email(SMTP_MAIL_FROM, recipients, msg, dryrun)
+    send_MIME_email(smtp_mail_from, recipients, msg, dryrun)
 
 
 def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 45d0217e23..d6b1d93b38 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -32,7 +32,6 @@
 import os
 import re
 import signal
-import subprocess
 import sys
 import warnings
 
@@ -226,6 +225,7 @@ def reap_process_group(pid, log, sig=signal.SIGTERM,
     :param sig: signal type
     :param timeout: how much time a process has to terminate
     """
+
     def on_terminate(p):
         log.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
 
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index d74aabfbae..16372c0600 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Using `from elasticsearch import *` would break elasticseach mocking used in unit test.
+# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
 import elasticsearch
 import pendulum
 from elasticsearch_dsl import Search
diff --git a/airflow/www/app.py b/airflow/www/app.py
index f7976b0dd5..0e6a2a0c23 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -18,7 +18,6 @@
 # under the License.
 #
 import six
-import os
 
 from flask import Flask
 from flask_admin import Admin, base
@@ -64,8 +63,8 @@ def create_app(config=None, testing=False):
     api.load_auth()
     api.api_auth.init_app(app)
 
-    cache = Cache(
-        app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
+    # flake8: noqa: F841
+    cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
 
     app.register_blueprint(routes)
 
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 0c4f4b05d6..9ce114d5ed 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -17,11 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+# flake8: noqa: E402
 import inspect
 from future import standard_library
 standard_library.install_aliases()
-from builtins import str
-from builtins import object
+from builtins import str, object
 
 from cgi import escape
 from io import BytesIO as IO
@@ -29,13 +29,13 @@
 import gzip
 import json
 import time
+import wtforms
+from wtforms.compat import text_type
 
 from flask import after_this_request, request, Response
-from flask_admin.contrib.sqla.filters import FilterConverter
 from flask_admin.model import filters
+from flask_admin.contrib.sqla.filters import FilterConverter
 from flask_login import current_user
-import wtforms
-from wtforms.compat import text_type
 
 from airflow import configuration, models, settings
 from airflow.utils.db import create_session
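
The file-level `# flake8: noqa: E402` added at the top of this file is structural rather than cosmetic: standard_library.install_aliases() has to execute before the aliased modules can be imported, so some imports necessarily sit below executable code. A minimal sketch of the same ordering constraint (assumes the `future` package is installed):

    from future import standard_library
    standard_library.install_aliases()  # must run first

    # On Python 2 this import only resolves after install_aliases(),
    # so it has to follow executable code and would otherwise trip E402.
    from urllib.parse import urlencode

    print(urlencode({'a': 1}))  # a=1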
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0c0dcff801..4401e15702 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -22,7 +22,6 @@
 import codecs
 import copy
 import datetime as dt
-import inspect
 import itertools
 import json
 import logging
@@ -365,6 +364,7 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
         'dr_state': dr_state,
     }
 
+
 class Airflow(BaseView):
     def is_visible(self):
         return False
@@ -481,8 +481,6 @@ def chart_data(self):
             else:
                 if chart.sql_layout == 'series':
                     # User provides columns (series, x, y)
-                    xaxis_label = df.columns[1]
-                    yaxis_label = df.columns[2]
                     df[df.columns[2]] = df[df.columns[2]].astype(np.float)
                     df = df.pivot_table(
                         index=df.columns[1],
@@ -490,8 +488,6 @@ def chart_data(self):
                         values=df.columns[2], aggfunc=np.sum)
                 else:
                     # User provides columns (x, y, metric1, metric2, ...)
-                    xaxis_label = df.columns[0]
-                    yaxis_label = 'y'
                     df.index = df[df.columns[0]]
                     df = df.sort(df.columns[0])
                     del df[df.columns[0]]
@@ -599,8 +595,8 @@ def task_stats(self, session=None):
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
                 .join(Dag, Dag.dag_id == DagRun.dag_id)
                 .filter(DagRun.state != State.RUNNING)
-                .filter(Dag.is_active == True)
-                .filter(Dag.is_subdag == False)
+                .filter(Dag.is_active == True)  # noqa: E712
+                .filter(Dag.is_subdag == False)  # noqa: E712
                 .group_by(DagRun.dag_id)
                 .subquery('last_dag_run')
         )
@@ -608,8 +604,8 @@ def task_stats(self, session=None):
             session.query(DagRun.dag_id, DagRun.execution_date)
                 .join(Dag, Dag.dag_id == DagRun.dag_id)
                 .filter(DagRun.state == State.RUNNING)
-                .filter(Dag.is_active == True)
-                .filter(Dag.is_subdag == False)
+                .filter(Dag.is_active == True)  # noqa: E712
+                .filter(Dag.is_subdag == False)  # noqa: E712
                 .subquery('running_dag_run')
         )
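
The E712 waivers in these two hunks are deliberate: pycodestyle prefers `is True` for boolean comparisons, but SQLAlchemy overloads == on columns to build SQL expressions, and `is` cannot be overloaded. Suppressing per line is one option; calling .is_(True) yields an equivalent predicate with no directive at all. A self-contained sketch:

    import sqlalchemy as sa
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Dag(Base):  # minimal stand-in for the real model
        __tablename__ = 'dag'
        dag_id = sa.Column(sa.String(250), primary_key=True)
        is_active = sa.Column(sa.Boolean)

    engine = sa.create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    # `== True` builds SQL and needs the per-line waiver:
    active = session.query(Dag).filter(Dag.is_active == True)  # noqa: E712
    # .is_(True) renders an equivalent predicate with no waiver:
    also_active = session.query(Dag).filter(Dag.is_active.is_(True))
    print(active.count(), also_active.count())  # 0 0 on the empty table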
 
@@ -883,7 +879,7 @@ def task(self):
         for attr_name in dir(ti):
             if not attr_name.startswith('_'):
                 attr = getattr(ti, attr_name)
-                if type(attr) != type(self.task):
+                if type(attr) != type(self.task):  # noqa: E721
                     ti_attrs.append((attr_name, str(attr)))
 
         task_attrs = []
@@ -891,7 +887,7 @@ def task(self):
             if not attr_name.startswith('_'):
                 attr = getattr(task, attr_name)
                 if type(attr) != type(self.task) and \
-                        attr_name not in attr_renderer:
+                        attr_name not in attr_renderer:  # noqa: E721
                     task_attrs.append((attr_name, str(attr)))
 
         # Color coding the special attributes that are code
@@ -1166,7 +1162,6 @@ def clear(self):
     @wwwutils.notify_owner
     def dagrun_clear(self):
         dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
         origin = request.args.get('origin')
         execution_date = request.args.get('execution_date')
         confirmed = request.args.get('confirmed') == "true"
diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py
index b319426aa9..321185ee9b 100644
--- a/airflow/www_rbac/app.py
+++ b/airflow/www_rbac/app.py
@@ -19,7 +19,6 @@
 #
 import socket
 import six
-import os
 
 from flask import Flask
 from flask_appbuilder import AppBuilder, SQLA
@@ -59,7 +58,8 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
     api.load_auth()
     api.api_auth.init_app(app)
 
-    cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})  # noqa
+    # flake8: noqa: F841
+    cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'})
 
     from airflow.www_rbac.blueprints import routes
     app.register_blueprint(routes)
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 629f488fc7..7a2a2d3124 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -35,7 +35,7 @@
 import pendulum
 import sqlalchemy as sqla
 from flask import (
-    g, redirect, request, Markup, Response, render_template,
+    redirect, request, Markup, Response, render_template,
     make_response, flash, jsonify)
 from flask._compat import PY2
 from flask_appbuilder import BaseView, ModelView, expose, has_access
diff --git a/dags/test_dag.py b/dags/test_dag.py
index 3c385bbbef..a133dd5a12 100644
--- a/dags/test_dag.py
+++ b/dags/test_dag.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,9 @@
 from datetime import datetime, timedelta
 
 now = datetime.now()
-now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0)
+now_to_the_hour = (
+    now - timedelta(0, 0, 0, 0, 0, 3)
+).replace(minute=0, second=0, microsecond=0)
 START_DATE = now_to_the_hour
 DAG_NAME = 'test_dag_v1'
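
The reflowed expression fixes the line-length violation, but the positional timedelta deserves a note: the constructor's positional order is (days, seconds, microseconds, milliseconds, minutes, hours, weeks), so timedelta(0, 0, 0, 0, 0, 3) means three hours. A sketch of the equivalent keyword spelling:

    from datetime import datetime, timedelta

    assert timedelta(0, 0, 0, 0, 0, 3) == timedelta(hours=3)

    now = datetime.now()
    now_to_the_hour = (now - timedelta(hours=3)).replace(
        minute=0, second=0, microsecond=0)
    print(now_to_the_hour)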
 