You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/21 18:40:18 UTC
[2/3] incubator-airflow git commit: [AIRFLOW-2429] Fix contrib
folder's flake8 errors
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/awsbatch_operator.py b/airflow/contrib/operators/awsbatch_operator.py
index 920fdb7..6e8e2a4 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,9 +46,11 @@ class AWSBatchOperator(BaseOperator):
:param max_retries: exponential backoff retries while waiter is not merged
:type max_retries: int
:param aws_conn_id: connection id of AWS credentials / region name. If None,
- credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
+ credential boto3 strategy will be used
+ (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
:type aws_conn_id: str
- :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided)
+ :param region_name: region name to use in AWS Hook.
+ Override the region_name in connection (if provided)
"""
ui_color = '#c3dae0'
@@ -152,9 +154,12 @@ class AWSBatchOperator(BaseOperator):
if (job['status'] == 'FAILED' or
container['container']['exitCode'] != 0):
print("@@@@")
- raise AirflowException('This containers encounter an error during execution {}'.format(job))
+ raise AirflowException(
+ 'This containers encounter an error during '
+ 'execution {}'.format(job))
elif job['status'] is not 'SUCCEEDED':
- raise AirflowException('This task is still pending {}'.format(job['status']))
+ raise AirflowException(
+ 'This task is still pending {}'.format(job['status']))
def get_hook(self):
return AwsHook(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 799d6d5..59ef5d3 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,8 @@
# under the License.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import \
+ CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_get_data.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index aabfddd..ab8f71b 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 04ddb6a..dc5edb3 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 8751dfe..a16107d 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index b03511a..93a52b3 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 166d05e..e2ce930 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8d22d4c..7b8d522 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -205,13 +205,14 @@ class DatabricksSubmitRunOperator(BaseOperator):
Coerces content or all values of content if it is a dict to a string. The
function will throw if content contains non-string or non-numeric types.
- The reason why we have this function is because the ``self.json`` field must be a dict
- with only string values. This is because ``render_template`` will fail for numerical values.
+ The reason why we have this function is because the ``self.json`` field must be a
+ dict with only string values. This is because ``render_template`` will fail
+ for numerical values.
"""
c = self._deep_string_coerce
if isinstance(content, six.string_types):
return content
- elif isinstance(content, six.integer_types+(float,)):
+ elif isinstance(content, six.integer_types + (float,)):
# Databricks can tolerate either numeric or string types in the API backend.
return str(content)
elif isinstance(content, (list, tuple)):
@@ -221,8 +222,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
for k, v in list(content.items())}
else:
param_type = type(content)
- msg = 'Type {0} used for parameter {1} is not a number or a string' \
- .format(param_type, json_path)
+ msg = 'Type {0} used for parameter {1} is not a number or a string'\
+ .format(param_type, json_path)
raise AirflowException(msg)
def _log_run_page_url(self, url):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 11adc60..e3c8c1f 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 22db3e6..4973eb1 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -249,13 +249,12 @@ class DataprocClusterCreateOperator(BaseOperator):
self.project_id, self.zone
)
master_type_uri = \
- "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
- self.project_id, self.zone, self.master_machine_type
- )
+ "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\
+ .format(self.project_id, self.zone, self.master_machine_type)
worker_type_uri = \
- "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
- self.project_id, self.zone, self.worker_machine_type
- )
+ "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\
+ .format(self.project_id, self.zone, self.worker_machine_type)
+
cluster_data = {
'projectId': self.project_id,
'clusterName': self.cluster_name,
@@ -297,7 +296,7 @@ class DataprocClusterCreateOperator(BaseOperator):
# [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
# semantic versioning spec: x.y.z).
cluster_data['labels'].update({'airflow-version':
- 'v' + version.replace('.', '-').replace('+','-')})
+ 'v' + version.replace('.', '-').replace('+', '-')})
if self.storage_bucket:
cluster_data['config']['configBucket'] = self.storage_bucket
if self.metadata:
@@ -305,7 +304,8 @@ class DataprocClusterCreateOperator(BaseOperator):
if self.network_uri:
cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri
if self.subnetwork_uri:
- cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri
+ cluster_data['config']['gceClusterConfig']['subnetworkUri'] = \
+ self.subnetwork_uri
if self.tags:
cluster_data['config']['gceClusterConfig']['tags'] = self.tags
if self.image_version:
@@ -332,10 +332,10 @@ class DataprocClusterCreateOperator(BaseOperator):
cluster_data['config']['initializationActions'] = init_actions_dict
if self.service_account:
cluster_data['config']['gceClusterConfig']['serviceAccount'] =\
- self.service_account
+ self.service_account
if self.service_account_scopes:
cluster_data['config']['gceClusterConfig']['serviceAccountScopes'] =\
- self.service_account_scopes
+ self.service_account_scopes
return cluster_data
def execute(self, context):
@@ -368,7 +368,7 @@ class DataprocClusterCreateOperator(BaseOperator):
self.log.info(
'Cluster {} already exists... Checking status...',
self.cluster_name
- )
+ )
self._wait_for_done(service)
return True
else:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 09b7965..f6dc7cc 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,15 +35,16 @@ class DatastoreExportOperator(BaseOperator):
:type namespace: str
:param datastore_conn_id: the name of the Datastore connection id to use
:type datastore_conn_id: string
- :param cloud_storage_conn_id: the name of the cloud storage connection id to force-write
- backup
+ :param cloud_storage_conn_id: the name of the cloud storage connection id to
+ force-write backup
:type cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
- :param entity_filter: description of what data from the project is included in the export,
- refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
+ :param entity_filter: description of what data from the project is included in the
+ export, refer to
+ https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
:type entity_filter: dict
:param labels: client-assigned labels for cloud storage
:type labels: dict
@@ -92,7 +93,7 @@ class DatastoreExportOperator(BaseOperator):
for o in objects:
gcs_hook.delete(self.bucket, o)
- ds_hook = DatastoreHook(self.datastore_conn_id,self.delegate_to)
+ ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
result = ds_hook.export_to_storage_bucket(bucket=self.bucket,
namespace=self.namespace,
entity_filter=self.entity_filter,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 279b1a5..401d36e 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,9 @@ class DatastoreImportOperator(BaseOperator):
:param namespace: optional namespace of the backup metadata file in
the specified Cloud Storage bucket.
:type namespace: str
- :param entity_filter: description of what data from the project is included in the export,
- refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
+ :param entity_filter: description of what data from the project is included in
+ the export, refer to
+ https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
:type entity_filter: dict
:param labels: client-assigned labels for cloud storage
:type labels: dict
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/discord_webhook_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/discord_webhook_operator.py b/airflow/contrib/operators/discord_webhook_operator.py
index acfba58..6b44eac 100644
--- a/airflow/contrib/operators/discord_webhook_operator.py
+++ b/airflow/contrib/operators/discord_webhook_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index c0cb09d..426393d 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -29,18 +29,17 @@ class DruidOperator(BaseOperator):
:param json_index_file: The filepath to the druid index specification
:type json_index_file: str
- :param druid_ingest_conn_id: The connection id of the Druid overlord which accepts index jobs
+ :param druid_ingest_conn_id: The connection id of the Druid overlord which
+ accepts index jobs
:type druid_ingest_conn_id: str
"""
template_fields = ('index_spec_str',)
template_ext = ('.json',)
- def __init__(
- self,
- json_index_file,
- druid_ingest_conn_id='druid_ingest_default',
- max_ingestion_time=None,
- *args, **kwargs):
+ def __init__(self, json_index_file,
+ druid_ingest_conn_id='druid_ingest_default',
+ max_ingestion_time=None,
+ *args, **kwargs):
super(DruidOperator, self).__init__(*args, **kwargs)
self.conn_id = druid_ingest_conn_id
self.max_ingestion_time = max_ingestion_time
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 1016fca..60540f5 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,9 +37,11 @@ class ECSOperator(BaseOperator):
http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
:type: overrides: dict
:param aws_conn_id: connection id of AWS credentials / region name. If None,
- credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
+ credential boto3 strategy will be used
+ (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
:type aws_conn_id: str
- :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided)
+ :param region_name: region name to use in AWS Hook.
+ Override the region_name in connection (if provided)
:param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE')
:type: launch_type: str
"""
@@ -66,7 +68,7 @@ class ECSOperator(BaseOperator):
def execute(self, context):
self.log.info(
'Running ECS Task - Task definition: %s - on cluster %s',
- self.task_definition,self.cluster
+ self.task_definition, self.cluster
)
self.log.info('ECSOperator overrides: %s', self.overrides)
@@ -115,13 +117,16 @@ class ECSOperator(BaseOperator):
for task in response['tasks']:
containers = task['containers']
for container in containers:
- if container.get('lastStatus') == 'STOPPED' and container['exitCode'] != 0:
- raise AirflowException('This task is not in success state {}'.format(task))
+ if container.get('lastStatus') == 'STOPPED' and \
+ container['exitCode'] != 0:
+ raise AirflowException(
+ 'This task is not in success state {}'.format(task))
elif container.get('lastStatus') == 'PENDING':
raise AirflowException('This task is still pending {}'.format(task))
elif 'error' in container.get('reason', '').lower():
- raise AirflowException('This containers encounter an error during launching : {}'.
- format(container.get('reason', '').lower()))
+ raise AirflowException(
+ 'This containers encounter an error during launching : {}'.
+ format(container.get('reason', '').lower()))
def get_hook(self):
return AwsHook(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index 3d2d510..643ffe9 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index e90c77a..89be12f 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,8 @@ from airflow.exceptions import AirflowException
class EmrCreateJobFlowOperator(BaseOperator):
"""
Creates an EMR JobFlow, reading the config from the EMR connection.
- A dictionary of JobFlow overrides can be passed that override the config from the connection.
+ A dictionary of JobFlow overrides can be passed that override
+ the config from the connection.
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index 95c3164..50407a1 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
index b65e3b3..807385b 100644
--- a/airflow/contrib/operators/file_to_gcs.py
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,8 +65,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
Uploads the file to Google cloud storage
"""
hook = GoogleCloudStorageHook(
- google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
- delegate_to=self.delegate_to)
+ google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+ delegate_to=self.delegate_to)
hook.upload(
bucket=self.bucket,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index f55c0c1..797960c 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -57,6 +57,8 @@ class FileToWasbOperator(BaseOperator):
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
self.log.info(
- 'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
+ 'Uploading {self.file_path} to wasb://{self.container_name} '
+ 'as {self.blob_name}'.format(**locals())
)
- hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
+ hook.load_file(self.file_path, self.container_name,
+ self.blob_name, **self.load_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 7dfa96c..ce272ae 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,7 +46,8 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
connecting to Google cloud storage.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide delegation enabled.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
:type delegate_to: string
"""
template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',)
@@ -84,5 +85,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
if sys.getsizeof(file_bytes) < 48000:
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
- raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
+ raise RuntimeError(
+ 'The size of the downloaded file is too large to push to XCom!'
+ )
self.log.debug(file_bytes)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
index 393deb9..6474453 100644
--- a/airflow/contrib/operators/gcs_list_operator.py
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_operator.py b/airflow/contrib/operators/gcs_operator.py
index 3d339d8..ef5e8de 100644
--- a/airflow/contrib/operators/gcs_operator.py
+++ b/airflow/contrib/operators/gcs_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 938da4e..3a77980 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -184,8 +184,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
- if not self.schema_fields and self.schema_object \
- and self.source_format != 'DATASTORE_BACKUP':
+ if not self.schema_fields and \
+ self.schema_object and \
+ self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_s3.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py
index 00c5e14..a87aa3a 100644
--- a/airflow/contrib/operators/gcs_to_s3.py
+++ b/airflow/contrib/operators/gcs_to_s3.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 5dd06f6..381cd72 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hive_to_dynamodb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hive_to_dynamodb.py b/airflow/contrib/operators/hive_to_dynamodb.py
index 2b6f88f..4a39e40 100644
--- a/airflow/contrib/operators/hive_to_dynamodb.py
+++ b/airflow/contrib/operators/hive_to_dynamodb.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jenkins_job_trigger_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jenkins_job_trigger_operator.py b/airflow/contrib/operators/jenkins_job_trigger_operator.py
index ad59797..37185f3 100644
--- a/airflow/contrib/operators/jenkins_job_trigger_operator.py
+++ b/airflow/contrib/operators/jenkins_job_trigger_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jira_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/jira_operator.py b/airflow/contrib/operators/jira_operator.py
index 64869e4..01d78b8 100644
--- a/airflow/contrib/operators/jira_operator.py
+++ b/airflow/contrib/operators/jira_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 391ec58..8e88b68 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -29,6 +29,7 @@ template_fields = ('templates_dict',)
template_ext = tuple()
ui_color = '#ffefeb'
+
class KubernetesPodOperator(BaseOperator):
"""
Execute a task in a Kubernetes Pod
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/mlengine_operator_utils.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator_utils.py b/airflow/contrib/operators/mlengine_operator_utils.py
index 5fda6ae..7ce784e 100644
--- a/airflow/contrib/operators/mlengine_operator_utils.py
+++ b/airflow/contrib/operators/mlengine_operator_utils.py
@@ -28,6 +28,7 @@ from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from six.moves.urllib.parse import urlsplit
+
def create_evaluate_ops(task_prefix,
data_format,
input_paths,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
index 9eac70d..88b4d00 100644
--- a/airflow/contrib/operators/postgres_to_gcs_operator.py
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/pubsub_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py
index 42586aa..2d55b19 100644
--- a/airflow/contrib/operators/pubsub_operator.py
+++ b/airflow/contrib/operators/pubsub_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 47ccf53..88e8bdf 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/s3_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py
index df448ad..b85691b 100644
--- a/airflow/contrib/operators/s3_list_operator.py
+++ b/airflow/contrib/operators/s3_list_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index 519dc1d..153f440 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,8 @@ class SFTPOperator(BaseOperator):
self.local_filepath = local_filepath
self.remote_filepath = remote_filepath
self.operation = operation
- if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+ if not (self.operation.lower() == SFTPOperation.GET or
+ self.operation.lower() == SFTPOperation.PUT):
raise TypeError("unsupported operation value {0}, expected {1} or {2}"
.format(self.operation, SFTPOperation.GET, SFTPOperation.PUT))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/slack_webhook_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/slack_webhook_operator.py b/airflow/contrib/operators/slack_webhook_operator.py
index 124b249..bab3e90 100644
--- a/airflow/contrib/operators/slack_webhook_operator.py
+++ b/airflow/contrib/operators/slack_webhook_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/snowflake_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/snowflake_operator.py b/airflow/contrib/operators/snowflake_operator.py
index 25e9bf4..39d7d49 100644
--- a/airflow/contrib/operators/snowflake_operator.py
+++ b/airflow/contrib/operators/snowflake_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_jdbc_operator.py b/airflow/contrib/operators/spark_jdbc_operator.py
index c915aaa..42f9dd5 100644
--- a/airflow/contrib/operators/spark_jdbc_operator.py
+++ b/airflow/contrib/operators/spark_jdbc_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 864c137..825a367 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,11 @@ class SparkSqlOperator(BaseOperator):
:type conf: str (format: PROP=VALUE)
:param conn_id: connection_id string
:type conn_id: str
- :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+ :param total_executor_cores: (Standalone & Mesos only) Total cores for all
+ executors (Default: all the available cores on the worker)
:type total_executor_cores: int
- :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
+ :param executor_cores: (Standalone & YARN only) Number of cores per
+ executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index 05996d8..fa61ca1 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,12 +33,16 @@ from airflow.utils.decorators import apply_defaults
class SqoopOperator(BaseOperator):
"""
Execute a Sqoop job.
- Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
+ Documentation for Apache Sqoop can be found here:
+ https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
"""
- template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by',
- 'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table',
- 'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by',
- 'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver',
+ template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir',
+ 'file_type', 'columns', 'split_by',
+ 'where', 'export_dir', 'input_null_string',
+ 'input_null_non_string', 'staging_table',
+ 'enclosed_by', 'escaped_by', 'input_fields_terminated_by',
+ 'input_lines_terminated_by', 'input_optionally_enclosed_by',
+ 'properties', 'extra_import_options', 'driver',
'extra_export_options', 'hcatalog_database', 'hcatalog_table',)
ui_color = '#7D8CA4'
@@ -115,7 +119,8 @@ class SqoopOperator(BaseOperator):
:param relaxed_isolation: use read uncommitted isolation level
:param hcatalog_database: Specifies the database name for the HCatalog table
:param hcatalog_table: The argument value for this option is the HCatalog table
- :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not
+ :param create_hcatalog_table: Have sqoop create the hcatalog table passed
+ in or not
:param properties: additional JVM properties passed to sqoop
:param extra_import_options: Extra import options to pass as dict.
If a key doesn't have a value, just pass an empty string to it.
@@ -189,7 +194,8 @@ class SqoopOperator(BaseOperator):
extra_export_options=self.extra_export_options)
elif self.cmd_type == 'import':
# add create hcatalog table to extra import options if option passed
- # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param
+ # if new params are added to constructor can pass them in here
+ # so don't modify sqoop_hook for each param
if self.create_hcatalog_table:
self.extra_import_options['create-hcatalog-table'] = ''
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index b5ab2dc..d246800 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -109,7 +109,9 @@ class SSHOperator(BaseOperator):
agg_stdout += stdout.channel.recv(stdout_buffer_length)
# read from both stdout and stderr
- while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
+ while not channel.closed or \
+ channel.recv_ready() or \
+ channel.recv_stderr_ready():
readq, _, _ = select([channel], [], [], self.timeout)
for c in readq:
if c.recv_ready():
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index c2a6efb..41072ff 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 40166fa..00fdb64 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -94,7 +94,9 @@ class VerticaToHiveTransfer(BaseOperator):
@classmethod
def type_map(cls, vertica_type):
# vertica-python datatype.py donot provied the full type mapping access.
- # Manual hack. Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
+ # Manual hack.
+ # Reference:
+ # https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
d = {
5: 'BOOLEAN',
6: 'INT',
@@ -120,7 +122,8 @@ class VerticaToHiveTransfer(BaseOperator):
for field in cursor.description:
col_count += 1
col_position = "Column{position}".format(position=col_count)
- field_dict[col_position if field[0] == '' else field[0]] = self.type_map(field[1])
+ field_dict[col_position if field[0] == '' else field[0]] = \
+ self.type_map(field[1])
csv_writer.writerows(cursor.iterate())
f.flush()
cursor.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py
index 22af0ce..9d3fe72 100644
--- a/airflow/contrib/operators/vertica_to_mysql.py
+++ b/airflow/contrib/operators/vertica_to_mysql.py
@@ -18,6 +18,7 @@
# under the License.
import logging
+import MySQLdb
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.hooks.mysql_hook import MySqlHook
@@ -60,7 +61,7 @@ class VerticaToMySqlTransfer(BaseOperator):
"""
template_fields = ('sql', 'mysql_table', 'mysql_preoperator',
- 'mysql_postoperator')
+ 'mysql_postoperator')
template_ext = ('.sql',)
ui_color = '#a0e08c'
@@ -102,7 +103,9 @@ class VerticaToMySqlTransfer(BaseOperator):
if self.bulk_load:
tmpfile = NamedTemporaryFile("w")
- logging.info("Selecting rows from Vertica to local file " + str(tmpfile.name) + "...")
+ logging.info(
+ "Selecting rows from Vertica to local file " + str(
+ tmpfile.name) + "...")
logging.info(self.sql)
csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8')
@@ -129,14 +132,20 @@ class VerticaToMySqlTransfer(BaseOperator):
logging.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
- cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s LINES TERMINATED BY '\r\n' (%s)" % (tmpfile.name, self.mysql_table, ", ".join(selected_columns)))
+ cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO "
+ "TABLE %s LINES TERMINATED BY '\r\n' (%s)" %
+ (tmpfile.name,
+ self.mysql_table,
+ ", ".join(selected_columns)))
conn.commit()
tmpfile.close()
else:
logging.info("Inserting rows into MySQL...")
- mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns)
+ mysql.insert_rows(table=self.mysql_table,
+ rows=result,
+ target_fields=selected_columns)
logging.info("Inserted rows into MySQL " + str(count))
- except:
+ except (MySQLdb.Error, MySQLdb.Warning):
logging.error("Inserted rows into MySQL 0")
raise
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/plugins/metastore_browser/main.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/plugins/metastore_browser/main.py b/airflow/contrib/plugins/metastore_browser/main.py
index 9a47ef5..836e531 100644
--- a/airflow/contrib/plugins/metastore_browser/main.py
+++ b/airflow/contrib/plugins/metastore_browser/main.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -138,7 +138,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin):
""".format(where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT)
h = MySqlHook(METASTORE_MYSQL_CONN_ID)
d = [
- {'id': row[0], 'text': row[0]}
+ {'id': row[0], 'text': row[0]}
for row in h.get_records(sql)]
return json.dumps(d)
@@ -161,6 +161,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin):
h = HiveCliHook(HIVE_CLI_CONN_ID)
return h.run_cli(sql)
+
v = MetastoreBrowserView(category="Plugins", name="Hive Metadata Browser")
# Creating a flask blueprint to intergrate the templates and static folder
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/__init__.py b/airflow/contrib/sensors/__init__.py
index 4067cc7..114d189 100644
--- a/airflow/contrib/sensors/__init__.py
+++ b/airflow/contrib/sensors/__init__.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 3b130af..6709524 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
index 965c13c..26fbb06 100644
--- a/airflow/contrib/sensors/bash_sensor.py
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -66,7 +66,6 @@ class BashSensor(BaseSensorOperator):
self.log.info("Tmp dir root location: \n %s", gettempdir())
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
f.write(bytes(bash_command, 'utf_8'))
f.flush()
fname = f.name
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index fe75548..2e496f6 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,37 +24,36 @@ from airflow.utils.decorators import apply_defaults
class BigQueryTableSensor(BaseSensorOperator):
"""
Checks for the existence of a table in Google Bigquery.
- """
- template_fields = ('project_id', 'dataset_id', 'table_id',)
- ui_color = '#f0eee4'
-
- @apply_defaults
- def __init__(
- self,
- project_id,
- dataset_id,
- table_id,
- bigquery_conn_id='bigquery_default_conn',
- delegate_to=None,
- *args,
- **kwargs):
- """
- Create a new BigQueryTableSensor.
- :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
- must provide access to the specified project.
+ :param project_id: The Google cloud project in which to look for the table.
+ The connection supplied to the hook must provide
+ access to the specified project.
:type project_id: string
:param dataset_id: The name of the dataset in which to look for the table.
storage bucket.
:type dataset_id: string
:param table_id: The name of the table to check the existence of.
:type table_id: string
- :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery.
+ :param bigquery_conn_id: The connection ID to use when connecting to
+ Google BigQuery.
:type bigquery_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide delegation enabled.
+ For this to work, the service account making the request must
+ have domain-wide delegation enabled.
:type delegate_to: string
- """
+ """
+ template_fields = ('project_id', 'dataset_id', 'table_id',)
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self,
+ project_id,
+ dataset_id,
+ table_id,
+ bigquery_conn_id='bigquery_default_conn',
+ delegate_to=None,
+ *args, **kwargs):
+
super(BigQueryTableSensor, self).__init__(*args, **kwargs)
self.project_id = project_id
self.dataset_id = dataset_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index 9dad505..fb455d6 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index edb420e..abeb421 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index c33dbad..31d16a0 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,8 @@ class EmrJobFlowSensor(EmrBaseSensor):
:type job_flow_id: string
"""
- NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING']
+ NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING',
+ 'WAITING', 'TERMINATING']
FAILED_STATE = ['TERMINATED_WITH_ERRORS']
template_fields = ['job_flow_id']
template_ext = ()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index bd76969..3dddf01 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/file_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py
index 2e3a3fd..3f7bb24 100644
--- a/airflow/contrib/sensors/file_sensor.py
+++ b/airflow/contrib/sensors/file_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index efdedd7..f0a4928 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 036d4f0..23cd760 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,21 +24,7 @@ from airflow.utils.decorators import apply_defaults
class GoogleCloudStorageObjectSensor(BaseSensorOperator):
"""
Checks for the existence of a file in Google Cloud Storage.
- """
- template_fields = ('bucket', 'object')
- ui_color = '#f0eee4'
-
- @apply_defaults
- def __init__(
- self,
- bucket,
- object, # pylint:disable=redefined-builtin
- google_cloud_conn_id='google_cloud_default',
- delegate_to=None,
- *args,
- **kwargs):
- """
- Create a new GoogleCloudStorageObjectSensor.
+ Create a new GoogleCloudStorageObjectSensor.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
@@ -49,9 +35,21 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
connecting to Google cloud storage.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide delegation enabled.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
:type delegate_to: string
- """
+ """
+ template_fields = ('bucket', 'object')
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self,
+ bucket,
+ object, # pylint:disable=redefined-builtin
+ google_cloud_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args, **kwargs):
+
super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
self.bucket = bucket
self.object = object
@@ -78,23 +76,7 @@ def ts_function(context):
class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
"""
Checks if an object is updated in Google Cloud Storage.
- """
- template_fields = ('bucket', 'object')
- template_ext = ('.sql',)
- ui_color = '#f0eee4'
-
- @apply_defaults
- def __init__(
- self,
- bucket,
- object, # pylint:disable=redefined-builtin
- ts_func=ts_function,
- google_cloud_conn_id='google_cloud_default',
- delegate_to=None,
- *args,
- **kwargs):
- """
- Create a new GoogleCloudStorageObjectUpdatedSensor.
+ Create a new GoogleCloudStorageObjectUpdatedSensor.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
@@ -112,7 +94,20 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
- """
+ """
+ template_fields = ('bucket', 'object')
+ template_ext = ('.sql',)
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self,
+ bucket,
+ object, # pylint:disable=redefined-builtin
+ ts_func=ts_function,
+ google_cloud_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args, **kwargs):
+
super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs)
self.bucket = bucket
self.object = object
@@ -131,21 +126,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
"""
Checks for the existence of a files at prefix in Google Cloud Storage bucket.
- """
- template_fields = ('bucket', 'prefix')
- ui_color = '#f0eee4'
-
- @apply_defaults
- def __init__(
- self,
- bucket,
- prefix,
- google_cloud_conn_id='google_cloud_default',
- delegate_to=None,
- *args,
- **kwargs):
- """
- Create a new GoogleCloudStorageObjectSensor.
+ Create a new GoogleCloudStorageObjectSensor.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
@@ -156,9 +137,20 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
connecting to Google cloud storage.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide delegation enabled.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
:type delegate_to: string
- """
+ """
+ template_fields = ('bucket', 'prefix')
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self,
+ bucket,
+ prefix,
+ google_cloud_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args, **kwargs):
super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs)
self.bucket = bucket
self.prefix = prefix
@@ -166,7 +158,8 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
self.delegate_to = delegate_to
def poke(self, context):
- self.log.info('Sensor checks existence of objects: %s, %s', self.bucket, self.prefix)
+ self.log.info('Sensor checks existence of objects: %s, %s',
+ self.bucket, self.prefix)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py
index 5193add..832b81b 100644
--- a/airflow/contrib/sensors/hdfs_sensor.py
+++ b/airflow/contrib/sensors/hdfs_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 6f73d19..0583c22 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -136,12 +136,16 @@ class JiraTicketSensor(JiraSensor):
)
except JIRAError as jira_error:
- self.log.error("Jira error while checking with expected value: %s", jira_error)
+ self.log.error("Jira error while checking with expected value: %s",
+ jira_error)
except Exception as e:
- self.log.error("Error while checking with expected value %s:", self.expected_value)
+ self.log.error("Error while checking with expected value %s:",
+ self.expected_value)
self.log.exception(e)
if result is True:
- self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
+ self.log.info("Issue field %s has expected value %s, returning success",
+ self.field, self.expected_value)
else:
- self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
+ self.log.info("Issue field %s don't have expected value %s yet.",
+ self.field, self.expected_value)
return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/pubsub_sensor.py b/airflow/contrib/sensors/pubsub_sensor.py
index c6b8467..7d17215 100644
--- a/airflow/contrib/sensors/pubsub_sensor.py
+++ b/airflow/contrib/sensors/pubsub_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 0ecd34a..baf3e16 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/sftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/sftp_sensor.py b/airflow/contrib/sensors/sftp_sensor.py
index 08559e9..801943a 100644
--- a/airflow/contrib/sensors/sftp_sensor.py
+++ b/airflow/contrib/sensors/sftp_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index c24f626..ec6a63b 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/__init__.py b/airflow/contrib/task_runner/__init__.py
index 4067cc7..114d189 100644
--- a/airflow/contrib/task_runner/__init__.py
+++ b/airflow/contrib/task_runner/__init__.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY