Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/21 18:40:18 UTC

[2/3] incubator-airflow git commit: [AIRFLOW-2429] Fix contrib folder's flake8 errors

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/awsbatch_operator.py b/airflow/contrib/operators/awsbatch_operator.py
index 920fdb7..6e8e2a4 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,9 +46,11 @@ class AWSBatchOperator(BaseOperator):
     :param max_retries: exponential backoff retries while waiter is not merged
     :type max_retries: int
     :param aws_conn_id: connection id of AWS credentials / region name. If None,
-            credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
+            credential boto3 strategy will be used
+            (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
     :type aws_conn_id: str
-    :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided)
+    :param region_name: region name to use in AWS Hook.
+        Override the region_name in connection (if provided)
     """
 
     ui_color = '#c3dae0'
@@ -152,9 +154,12 @@ class AWSBatchOperator(BaseOperator):
                     if (job['status'] == 'FAILED' or
                             container['container']['exitCode'] != 0):
                         print("@@@@")
-                        raise AirflowException('This containers encounter an error during execution {}'.format(job))
+                        raise AirflowException(
+                            'This containers encounter an error during '
+                            'execution {}'.format(job))
             elif job['status'] is not 'SUCCEEDED':
-                raise AirflowException('This task is still pending {}'.format(job['status']))
+                raise AirflowException(
+                    'This task is still pending {}'.format(job['status']))
 
     def get_hook(self):
         return AwsHook(

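For context on the hunk above: the reflowed exception messages sit in the operator's job-status check, which inspects the AWS Batch job description and raises when the job failed or is not yet in a success state. A minimal standalone sketch of that kind of check, assuming a plain boto3 Batch client rather than the operator's AwsHook (the function name check_batch_job is hypothetical, not part of the operator):

import boto3

def check_batch_job(job_id, region_name=None):
    # describe_jobs returns a 'jobs' list whose entries carry a 'status'
    # field such as SUBMITTED, PENDING, RUNNING, SUCCEEDED or FAILED.
    client = boto3.client('batch', region_name=region_name)
    job = client.describe_jobs(jobs=[job_id])['jobs'][0]
    if job['status'] == 'FAILED':
        raise RuntimeError('Job {} failed: {}'.format(job_id, job))
    if job['status'] != 'SUCCEEDED':
        # Plain equality comparison on the status string.
        raise RuntimeError('Job {} is still pending: {}'.format(job_id, job['status']))
    return job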
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 799d6d5..59ef5d3 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,8 @@
 # under the License.
 
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import \
+    CheckOperator, ValueCheckOperator, IntervalCheckOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_get_data.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index aabfddd..ab8f71b 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 04ddb6a..dc5edb3 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 8751dfe..a16107d 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index b03511a..93a52b3 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 166d05e..e2ce930 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8d22d4c..7b8d522 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -205,13 +205,14 @@ class DatabricksSubmitRunOperator(BaseOperator):
         Coerces content or all values of content if it is a dict to a string. The
         function will throw if content contains non-string or non-numeric types.
 
-        The reason why we have this function is because the ``self.json`` field must be a dict
-        with only string values. This is because ``render_template`` will fail for numerical values.
+        The reason why we have this function is because the ``self.json`` field must be a
+         dict with only string values. This is because ``render_template`` will fail
+        for numerical values.
         """
         c = self._deep_string_coerce
         if isinstance(content, six.string_types):
             return content
-        elif isinstance(content, six.integer_types+(float,)):
+        elif isinstance(content, six.integer_types + (float,)):
             # Databricks can tolerate either numeric or string types in the API backend.
             return str(content)
         elif isinstance(content, (list, tuple)):
@@ -221,8 +222,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
                     for k, v in list(content.items())}
         else:
             param_type = type(content)
-            msg = 'Type {0} used for parameter {1} is not a number or a string' \
-                    .format(param_type, json_path)
+            msg = 'Type {0} used for parameter {1} is not a number or a string'\
+                .format(param_type, json_path)
             raise AirflowException(msg)
 
     def _log_run_page_url(self, url):

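The docstring re-wrapped in the hunk above explains why ``_deep_string_coerce`` exists: ``self.json`` must be a dict whose leaf values are strings, because ``render_template`` fails on numeric values. As a rough standalone illustration of that coercion (a sketch following the shape shown in the diff, not the operator's exact method):

import six

def deep_string_coerce(content, json_path='json'):
    # Strings pass through untouched.
    if isinstance(content, six.string_types):
        return content
    # Databricks tolerates numbers or strings, so numbers become strings.
    elif isinstance(content, six.integer_types + (float,)):
        return str(content)
    # Recurse into lists/tuples and dicts, tracking a path for error messages.
    elif isinstance(content, (list, tuple)):
        return [deep_string_coerce(e, '{0}[{1}]'.format(json_path, i))
                for i, e in enumerate(content)]
    elif isinstance(content, dict):
        return {k: deep_string_coerce(v, '{0}[{1}]'.format(json_path, k))
                for k, v in content.items()}
    else:
        raise TypeError('Type {0} used for parameter {1} is not a number or a string'
                        .format(type(content), json_path))

For example, deep_string_coerce({'retries': 3, 'name': 'run'}) returns {'retries': '3', 'name': 'run'}.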
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 11adc60..e3c8c1f 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 22db3e6..4973eb1 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -249,13 +249,12 @@ class DataprocClusterCreateOperator(BaseOperator):
                 self.project_id, self.zone
             )
         master_type_uri = \
-            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
-                self.project_id, self.zone, self.master_machine_type
-            )
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\
+            .format(self.project_id, self.zone, self.master_machine_type)
         worker_type_uri = \
-            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
-                self.project_id, self.zone, self.worker_machine_type
-            )
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\
+            .format(self.project_id, self.zone, self.worker_machine_type)
+
         cluster_data = {
             'projectId': self.project_id,
             'clusterName': self.cluster_name,
@@ -297,7 +296,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
         # semantic versioning spec: x.y.z).
         cluster_data['labels'].update({'airflow-version':
-                                       'v' + version.replace('.', '-').replace('+','-')})
+                                       'v' + version.replace('.', '-').replace('+', '-')})
         if self.storage_bucket:
             cluster_data['config']['configBucket'] = self.storage_bucket
         if self.metadata:
@@ -305,7 +304,8 @@ class DataprocClusterCreateOperator(BaseOperator):
         if self.network_uri:
             cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri
         if self.subnetwork_uri:
-            cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri
+            cluster_data['config']['gceClusterConfig']['subnetworkUri'] = \
+                self.subnetwork_uri
         if self.tags:
             cluster_data['config']['gceClusterConfig']['tags'] = self.tags
         if self.image_version:
@@ -332,10 +332,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['initializationActions'] = init_actions_dict
         if self.service_account:
             cluster_data['config']['gceClusterConfig']['serviceAccount'] =\
-                    self.service_account
+                self.service_account
         if self.service_account_scopes:
             cluster_data['config']['gceClusterConfig']['serviceAccountScopes'] =\
-                    self.service_account_scopes
+                self.service_account_scopes
         return cluster_data
 
     def execute(self, context):
@@ -368,7 +368,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                 self.log.info(
                     'Cluster {} already exists... Checking status...',
                     self.cluster_name
-                 )
+                )
                 self._wait_for_done(service)
                 return True
             else:

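A small worked example for the label hunk above: Dataproc label values must match [a-z]([-a-z0-9]*[a-z0-9])?, so the airflow-version label is derived by replacing '.' and '+' in the version string with '-'. With a purely illustrative version value:

version = '1.10.0+incubating'   # hypothetical value for illustration only
label = 'v' + version.replace('.', '-').replace('+', '-')
# label == 'v1-10-0-incubating', which satisfies the allowed character set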
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 09b7965..f6dc7cc 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,15 +35,16 @@ class DatastoreExportOperator(BaseOperator):
     :type namespace: str
     :param datastore_conn_id: the name of the Datastore connection id to use
     :type datastore_conn_id: string
-    :param cloud_storage_conn_id: the name of the cloud storage connection id to force-write
-        backup
+    :param cloud_storage_conn_id: the name of the cloud storage connection id to
+        force-write backup
     :type cloud_storage_conn_id: string
     :param delegate_to: The account to impersonate, if any.
         For this to work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: string
-    :param entity_filter: description of what data from the project is included in the export,
-        refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
+    :param entity_filter: description of what data from the project is included in the
+        export, refer to
+        https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
     :type entity_filter: dict
     :param labels: client-assigned labels for cloud storage
     :type labels: dict
@@ -92,7 +93,7 @@ class DatastoreExportOperator(BaseOperator):
             for o in objects:
                 gcs_hook.delete(self.bucket, o)
 
-        ds_hook = DatastoreHook(self.datastore_conn_id,self.delegate_to)
+        ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
         result = ds_hook.export_to_storage_bucket(bucket=self.bucket,
                                                   namespace=self.namespace,
                                                   entity_filter=self.entity_filter,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 279b1a5..401d36e 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,9 @@ class DatastoreImportOperator(BaseOperator):
     :param namespace: optional namespace of the backup metadata file in
         the specified Cloud Storage bucket.
     :type namespace: str
-    :param entity_filter: description of what data from the project is included in the export,
-        refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
+    :param entity_filter: description of what data from the project is included in
+        the export, refer to
+        https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
     :type entity_filter: dict
     :param labels: client-assigned labels for cloud storage
     :type labels: dict

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/discord_webhook_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/discord_webhook_operator.py b/airflow/contrib/operators/discord_webhook_operator.py
index acfba58..6b44eac 100644
--- a/airflow/contrib/operators/discord_webhook_operator.py
+++ b/airflow/contrib/operators/discord_webhook_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index c0cb09d..426393d 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -29,18 +29,17 @@ class DruidOperator(BaseOperator):
 
     :param json_index_file: The filepath to the druid index specification
     :type json_index_file: str
-    :param druid_ingest_conn_id: The connection id of the Druid overlord which accepts index jobs
+    :param druid_ingest_conn_id: The connection id of the Druid overlord which
+        accepts index jobs
     :type druid_ingest_conn_id: str
     """
     template_fields = ('index_spec_str',)
     template_ext = ('.json',)
 
-    def __init__(
-        self,
-        json_index_file,
-        druid_ingest_conn_id='druid_ingest_default',
-        max_ingestion_time=None,
-        *args, **kwargs):
+    def __init__(self, json_index_file,
+                 druid_ingest_conn_id='druid_ingest_default',
+                 max_ingestion_time=None,
+                 *args, **kwargs):
         super(DruidOperator, self).__init__(*args, **kwargs)
         self.conn_id = druid_ingest_conn_id
         self.max_ingestion_time = max_ingestion_time

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 1016fca..60540f5 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,9 +37,11 @@ class ECSOperator(BaseOperator):
             http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
     :type: overrides: dict
     :param aws_conn_id: connection id of AWS credentials / region name. If None,
-            credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
+            credential boto3 strategy will be used
+            (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
     :type aws_conn_id: str
-    :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided)
+    :param region_name: region name to use in AWS Hook.
+        Override the region_name in connection (if provided)
     :param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE')
     :type: launch_type: str
     """
@@ -66,7 +68,7 @@ class ECSOperator(BaseOperator):
     def execute(self, context):
         self.log.info(
             'Running ECS Task - Task definition: %s - on cluster %s',
-            self.task_definition,self.cluster
+            self.task_definition, self.cluster
         )
         self.log.info('ECSOperator overrides: %s', self.overrides)
 
@@ -115,13 +117,16 @@ class ECSOperator(BaseOperator):
         for task in response['tasks']:
             containers = task['containers']
             for container in containers:
-                if container.get('lastStatus') == 'STOPPED' and container['exitCode'] != 0:
-                    raise AirflowException('This task is not in success state {}'.format(task))
+                if container.get('lastStatus') == 'STOPPED' and \
+                        container['exitCode'] != 0:
+                    raise AirflowException(
+                        'This task is not in success state {}'.format(task))
                 elif container.get('lastStatus') == 'PENDING':
                     raise AirflowException('This task is still pending {}'.format(task))
                 elif 'error' in container.get('reason', '').lower():
-                    raise AirflowException('This containers encounter an error during launching : {}'.
-                                           format(container.get('reason', '').lower()))
+                    raise AirflowException(
+                        'This containers encounter an error during launching : {}'.
+                        format(container.get('reason', '').lower()))
 
     def get_hook(self):
         return AwsHook(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index 3d2d510..643ffe9 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index e90c77a..89be12f 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,8 @@ from airflow.exceptions import AirflowException
 class EmrCreateJobFlowOperator(BaseOperator):
     """
     Creates an EMR JobFlow, reading the config from the EMR connection.
-    A dictionary of JobFlow overrides can be passed that override the config from the connection.
+    A dictionary of JobFlow overrides can be passed that override
+    the config from the connection.
 
     :param aws_conn_id: aws connection to uses
     :type aws_conn_id: str

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index 95c3164..50407a1 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
index b65e3b3..807385b 100644
--- a/airflow/contrib/operators/file_to_gcs.py
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,8 +65,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
         Uploads the file to Google cloud storage
         """
         hook = GoogleCloudStorageHook(
-                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                delegate_to=self.delegate_to)
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
 
         hook.upload(
             bucket=self.bucket,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index f55c0c1..797960c 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -57,6 +57,8 @@ class FileToWasbOperator(BaseOperator):
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
         self.log.info(
-            'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
+            'Uploading {self.file_path} to wasb://{self.container_name} '
+            'as {self.blob_name}'.format(**locals())
         )
-        hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
+        hook.load_file(self.file_path, self.container_name,
+                       self.blob_name, **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 7dfa96c..ce272ae 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,7 +46,8 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         connecting to Google cloud storage.
     :type google_cloud_storage_conn_id: string
     :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have domain-wide delegation enabled.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
     :type delegate_to: string
     """
     template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',)
@@ -84,5 +85,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
             if sys.getsizeof(file_bytes) < 48000:
                 context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
             else:
-                raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
+                raise RuntimeError(
+                    'The size of the downloaded file is too large to push to XCom!'
+                )
         self.log.debug(file_bytes)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
index 393deb9..6474453 100644
--- a/airflow/contrib/operators/gcs_list_operator.py
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_operator.py b/airflow/contrib/operators/gcs_operator.py
index 3d339d8..ef5e8de 100644
--- a/airflow/contrib/operators/gcs_operator.py
+++ b/airflow/contrib/operators/gcs_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 938da4e..3a77980 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -184,8 +184,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)
 
-        if not self.schema_fields and self.schema_object \
-                                  and self.source_format != 'DATASTORE_BACKUP':
+        if not self.schema_fields and \
+                self.schema_object and \
+                self.source_format != 'DATASTORE_BACKUP':
             gcs_hook = GoogleCloudStorageHook(
                 google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                 delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_s3.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py
index 00c5e14..a87aa3a 100644
--- a/airflow/contrib/operators/gcs_to_s3.py
+++ b/airflow/contrib/operators/gcs_to_s3.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 5dd06f6..381cd72 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hive_to_dynamodb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hive_to_dynamodb.py b/airflow/contrib/operators/hive_to_dynamodb.py
index 2b6f88f..4a39e40 100644
--- a/airflow/contrib/operators/hive_to_dynamodb.py
+++ b/airflow/contrib/operators/hive_to_dynamodb.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jenkins_job_trigger_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jenkins_job_trigger_operator.py b/airflow/contrib/operators/jenkins_job_trigger_operator.py
index ad59797..37185f3 100644
--- a/airflow/contrib/operators/jenkins_job_trigger_operator.py
+++ b/airflow/contrib/operators/jenkins_job_trigger_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jira_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jira_operator.py b/airflow/contrib/operators/jira_operator.py
index 64869e4..01d78b8 100644
--- a/airflow/contrib/operators/jira_operator.py
+++ b/airflow/contrib/operators/jira_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 391ec58..8e88b68 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -29,6 +29,7 @@ template_fields = ('templates_dict',)
 template_ext = tuple()
 ui_color = '#ffefeb'
 
+
 class KubernetesPodOperator(BaseOperator):
     """
     Execute a task in a Kubernetes Pod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/mlengine_operator_utils.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator_utils.py b/airflow/contrib/operators/mlengine_operator_utils.py
index 5fda6ae..7ce784e 100644
--- a/airflow/contrib/operators/mlengine_operator_utils.py
+++ b/airflow/contrib/operators/mlengine_operator_utils.py
@@ -28,6 +28,7 @@ from airflow.exceptions import AirflowException
 from airflow.operators.python_operator import PythonOperator
 from six.moves.urllib.parse import urlsplit
 
+
 def create_evaluate_ops(task_prefix,
                         data_format,
                         input_paths,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
index 9eac70d..88b4d00 100644
--- a/airflow/contrib/operators/postgres_to_gcs_operator.py
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
index 42586aa..2d55b19 100644
--- a/airflow/contrib/operators/pubsub_operator.py
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 47ccf53..88e8bdf 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/s3_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
index df448ad..b85691b 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index 519dc1d..153f440 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,8 @@ class SFTPOperator(BaseOperator):
         self.local_filepath = local_filepath
         self.remote_filepath = remote_filepath
         self.operation = operation
-        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+        if not (self.operation.lower() == SFTPOperation.GET or
+                self.operation.lower() == SFTPOperation.PUT):
             raise TypeError("unsupported operation value {0}, expected {1} or {2}"
                             .format(self.operation, SFTPOperation.GET, SFTPOperation.PUT))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/slack_webhook_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/slack_webhook_operator.py b/airflow/contrib/operators/slack_webhook_operator.py
index 124b249..bab3e90 100644
--- a/airflow/contrib/operators/slack_webhook_operator.py
+++ b/airflow/contrib/operators/slack_webhook_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/snowflake_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/snowflake_operator.py b/airflow/contrib/operators/snowflake_operator.py
index 25e9bf4..39d7d49 100644
--- a/airflow/contrib/operators/snowflake_operator.py
+++ b/airflow/contrib/operators/snowflake_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_jdbc_operator.py b/airflow/contrib/operators/spark_jdbc_operator.py
index c915aaa..42f9dd5 100644
--- a/airflow/contrib/operators/spark_jdbc_operator.py
+++ b/airflow/contrib/operators/spark_jdbc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 864c137..825a367 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,11 @@ class SparkSqlOperator(BaseOperator):
     :type conf: str (format: PROP=VALUE)
     :param conn_id: connection_id string
     :type conn_id: str
-    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all
+        executors (Default: all the available cores on the worker)
     :type total_executor_cores: int
-    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
+    :param executor_cores: (Standalone & YARN only) Number of cores per
+        executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index 05996d8..fa61ca1 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,12 +33,16 @@ from airflow.utils.decorators import apply_defaults
 class SqoopOperator(BaseOperator):
     """
     Execute a Sqoop job.
-    Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
+    Documentation for Apache Sqoop can be found here:
+        https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
     """
-    template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by',
-                       'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table',
-                       'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by',
-                       'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver',
+    template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir',
+                       'file_type', 'columns', 'split_by',
+                       'where', 'export_dir', 'input_null_string',
+                       'input_null_non_string', 'staging_table',
+                       'enclosed_by', 'escaped_by', 'input_fields_terminated_by',
+                       'input_lines_terminated_by', 'input_optionally_enclosed_by',
+                       'properties', 'extra_import_options', 'driver',
                        'extra_export_options', 'hcatalog_database', 'hcatalog_table',)
     ui_color = '#7D8CA4'
 
@@ -115,7 +119,8 @@ class SqoopOperator(BaseOperator):
         :param relaxed_isolation: use read uncommitted isolation level
         :param hcatalog_database: Specifies the database name for the HCatalog table
         :param hcatalog_table: The argument value for this option is the HCatalog table
-        :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not
+        :param create_hcatalog_table: Have sqoop create the hcatalog table passed
+            in or not
         :param properties: additional JVM properties passed to sqoop
         :param extra_import_options: Extra import options to pass as dict.
             If a key doesn't have a value, just pass an empty string to it.
@@ -189,7 +194,8 @@ class SqoopOperator(BaseOperator):
                 extra_export_options=self.extra_export_options)
         elif self.cmd_type == 'import':
             # add create hcatalog table to extra import options if option passed
-            # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param
+            # if new params are added to constructor can pass them in here
+            # so don't modify sqoop_hook for each param
             if self.create_hcatalog_table:
                 self.extra_import_options['create-hcatalog-table'] = ''
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index b5ab2dc..d246800 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -109,7 +109,9 @@ class SSHOperator(BaseOperator):
                 agg_stdout += stdout.channel.recv(stdout_buffer_length)
 
             # read from both stdout and stderr
-            while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
+            while not channel.closed or \
+                    channel.recv_ready() or \
+                    channel.recv_stderr_ready():
                 readq, _, _ = select([channel], [], [], self.timeout)
                 for c in readq:
                     if c.recv_ready():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index c2a6efb..41072ff 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 40166fa..00fdb64 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -94,7 +94,9 @@ class VerticaToHiveTransfer(BaseOperator):
     @classmethod
     def type_map(cls, vertica_type):
         # vertica-python datatype.py does not provide the full type mapping access.
-        # Manual hack. Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
+        # Manual hack.
+        # Reference:
+        # https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
         d = {
             5: 'BOOLEAN',
             6: 'INT',
@@ -120,7 +122,8 @@ class VerticaToHiveTransfer(BaseOperator):
             for field in cursor.description:
                 col_count += 1
                 col_position = "Column{position}".format(position=col_count)
-                field_dict[col_position if field[0] == '' else field[0]] = self.type_map(field[1])
+                field_dict[col_position if field[0] == '' else field[0]] = \
+                    self.type_map(field[1])
             csv_writer.writerows(cursor.iterate())
             f.flush()
             cursor.close()

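type_map above translates Vertica's numeric type codes into Hive column types, and the execute loop builds field_dict from cursor.description, substituting a positional name for unnamed columns. A partial sketch of that step, using only the two codes visible in the hunk (5 and 6) and a stubbed cursor description; everything else is illustrative:

VERTICA_TO_HIVE = {
    5: 'BOOLEAN',
    6: 'INT',
}

def type_map(vertica_type, default='STRING'):
    # assumed fallback for codes without a mapping
    return VERTICA_TO_HIVE.get(vertica_type, default)

# cursor.description yields (name, type_code, ...) tuples
description = [('id', 6), ('', 5)]
field_dict = {}
for col_count, field in enumerate(description, start=1):
    col_position = "Column{position}".format(position=col_count)
    field_dict[col_position if field[0] == '' else field[0]] = \
        type_map(field[1])

print(field_dict)  # {'id': 'INT', 'Column2': 'BOOLEAN'}
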
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
index 22af0ce..9d3fe72 100644
--- a/airflow/contrib/operators/vertica_to_mysql.py
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import logging
+import MySQLdb
 
 from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -60,7 +61,7 @@ class VerticaToMySqlTransfer(BaseOperator):
     """
 
     template_fields = ('sql', 'mysql_table', 'mysql_preoperator',
-        'mysql_postoperator')
+                       'mysql_postoperator')
     template_ext = ('.sql',)
     ui_color = '#a0e08c'
 
@@ -102,7 +103,9 @@ class VerticaToMySqlTransfer(BaseOperator):
                 if self.bulk_load:
                     tmpfile = NamedTemporaryFile("w")
 
-                    logging.info("Selecting rows from Vertica to local file " + str(tmpfile.name) + "...")
+                    logging.info(
+                        "Selecting rows from Vertica to local file " + str(
+                            tmpfile.name) + "...")
                     logging.info(self.sql)
 
                     csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8')
@@ -129,14 +132,20 @@ class VerticaToMySqlTransfer(BaseOperator):
                 logging.info("Bulk inserting rows into MySQL...")
                 with closing(mysql.get_conn()) as conn:
                     with closing(conn.cursor()) as cursor:
-                        cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s LINES TERMINATED BY '\r\n' (%s)" % (tmpfile.name, self.mysql_table, ", ".join(selected_columns)))
+                        cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO "
+                                       "TABLE %s LINES TERMINATED BY '\r\n' (%s)" %
+                                       (tmpfile.name,
+                                        self.mysql_table,
+                                        ", ".join(selected_columns)))
                         conn.commit()
                 tmpfile.close()
             else:
                 logging.info("Inserting rows into MySQL...")
-                mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns)
+                mysql.insert_rows(table=self.mysql_table,
+                                  rows=result,
+                                  target_fields=selected_columns)
             logging.info("Inserted rows into MySQL " + str(count))
-        except:
+        except (MySQLdb.Error, MySQLdb.Warning):
             logging.error("Inserted rows into MySQL 0")
             raise
 

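The rewrapped LOAD DATA LOCAL INFILE statement bulk-loads the tab-delimited temp file into MySQL, and the narrowed except clause now catches only MySQLdb errors instead of everything. A hedged sketch of that bulk-load branch, assuming a local MySQL server with LOCAL INFILE enabled; the database, table and column names are illustrative:

from contextlib import closing
from tempfile import NamedTemporaryFile
import MySQLdb

selected_columns = ['id', 'name']
with NamedTemporaryFile('w') as tmpfile:
    tmpfile.write('1\tfoo\r\n2\tbar\r\n')
    tmpfile.flush()

    conn = MySQLdb.connect(host='localhost', user='airflow',
                           db='analytics', local_infile=1)
    try:
        with closing(conn.cursor()) as cursor:
            cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO "
                           "TABLE %s LINES TERMINATED BY '\r\n' (%s)" %
                           (tmpfile.name,
                            'events',
                            ", ".join(selected_columns)))
        conn.commit()
    except (MySQLdb.Error, MySQLdb.Warning):
        # surface load failures instead of swallowing them
        raise
    finally:
        conn.close()
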
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/plugins/metastore_browser/main.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/plugins/metastore_browser/main.py b/airflow/contrib/plugins/metastore_browser/main.py
index 9a47ef5..836e531 100644
--- a/airflow/contrib/plugins/metastore_browser/main.py
+++ b/airflow/contrib/plugins/metastore_browser/main.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -138,7 +138,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin):
         """.format(where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT)
         h = MySqlHook(METASTORE_MYSQL_CONN_ID)
         d = [
-                {'id': row[0], 'text': row[0]}
+            {'id': row[0], 'text': row[0]}
             for row in h.get_records(sql)]
         return json.dumps(d)
 
@@ -161,6 +161,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin):
         h = HiveCliHook(HIVE_CLI_CONN_ID)
         return h.run_cli(sql)
 
+
 v = MetastoreBrowserView(category="Plugins", name="Hive Metadata Browser")
 
 # Creating a Flask blueprint to integrate the templates and static folder

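The re-indented comprehension above turns metastore rows into the {'id', 'text'} entries the table selector expects. A tiny sketch of that transformation with stubbed records in place of MySqlHook.get_records(); the table names are made up:

import json

records = [('default.events',), ('default.users',)]
d = [
    {'id': row[0], 'text': row[0]}
    for row in records]
print(json.dumps(d))
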
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/__init__.py b/airflow/contrib/sensors/__init__.py
index 4067cc7..114d189 100644
--- a/airflow/contrib/sensors/__init__.py
+++ b/airflow/contrib/sensors/__init__.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 3b130af..6709524 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
index 965c13c..26fbb06 100644
--- a/airflow/contrib/sensors/bash_sensor.py
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -66,7 +66,6 @@ class BashSensor(BaseSensorOperator):
         self.log.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
                 f.write(bytes(bash_command, 'utf_8'))
                 f.flush()
                 fname = f.name

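BashSensor writes its bash_command into a NamedTemporaryFile under a scratch TemporaryDirectory and then executes it, treating exit code 0 as a successful poke. A hedged sketch of that pattern using only the standard library, with a trivial command standing in for a real check:

from subprocess import Popen, PIPE, STDOUT
from tempfile import NamedTemporaryFile, TemporaryDirectory

bash_command = 'test -f /tmp/ready'  # illustrative condition
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
    with NamedTemporaryFile(dir=tmp_dir, prefix='bash_sensor_') as f:
        f.write(bytes(bash_command, 'utf_8'))
        f.flush()
        sp = Popen(['bash', f.name], stdout=PIPE, stderr=STDOUT,
                   close_fds=True, cwd=tmp_dir)
        sp.wait()
        print('poke would return', sp.returncode == 0)
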
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index fe75548..2e496f6 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,37 +24,36 @@ from airflow.utils.decorators import apply_defaults
 class BigQueryTableSensor(BaseSensorOperator):
     """
     Checks for the existence of a table in Google Bigquery.
-    """
-    template_fields = ('project_id', 'dataset_id', 'table_id',)
-    ui_color = '#f0eee4'
-
-    @apply_defaults
-    def __init__(
-            self,
-            project_id,
-            dataset_id,
-            table_id,
-            bigquery_conn_id='bigquery_default_conn',
-            delegate_to=None,
-            *args,
-            **kwargs):
-        """
-        Create a new BigQueryTableSensor.
 
-        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
-        must provide access to the specified project.
+        :param project_id: The Google cloud project in which to look for the table.
+            The connection supplied to the hook must provide
+            access to the specified project.
         :type project_id: string
         :param dataset_id: The name of the dataset in which to look for the table.
             storage bucket.
         :type dataset_id: string
         :param table_id: The name of the table to check the existence of.
         :type table_id: string
-        :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery.
+        :param bigquery_conn_id: The connection ID to use when connecting to
+            Google BigQuery.
         :type bigquery_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide delegation enabled.
+            For this to work, the service account making the request must
+            have domain-wide delegation enabled.
         :type delegate_to: string
-        """
+    """
+    template_fields = ('project_id', 'dataset_id', 'table_id',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 dataset_id,
+                 table_id,
+                 bigquery_conn_id='bigquery_default_conn',
+                 delegate_to=None,
+                 *args, **kwargs):
+
         super(BigQueryTableSensor, self).__init__(*args, **kwargs)
         self.project_id = project_id
         self.dataset_id = dataset_id

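With the docstring moved to class level, BigQueryTableSensor's parameters now read as part of the class documentation. A hedged usage sketch, assuming a hypothetical project, dataset and table; the bigquery_conn_id default comes from the __init__ shown above:

from airflow import DAG
from airflow.contrib.sensors.bigquery_sensor import BigQueryTableSensor
from datetime import datetime

dag = DAG('bq_wait_example', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

wait_for_table = BigQueryTableSensor(
    task_id='wait_for_events_table',
    project_id='my-gcp-project',
    dataset_id='analytics',
    table_id='events_20180521',
    bigquery_conn_id='bigquery_default_conn',
    poke_interval=300,
    dag=dag)
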
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index 9dad505..fb455d6 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index edb420e..abeb421 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index c33dbad..31d16a0 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,8 @@ class EmrJobFlowSensor(EmrBaseSensor):
     :type job_flow_id: string
     """
 
-    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING']
+    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING',
+                           'WAITING', 'TERMINATING']
     FAILED_STATE = ['TERMINATED_WITH_ERRORS']
     template_fields = ['job_flow_id']
     template_ext = ()

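The reflowed NON_TERMINAL_STATES list is what drives the sensor's poke decision: non-terminal states mean keep waiting, TERMINATED_WITH_ERRORS means fail. A hedged sketch of that classification with a stubbed boto3-style describe response and a plain RuntimeError standing in for AirflowException:

NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING',
                       'WAITING', 'TERMINATING']
FAILED_STATE = ['TERMINATED_WITH_ERRORS']

def poke(response):
    state = response['Cluster']['Status']['State']
    if state in FAILED_STATE:
        raise RuntimeError('EMR job flow failed: %s' % state)
    # still provisioning or running: keep poking
    return state not in NON_TERMINAL_STATES

print(poke({'Cluster': {'Status': {'State': 'RUNNING'}}}))     # False
print(poke({'Cluster': {'Status': {'State': 'TERMINATED'}}}))  # True
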
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index bd76969..3dddf01 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/file_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py
index 2e3a3fd..3f7bb24 100644
--- a/airflow/contrib/sensors/file_sensor.py
+++ b/airflow/contrib/sensors/file_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index efdedd7..f0a4928 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 036d4f0..23cd760 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,21 +24,7 @@ from airflow.utils.decorators import apply_defaults
 class GoogleCloudStorageObjectSensor(BaseSensorOperator):
     """
     Checks for the existence of a file in Google Cloud Storage.
-    """
-    template_fields = ('bucket', 'object')
-    ui_color = '#f0eee4'
-
-    @apply_defaults
-    def __init__(
-            self,
-            bucket,
-            object,  # pylint:disable=redefined-builtin
-            google_cloud_conn_id='google_cloud_default',
-            delegate_to=None,
-            *args,
-            **kwargs):
-        """
-        Create a new GoogleCloudStorageObjectSensor.
+    Create a new GoogleCloudStorageObjectSensor.
 
         :param bucket: The Google cloud storage bucket where the object is.
         :type bucket: string
@@ -49,9 +35,21 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
             connecting to Google cloud storage.
         :type google_cloud_storage_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide delegation enabled.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
         :type delegate_to: string
-        """
+    """
+    template_fields = ('bucket', 'object')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 object,  # pylint:disable=redefined-builtin
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+
         super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
         self.bucket = bucket
         self.object = object
@@ -78,23 +76,7 @@ def ts_function(context):
 class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
     """
     Checks if an object is updated in Google Cloud Storage.
-    """
-    template_fields = ('bucket', 'object')
-    template_ext = ('.sql',)
-    ui_color = '#f0eee4'
-
-    @apply_defaults
-    def __init__(
-            self,
-            bucket,
-            object,  # pylint:disable=redefined-builtin
-            ts_func=ts_function,
-            google_cloud_conn_id='google_cloud_default',
-            delegate_to=None,
-            *args,
-            **kwargs):
-        """
-        Create a new GoogleCloudStorageObjectUpdatedSensor.
+    Create a new GoogleCloudStorageObjectUpdatedSensor.
 
         :param bucket: The Google cloud storage bucket where the object is.
         :type bucket: string
@@ -112,7 +94,20 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
-        """
+    """
+    template_fields = ('bucket', 'object')
+    template_ext = ('.sql',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 object,  # pylint:disable=redefined-builtin
+                 ts_func=ts_function,
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+
         super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs)
         self.bucket = bucket
         self.object = object
@@ -131,21 +126,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
 class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
     """
     Checks for the existence of files at a prefix in a Google Cloud Storage bucket.
-    """
-    template_fields = ('bucket', 'prefix')
-    ui_color = '#f0eee4'
-
-    @apply_defaults
-    def __init__(
-        self,
-        bucket,
-        prefix,
-        google_cloud_conn_id='google_cloud_default',
-        delegate_to=None,
-        *args,
-        **kwargs):
-        """
-        Create a new GoogleCloudStorageObjectSensor.
+    Create a new GoogleCloudStorageObjectSensor.
 
         :param bucket: The Google cloud storage bucket where the object is.
         :type bucket: string
@@ -156,9 +137,20 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
             connecting to Google cloud storage.
         :type google_cloud_storage_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide delegation enabled.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
         :type delegate_to: string
-        """
+    """
+    template_fields = ('bucket', 'prefix')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix,
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
         super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs)
         self.bucket = bucket
         self.prefix = prefix
@@ -166,7 +158,8 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        self.log.info('Sensor checks existence of objects: %s, %s', self.bucket, self.prefix)
+        self.log.info('Sensor checks existence of objects: %s, %s',
+                      self.bucket, self.prefix)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)

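The three GCS sensors above share the same bucket/object (or prefix) interface and the google_cloud_default connection. A hedged usage sketch of GoogleCloudStoragePrefixSensor, with a hypothetical bucket and prefix:

from airflow import DAG
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from datetime import datetime

dag = DAG('gcs_wait_example', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

wait_for_exports = GoogleCloudStoragePrefixSensor(
    task_id='wait_for_exports',
    bucket='my-export-bucket',
    prefix='exports/2018-05-21/',
    google_cloud_conn_id='google_cloud_default',
    poke_interval=120,
    dag=dag)
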
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py
index 5193add..832b81b 100644
--- a/airflow/contrib/sensors/hdfs_sensor.py
+++ b/airflow/contrib/sensors/hdfs_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 6f73d19..0583c22 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -136,12 +136,16 @@ class JiraTicketSensor(JiraSensor):
                         )
 
         except JIRAError as jira_error:
-            self.log.error("Jira error while checking with expected value: %s", jira_error)
+            self.log.error("Jira error while checking with expected value: %s",
+                           jira_error)
         except Exception as e:
-            self.log.error("Error while checking with expected value %s:", self.expected_value)
+            self.log.error("Error while checking with expected value %s:",
+                           self.expected_value)
             self.log.exception(e)
         if result is True:
-            self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
+            self.log.info("Issue field %s has expected value %s, returning success",
+                          self.field, self.expected_value)
         else:
-            self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
+            self.log.info("Issue field %s don't have expected value %s yet.",
+                          self.field, self.expected_value)
         return result

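The rewrapped log calls report whether the watched issue field already holds the expected value. A hedged sketch of that comparison with a stubbed issue dict in place of a real JIRA client response; field and value names are made up:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('jira_ticket_sensor_sketch')

def field_has_expected_value(issue, field, expected_value):
    result = str(issue.get(field, '')).lower() == str(expected_value).lower()
    if result:
        log.info("Issue field %s has expected value %s, returning success",
                 field, expected_value)
    else:
        log.info("Issue field %s doesn't have expected value %s yet.",
                 field, expected_value)
    return result

print(field_has_expected_value({'status': 'Done'}, 'status', 'done'))  # True
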
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/pubsub_sensor.py b/airflow/contrib/sensors/pubsub_sensor.py
index c6b8467..7d17215 100644
--- a/airflow/contrib/sensors/pubsub_sensor.py
+++ b/airflow/contrib/sensors/pubsub_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 0ecd34a..baf3e16 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/sftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/sftp_sensor.py b/airflow/contrib/sensors/sftp_sensor.py
index 08559e9..801943a 100644
--- a/airflow/contrib/sensors/sftp_sensor.py
+++ b/airflow/contrib/sensors/sftp_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index c24f626..ec6a63b 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/__init__.py b/airflow/contrib/task_runner/__init__.py
index 4067cc7..114d189 100644
--- a/airflow/contrib/task_runner/__init__.py
+++ b/airflow/contrib/task_runner/__init__.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY