You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/13 07:37:32 UTC
[4/5] incubator-airflow git commit: [AIRFLOW-1582] Improve logging
within Airflow
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index 67d1605..f2b5fef 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -24,14 +24,15 @@ NOTE: this hook also relies on the simple_salesforce package:
from simple_salesforce import Salesforce
from airflow.hooks.base_hook import BaseHook
-import logging
import json
import pandas as pd
import time
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class SalesforceHook(BaseHook):
+
+class SalesforceHook(BaseHook, LoggingMixin):
def __init__(
self,
conn_id,
@@ -91,13 +92,12 @@ class SalesforceHook(BaseHook):
"""
self.sign_in()
- logging.info("Querying for all objects")
+ self.logger.info("Querying for all objects")
query = self.sf.query_all(query)
- logging.info(
- "Received results: Total size: {0}; Done: {1}".format(
- query['totalSize'], query['done']
- )
+ self.logger.info(
+ "Received results: Total size: %s; Done: %s",
+ query['totalSize'], query['done']
)
query = json.loads(json.dumps(query))
@@ -144,11 +144,9 @@ class SalesforceHook(BaseHook):
field_string = self._build_field_list(fields)
query = "SELECT {0} FROM {1}".format(field_string, obj)
- logging.info(
- "Making query to salesforce: {0}".format(
- query if len(query) < 30
- else " ... ".join([query[:15], query[-15:]])
- )
+ self.logger.info(
+ "Making query to Salesforce: %s",
+ query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
)
return self.make_query(query)
@@ -171,8 +169,9 @@ class SalesforceHook(BaseHook):
try:
col = pd.to_datetime(col)
except ValueError:
- logging.warning(
- "Could not convert field to timestamps: {0}".format(col.name)
+ log = LoggingMixin().logger
+ log.warning(
+ "Could not convert field to timestamps: %s", col.name
)
return col
@@ -266,7 +265,7 @@ class SalesforceHook(BaseHook):
# for each returned record
object_name = query_results[0]['attributes']['type']
- logging.info("Coercing timestamps for: {0}".format(object_name))
+ self.logger.info("Coercing timestamps for: %s", object_name)
schema = self.describe_object(object_name)
@@ -300,7 +299,7 @@ class SalesforceHook(BaseHook):
# there are also a ton of newline objects
# that mess up our ability to write to csv
# we remove these newlines so that the output is a valid CSV format
- logging.info("Cleaning data and writing to CSV")
+ self.logger.info("Cleaning data and writing to CSV")
possible_strings = df.columns[df.dtypes == "object"]
df[possible_strings] = df[possible_strings].apply(
lambda x: x.str.replace("\r\n", "")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index d7bef7b..aa16130 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -12,16 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import logging
import subprocess
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
-log = logging.getLogger(__name__)
-
-class SparkSqlHook(BaseHook):
+class SparkSqlHook(BaseHook, LoggingMixin):
"""
This hook is a wrapper around the spark-sql binary. It requires that the
"spark-sql" binary is in the PATH.
@@ -123,7 +121,7 @@ class SparkSqlHook(BaseHook):
connection_cmd += ["--queue", self._yarn_queue]
connection_cmd += cmd
- logging.debug("Spark-Sql cmd: {}".format(connection_cmd))
+ self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
return connection_cmd
@@ -153,5 +151,5 @@ class SparkSqlHook(BaseHook):
def kill(self):
if self._sp and self._sp.poll() is None:
- logging.info("Killing the Spark-Sql job")
+ self.logger.info("Killing the Spark-Sql job")
self._sp.kill()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index a667753..bdd1efe 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -12,18 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import logging
import os
import subprocess
import re
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
-log = logging.getLogger(__name__)
-
-class SparkSubmitHook(BaseHook):
+class SparkSubmitHook(BaseHook, LoggingMixin):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH or the spark_home to be
@@ -63,7 +61,6 @@ class SparkSubmitHook(BaseHook):
:param verbose: Whether to pass the verbose flag to spark-submit process for debugging
:type verbose: bool
"""
-
def __init__(self,
conf=None,
conn_id='spark_default',
@@ -126,10 +123,9 @@ class SparkSubmitHook(BaseHook):
conn_data['spark_home'] = extra.get('spark-home', None)
conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
except AirflowException:
- logging.debug(
- "Could not load connection string {}, defaulting to {}".format(
- self._conn_id, conn_data['master']
- )
+ self.logger.debug(
+ "Could not load connection string %s, defaulting to %s",
+ self._conn_id, conn_data['master']
)
return conn_data
@@ -196,7 +192,7 @@ class SparkSubmitHook(BaseHook):
if self._application_args:
connection_cmd += self._application_args
- logging.debug("Spark-Submit cmd: {}".format(connection_cmd))
+ self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
return connection_cmd
@@ -243,15 +239,15 @@ class SparkSubmitHook(BaseHook):
self._yarn_application_id = match.groups()[0]
# Pass to logging
- logging.info(line)
+ self.logger.info(line)
def on_kill(self):
if self._sp and self._sp.poll() is None:
- logging.info('Sending kill signal to {}'.format(self._connection['spark_binary']))
+ self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
self._sp.kill()
if self._yarn_application_id:
- logging.info('Killing application on YARN')
+ self.logger.info('Killing application on YARN')
kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- logging.info("YARN killed with return code: {0}".format(yarn_kill.wait()))
+ self.logger.info("YARN killed with return code: %s", yarn_kill.wait())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 7fbb6c5..0584df4 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -16,17 +16,14 @@
"""
This module contains a sqoop 1.x hook
"""
-
-import logging
import subprocess
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
-
-log = logging.getLogger(__name__)
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class SqoopHook(BaseHook):
+class SqoopHook(BaseHook, LoggingMixin):
"""
This hook is a wrapper around the sqoop 1 binary. To be able to use the hook
it is required that "sqoop" is in the PATH.
@@ -79,7 +76,7 @@ class SqoopHook(BaseHook):
password_index = cmd.index('--password')
cmd[password_index + 1] = 'MASKED'
except ValueError:
- logging.debug("No password in sqoop cmd")
+ self.logger.debug("No password in sqoop cmd")
return cmd
def Popen(self, cmd, **kwargs):
@@ -90,21 +87,21 @@ class SqoopHook(BaseHook):
:param kwargs: extra arguments to Popen (see subprocess.Popen)
:return: handle to subprocess
"""
- logging.info("Executing command: {}".format(' '.join(cmd)))
+ self.logger.info("Executing command: %s", ' '.join(cmd))
sp = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
for line in iter(sp.stdout):
- logging.info(line.strip())
+ self.logger.info(line.strip())
sp.wait()
- logging.info("Command exited with return code {0}".format(sp.returncode))
+ self.logger.info("Command exited with return code %s", sp.returncode)
if sp.returncode:
- raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd)))
+ raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd)))
def _prepare_command(self, export=False):
if export:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index f1e25a6..3fe9146 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -16,7 +16,6 @@
# limitations under the License.
import getpass
-import logging
import os
import paramiko
@@ -24,9 +23,10 @@ import paramiko
from contextlib import contextmanager
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
-class SSHHook(BaseHook):
+class SSHHook(BaseHook, LoggingMixin):
"""
Hook for ssh remote execution using Paramiko.
ref: https://github.com/paramiko/paramiko
@@ -70,7 +70,7 @@ class SSHHook(BaseHook):
def get_conn(self):
if not self.client:
- logging.debug('creating ssh client for conn_id: {0}'.format(self.ssh_conn_id))
+ self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
if self.ssh_conn_id is not None:
conn = self.get_connection(self.ssh_conn_id)
if self.username is None:
@@ -98,9 +98,11 @@ class SSHHook(BaseHook):
# Auto detecting username values from system
if not self.username:
- logging.debug("username to ssh to host: {0} is not specified, using "
- "system's default provided by getpass.getuser()"
- .format(self.remote_host, self.ssh_conn_id))
+ self.logger.debug(
+ "username to ssh to host: %s is not specified for connection id"
+ " %s. Using system's default provided by getpass.getuser()",
+ self.remote_host, self.ssh_conn_id
+ )
self.username = getpass.getuser()
host_proxy = None
@@ -140,14 +142,20 @@ class SSHHook(BaseHook):
self.client = client
except paramiko.AuthenticationException as auth_error:
- logging.error("Auth failed while connecting to host: {0}, error: {1}"
- .format(self.remote_host, auth_error))
+ self.logger.error(
+ "Auth failed while connecting to host: %s, error: %s",
+ self.remote_host, auth_error
+ )
except paramiko.SSHException as ssh_error:
- logging.error("Failed connecting to host: {0}, error: {1}"
- .format(self.remote_host, ssh_error))
+ self.logger.error(
+ "Failed connecting to host: %s, error: %s",
+ self.remote_host, ssh_error
+ )
except Exception as error:
- logging.error("Error connecting to host: {0}, error: {1}"
- .format(self.remote_host, error))
+ self.logger.error(
+ "Error connecting to host: %s, error: %s",
+ self.remote_host, error
+ )
return self.client
@contextmanager
@@ -183,7 +191,7 @@ class SSHHook(BaseHook):
]
ssh_cmd += ssh_tunnel_cmd
- logging.debug("creating tunnel with cmd: {0}".format(ssh_cmd))
+ self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
proc = subprocess.Popen(ssh_cmd,
stdin=subprocess.PIPE,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 3b804a8..37e4a97 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -89,7 +87,7 @@ class BigQueryOperator(BaseOperator):
self.query_params = query_params
def execute(self, context):
- logging.info('Executing: %s', self.bql)
+ self.logger.info('Executing: %s', self.bql)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 643f5ac..21de7cc 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -55,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
self.ignore_if_missing = ignore_if_missing
def execute(self, context):
- logging.info('Deleting: %s', self.deletion_dataset_table)
+ self.logger.info('Deleting: %s', self.deletion_dataset_table)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 6f4843c..8e21270 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -70,8 +68,10 @@ class BigQueryToBigQueryOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- logging.info('Executing copy of %s into: %s', self.source_project_dataset_tables,
- self.destination_project_dataset_table)
+ self.logger.info(
+ 'Executing copy of %s into: %s',
+ self.source_project_dataset_tables, self.destination_project_dataset_table
+ )
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = hook.get_conn()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index aaff462..23a2029 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -81,7 +79,7 @@ class BigQueryToCloudStorageOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- logging.info('Executing extract of %s into: %s',
+ self.logger.info('Executing extract of %s into: %s',
self.source_project_dataset_table,
self.destination_cloud_storage_uris)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 1aa1441..8773357 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -13,7 +13,6 @@
# limitations under the License.
#
-import logging
import six
import time
@@ -21,8 +20,6 @@ from airflow.exceptions import AirflowException
from airflow.contrib.hooks.databricks_hook import DatabricksHook
from airflow.models import BaseOperator
-LINE_BREAK = ('-' * 80)
-
class DatabricksSubmitRunOperator(BaseOperator):
"""
@@ -217,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
raise AirflowException(msg)
def _log_run_page_url(self, url):
- logging.info('View run status, Spark UI, and logs at {}'.format(url))
+ self.logger.info('View run status, Spark UI, and logs at %s', url)
def get_hook(self):
return DatabricksHook(
@@ -228,16 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
hook = self.get_hook()
self.run_id = hook.submit_run(self.json)
run_page_url = hook.get_run_page_url(self.run_id)
- logging.info(LINE_BREAK)
- logging.info('Run submitted with run_id: {}'.format(self.run_id))
+ self.logger.info('Run submitted with run_id: %s', self.run_id)
self._log_run_page_url(run_page_url)
- logging.info(LINE_BREAK)
while True:
run_state = hook.get_run_state(self.run_id)
if run_state.is_terminal:
if run_state.is_successful:
- logging.info('{} completed successfully.'.format(
- self.task_id))
+ self.logger.info('%s completed successfully.', self.task_id)
self._log_run_page_url(run_page_url)
return
else:
@@ -246,16 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
s=run_state)
raise AirflowException(error_message)
else:
- logging.info('{t} in run state: {s}'.format(t=self.task_id,
- s=run_state))
+ self.logger.info('%s in run state: %s', self.task_id, run_state)
self._log_run_page_url(run_page_url)
- logging.info('Sleeping for {} seconds.'.format(
- self.polling_period_seconds))
+ self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
time.sleep(self.polling_period_seconds)
def on_kill(self):
hook = self.get_hook()
hook.cancel_run(self.run_id)
- logging.info('Task: {t} with run_id: {r} was requested to be cancelled.'.format(
- t=self.task_id,
- r=self.run_id))
+ self.logger.info(
+ 'Task: %s with run_id: %s was requested to be cancelled.',
+ self.task_id, self.run_id
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index c0ff6a7..3c22b60 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -13,7 +13,6 @@
# limitations under the License.
#
-import logging
import time
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
@@ -178,13 +177,14 @@ class DataprocClusterCreateOperator(BaseOperator):
while True:
state = self._get_cluster_state(service)
if state is None:
- logging.info("No state for cluster '%s'", self.cluster_name)
+ self.logger.info("No state for cluster '%s'", self.cluster_name)
time.sleep(15)
else:
- logging.info("State for cluster '%s' is %s", self.cluster_name, state)
+ self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
if self._cluster_ready(state, service):
- logging.info("Cluster '%s' successfully created",
- self.cluster_name)
+ self.logger.info(
+ "Cluster '%s' successfully created", self.cluster_name
+ )
return
time.sleep(15)
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
return cluster_data
def execute(self, context):
- logging.info('Creating cluster: {}'.format(self.cluster_name))
+ self.logger.info('Creating cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -272,9 +272,10 @@ class DataprocClusterCreateOperator(BaseOperator):
service = hook.get_conn()
if self._get_cluster(service):
- logging.info('Cluster {} already exists... Checking status...'.format(
- self.cluster_name
- ))
+ self.logger.info(
+ 'Cluster %s already exists... Checking status...',
+ self.cluster_name
+ )
self._wait_for_done(service)
return True
@@ -289,9 +290,10 @@ class DataprocClusterCreateOperator(BaseOperator):
# probably two cluster start commands at the same time
time.sleep(10)
if self._get_cluster(service):
- logging.info('Cluster {} already exists... Checking status...'.format(
- self.cluster_name
- ))
+ self.logger.info(
+ 'Cluster %s already exists... Checking status...',
+ self.cluster_name
+ )
self._wait_for_done(service)
return True
else:
@@ -356,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
time.sleep(15)
def execute(self, context):
- logging.info('Deleting cluster: {}'.format(self.cluster_name))
+ self.logger.info('Deleting cluster: %s', self.cluster_name)
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
@@ -369,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
clusterName=self.cluster_name
).execute()
operation_name = response['name']
- logging.info("Cluster delete operation name: {}".format(operation_name))
+ self.logger.info("Cluster delete operation name: %s", operation_name)
self._wait_for_done(service, operation_name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 1980dfe..76415e1 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
from airflow.contrib.hooks.datastore_hook import DatastoreHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
+
class DatastoreExportOperator(BaseOperator):
"""
Export entities from Google Cloud Datastore to Cloud Storage
@@ -79,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- logging.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+ self.logger.info('Exporting data to Cloud Storage bucket %s', self.bucket)
if self.overwrite_existing and self.namespace:
gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 3427ba5..74bd940 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -12,13 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
-
from airflow.contrib.hooks.datastore_hook import DatastoreHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
+
class DatastoreImportOperator(BaseOperator):
"""
Import entities from Cloud Storage to Google Cloud Datastore
@@ -74,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
self.xcom_push = xcom_push
def execute(self, context):
- logging.info('Importing data from Cloud Storage bucket ' + self.bucket)
+ self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
file=self.file,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 11f8c94..0c75eaa 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
-import logging
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -57,12 +56,11 @@ class ECSOperator(BaseOperator):
self.hook = self.get_hook()
def execute(self, context):
-
- logging.info('Running ECS Task - Task definition: {} - on cluster {}'.format(
- self.task_definition,
- self.cluster
- ))
- logging.info('ECSOperator overrides: {}'.format(self.overrides))
+ self.logger.info(
+ 'Running ECS Task - Task definition: %s - on cluster %s',
+ self.task_definition,self.cluster
+ )
+ self.logger.info('ECSOperator overrides: %s', self.overrides)
self.client = self.hook.get_client_type(
'ecs',
@@ -77,15 +75,15 @@ class ECSOperator(BaseOperator):
)
failures = response['failures']
- if (len(failures) > 0):
+ if len(failures) > 0:
raise AirflowException(response)
- logging.info('ECS Task started: {}'.format(response))
+ self.logger.info('ECS Task started: %s', response)
self.arn = response['tasks'][0]['taskArn']
self._wait_for_task_ended()
self._check_success_task()
- logging.info('ECS Task has been successfully executed: {}'.format(response))
+ self.logger.info('ECS Task has been successfully executed: %s', response)
def _wait_for_task_ended(self):
waiter = self.client.get_waiter('tasks_stopped')
@@ -100,9 +98,9 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
tasks=[self.arn]
)
- logging.info('ECS Task stopped, check status: {}'.format(response))
+ self.logger.info('ECS Task stopped, check status: %s', response)
- if (len(response.get('failures', [])) > 0):
+ if len(response.get('failures', [])) > 0:
raise AirflowException(response)
for task in response['tasks']:
@@ -126,4 +124,4 @@ class ECSOperator(BaseOperator):
cluster=self.cluster,
task=self.arn,
reason='Task killed by the user')
- logging.info(response)
+ self.logger.info(response)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index 84ef2d0..dbf764e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.exceptions import AirflowException
@@ -51,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- logging.info('Adding steps to %s', self.job_flow_id)
+ self.logger.info('Adding steps to %s', self.job_flow_id)
response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('Adding steps failed: %s' % response)
else:
- logging.info('Steps %s added to JobFlow', response['StepIds'])
+ self.logger.info('Steps %s added to JobFlow', response['StepIds'])
return response['StepIds']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 37d885d..4e40b17 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
@@ -53,11 +50,14 @@ class EmrCreateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
- logging.info('Creating JobFlow')
+ self.logger.info(
+ 'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
+ self.aws_conn_id, self.emr_conn_id
+ )
response = emr.create_job_flow(self.job_flow_overrides)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow creation failed: %s' % response)
else:
- logging.info('JobFlow with id %s created', response['JobFlowId'])
+ self.logger.info('JobFlow with id %s created', response['JobFlowId'])
return response['JobFlowId']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index 1b57276..df641ad 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.exceptions import AirflowException
@@ -46,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
def execute(self, context):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- logging.info('Terminating JobFlow %s', self.job_flow_id)
+ self.logger.info('Terminating JobFlow %s', self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow termination failed: %s' % response)
else:
- logging.info('JobFlow with id %s terminated', self.job_flow_id)
+ self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 32e6b29..4519e1e 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
-
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults
class FileToWasbOperator(BaseOperator):
"""
Uploads a file to Azure Blob Storage.
-
+
:param file_path: Path to the file to load.
:type file_path: str
:param container_name: Name of the container.
@@ -54,8 +51,7 @@ class FileToWasbOperator(BaseOperator):
def execute(self, context):
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
- logging.info(
- 'Uploading {self.file_path} to wasb://{self.container_name} as '
- '{self.blob_name}'.format(**locals()))
- hook.load_file(self.file_path, self.container_name, self.blob_name,
- **self.load_options)
+ self.logger.info(
+ 'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
+ )
+ hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index 2596487..ca7d716 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -14,7 +14,6 @@
#
from os import walk
-import logging
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks.fs_hook import FSHook
@@ -49,8 +48,7 @@ class FileSensor(BaseSensorOperator):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = "/".join([basepath, self.filepath])
- logging.info(
- 'Poking for file {full_path} '.format(**locals()))
+ self.logger.info('Poking for file {full_path}'.format(**locals()))
try:
files = [f for f in walk(full_path)]
except:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index c17f774..27e85b7 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import sys
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -48,7 +47,6 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',)
ui_color = '#f0eee4'
- @apply_defaults
def __init__(self,
bucket,
object,
@@ -67,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
self.delegate_to = delegate_to
def execute(self, context):
- logging.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+ self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -76,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
- logging.info(file_bytes)
+ self.logger.info(file_bytes)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 39f0a48..01f53cc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -13,7 +13,6 @@
# limitations under the License.
import json
-import logging
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
@@ -190,7 +189,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
self.destination_project_dataset_table))
row = cursor.fetchone()
max_id = row[0] if row[0] else 0
- logging.info('Loaded BQ data with max {}.{}={}'.format(
- self.destination_project_dataset_table,
- self.max_id_key, max_id))
+ self.logger.info(
+ 'Loaded BQ data with max %s.%s=%s',
+ self.destination_project_dataset_table, self.max_id_key, max_id
+ )
return max_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index aeb37d9..19c6d76 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -17,7 +17,6 @@ from builtins import str
from airflow.utils.decorators import apply_defaults
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
-import logging
import requests
import json
@@ -67,7 +66,7 @@ class HipChatAPIOperator(BaseOperator):
'Authorization': 'Bearer %s' % self.token},
data=self.body)
if response.status_code >= 400:
- logging.error('HipChat API call failed: %s %s',
+ self.logger.error('HipChat API call failed: %s %s',
response.status_code, response.reason)
raise AirflowException('HipChat API call failed: %s %s' %
(response.status_code, response.reason))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index 7476825..fdbfede 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -13,8 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
import re
from airflow import settings
@@ -24,8 +22,9 @@ from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from apiclient import errors
+from airflow.utils.log.LoggingMixin import LoggingMixin
-logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL)
+log = LoggingMixin().logger
def _create_prediction_input(project_id,
@@ -52,7 +51,6 @@ def _create_prediction_input(project_id,
Raises:
ValueError: if a unique model/version origin cannot be determined.
"""
-
prediction_input = {
'dataFormat': data_format,
'inputPaths': input_paths,
@@ -62,9 +60,9 @@ def _create_prediction_input(project_id,
if uri:
if model_name or version_name:
- logging.error(
- 'Ambiguous model origin: Both uri and model/version name are '
- 'provided.')
+ log.error(
+ 'Ambiguous model origin: Both uri and model/version name are provided.'
+ )
raise ValueError('Ambiguous model origin.')
prediction_input['uri'] = uri
elif model_name:
@@ -75,7 +73,7 @@ def _create_prediction_input(project_id,
prediction_input['versionName'] = \
origin_name + '/versions/{}'.format(version_name)
else:
- logging.error(
+ log.error(
'Missing model origin: Batch prediction expects a model, '
'a model & version combination, or a URI to savedModel.')
raise ValueError('Missing model origin.')
@@ -227,9 +225,10 @@ class MLEngineBatchPredictionOperator(BaseOperator):
model_name, version_name, uri, max_worker_count,
runtime_version)
except ValueError as e:
- logging.error(
- 'Cannot create batch prediction job request due to: {}'
- .format(str(e)))
+ self.logger.error(
+ 'Cannot create batch prediction job request due to: %s',
+ e
+ )
raise
self.prediction_job_request = {
@@ -252,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
raise
if finished_prediction_job['state'] != 'SUCCEEDED':
- logging.error(
+ self.logger.error(
'Batch prediction job failed: %s',
str(finished_prediction_job))
raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -539,9 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
}
if self._mode == 'DRY_RUN':
- logging.info('In dry_run mode.')
- logging.info(
- 'MLEngine Training job request is: {}'.format(training_request))
+ self.logger.info('In dry_run mode.')
+ self.logger.info('MLEngine Training job request is: {}'.format(training_request))
return
hook = MLEngineHook(
@@ -559,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
raise
if finished_training_job['state'] != 'SUCCEEDED':
- logging.error('MLEngine training job failed: {}'.format(
+ self.logger.error('MLEngine training job failed: {}'.format(
str(finished_training_job)))
raise RuntimeError(finished_training_job['errorMessage'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_prediction_summary.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py
index 1f4d540..17fc2c0 100644
--- a/airflow/contrib/operators/mlengine_prediction_summary.py
+++ b/airflow/contrib/operators/mlengine_prediction_summary.py
@@ -95,7 +95,6 @@ from __future__ import print_function
import argparse
import base64
import json
-import logging
import os
import apache_beam as beam
@@ -173,5 +172,4 @@ def run(argv=None):
if __name__ == "__main__":
- logging.getLogger().setLevel(logging.INFO)
run()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 374567b..7e83bce 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -13,14 +13,12 @@
# limitations under the License.
import json
-import logging
import time
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
-from collections import OrderedDict
from datetime import date, datetime
from decimal import Decimal
from MySQLdb.constants import FIELD_TYPE
@@ -170,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
'mode': field_mode,
})
- logging.info('Using schema for %s: %s', self.schema_filename, schema)
+ self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
json.dump(schema, tmp_schema_file_handle)
return {self.schema_filename: tmp_schema_file_handle}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index b9f07d5..5abfc51 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -84,12 +81,12 @@ class SFTPOperator(BaseOperator):
if self.operation.lower() == SFTPOperation.GET:
file_msg = "from {0} to {1}".format(self.remote_filepath,
self.local_filepath)
- logging.debug("Starting to transfer {0}".format(file_msg))
+ self.logger.debug("Starting to transfer %s", file_msg)
sftp_client.get(self.remote_filepath, self.local_filepath)
else:
file_msg = "from {0} to {1}".format(self.local_filepath,
self.remote_filepath)
- logging.debug("Starting to transfer file {0}".format(file_msg))
+ self.logger.debug("Starting to transfer file %s", file_msg)
sftp_client.put(self.local_filepath, self.remote_filepath)
except Exception as e:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index ca628e9..2aed4c6 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -12,14 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import logging
-
from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
from airflow.settings import WEB_COLORS
-
-log = logging.getLogger(__name__)
+from airflow.utils.decorators import apply_defaults
class SparkSubmitOperator(BaseOperator):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 2e03b96..897cd1a 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -13,7 +13,6 @@
# limitations under the License.
from base64 import b64encode
-import logging
from airflow import configuration
from airflow.contrib.hooks.ssh_hook import SSHHook
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index 9266563..fc9cf3b 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
@@ -42,6 +39,6 @@ class VerticaOperator(BaseOperator):
self.sql = sql
def execute(self, context):
- logging.info('Executing: ' + str(self.sql))
+ self.logger.info('Executing: %s', self.sql)
hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
hook.run(self.sql)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 31ce110..35ff3c6 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -15,7 +15,6 @@
from builtins import chr
from collections import OrderedDict
import unicodecsv as csv
-import logging
from tempfile import NamedTemporaryFile
from airflow.hooks.hive_hooks import HiveCliHook
@@ -104,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
- logging.info("Dumping Vertica query results to local file")
+ self.logger.info("Dumping Vertica query results to local file")
conn = vertica.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -120,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- logging.info("Loading file into Hive")
+ self.logger.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 8a8ca62..630cebe 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.decorators import apply_defaults
@@ -62,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator):
def poke(self, context):
table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
- logging.info('Sensor checks existence of table: %s', table_uri)
+ self.logger.info('Sensor checks existence of table: %s', table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index d8660f7..4ee45f9 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks.datadog_hook import DatadogHook
from airflow.utils import apply_defaults
@@ -70,7 +67,7 @@ class DatadogSensor(BaseSensorOperator):
tags=self.tags)
if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
- logging.error("Unexpected datadog result: %s" % (response))
+ self.logger.error("Unexpected Datadog result: %s", response)
raise AirflowException("Datadog returned unexpected result")
if self.response_check:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 5526604..034fcb6 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils import apply_defaults
from airflow.exceptions import AirflowException
@@ -23,7 +20,7 @@ class EmrBaseSensor(BaseSensorOperator):
"""
Contains general sensor behavior for EMR.
Subclasses should implement get_emr_response() and state_from_response() methods.
- Subclasses should also implment NON_TERMINAL_STATES and FAILED_STATE constants.
+ Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE constants.
"""
ui_color = '#66c3ff'
@@ -39,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator):
response = self.get_emr_response()
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
- logging.info('Bad HTTP response: %s' % response)
+ self.logger.info('Bad HTTP response: %s', response)
return False
state = self.state_from_response(response)
- logging.info('Job flow currently %s' % state)
+ self.logger.info('Job flow currently %s', state)
if state in self.NON_TERMINAL_STATES:
return False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index 662b3b8..e5610a1 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -11,10 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
-import logging
-
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.utils import apply_defaults
@@ -45,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
def get_emr_response(self):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- logging.info('Poking cluster %s' % self.job_flow_id)
+ self.logger.info('Poking cluster %s', self.job_flow_id)
return emr.describe_cluster(ClusterId=self.job_flow_id)
def state_from_response(self, response):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index 4cc6bc4..e131d77 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.utils import apply_defaults
@@ -48,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor):
def get_emr_response(self):
emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
- logging.info('Poking step {0} on cluster {1}'.format(self.step_id, self.job_flow_id))
+ self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)
def state_from_response(self, response):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index 4a9428b..2e604e9 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -11,9 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import ftplib
-import logging
from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
from airflow.operators.sensors import BaseSensorOperator
@@ -44,7 +42,7 @@ class FTPSensor(BaseSensorOperator):
def poke(self, context):
with self._create_hook() as hook:
- logging.info('Poking for %s', self.path)
+ self.logger.info('Poking for %s', self.path)
try:
hook.get_mod_time(self.path)
except ftplib.error_perm as e:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index c9d741b..800c1bd 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -57,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
self.delegate_to = delegate_to
def poke(self, context):
- logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+ self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
@@ -119,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
self.delegate_to = delegate_to
def poke(self, context):
- logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+ self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 4e9bb9b..11e8b07 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators.sensors import HdfsSensor
-import logging
class HdfsSensorRegex(HdfsSensor):
@@ -29,9 +28,9 @@ class HdfsSensorRegex(HdfsSensor):
:return: Bool depending on the search criteria
"""
sb = self.hook(self.hdfs_conn_id).get_conn()
- logging.getLogger("snakebite").setLevel(logging.WARNING)
- logging.info(
- 'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals()))
+ self.logger.info(
+ 'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
+ )
result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
@@ -53,15 +52,14 @@ class HdfsSensorFolder(HdfsSensor):
:return: Bool depending on the search criteria
"""
sb = self.hook(self.hdfs_conn_id).get_conn()
- logging.getLogger("snakebite").setLevel(logging.WARNING)
result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
result = self.filter_for_filesize(result, self.file_size)
if self.be_empty:
- logging.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
+ self.logger.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
return len(result) == 1 and result[0]['path'] == self.filepath
else:
- logging.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
+ self.logger.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
result.pop(0)
return bool(result) and result[0]['file_type'] == 'f'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 708caad..4cbc676 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from jira.resources import Resource
from airflow.contrib.operators.jira_operator import JIRAError
@@ -100,8 +97,7 @@ class JiraTicketSensor(JiraSensor):
*args, **kwargs)
def poke(self, context):
- logging.info('Jira Sensor checking for change in ticket : {0}'
- .format(self.ticket_id))
+ self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
self.jira_operator.method_name = "issue"
self.jira_operator.jira_method_args = {
@@ -127,20 +123,19 @@ class JiraTicketSensor(JiraSensor):
and getattr(field_value, 'name'):
result = self.expected_value.lower() == field_value.name.lower()
else:
- logging.warning("not implemented checker for issue field {0} "
- "which is neither string nor list nor "
- "jira Resource".format(self.field))
+ self.logger.warning(
+ "Not implemented checker for issue field %s which "
+ "is neither string nor list nor Jira Resource",
+ self.field
+ )
except JIRAError as jira_error:
- logging.error("jira error while checking with expected value: {0}"
- .format(jira_error))
+ self.logger.error("Jira error while checking with expected value: %s", jira_error)
except Exception as e:
- logging.error("error while checking with expected value {0}, error: {1}"
- .format(self.expected_value, e))
+ self.logger.error("Error while checking with expected value %s:", self.expected_value)
+ self.logger.exception(e)
if result is True:
- logging.info("issue field {0} has expected value {1}, returning success"
- .format(self.field, self.expected_value))
+ self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
else:
- logging.info("issue field {0} dont have expected value {1} yet."
- .format(self.field, self.expected_value))
+ self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
return result
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 4cab407..220d766 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
-
from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -37,7 +34,6 @@ class RedisKeySensor(BaseSensorOperator):
:type redis_conn_id: string
"""
super(RedisKeySensor, self).__init__(*args, **kwargs)
- self.logger = logging.getLogger(__name__)
self.redis_conn_id = redis_conn_id
self.key = key
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 3f3d56c..1a54e12 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import logging
-
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults
class WasbBlobSensor(BaseSensorOperator):
"""
Waits for a blob to arrive on Azure Blob Storage.
-
+
:param container_name: Name of the container.
:type container_name: str
:param blob_name: Name of the blob.
@@ -50,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator):
self.check_options = check_options
def poke(self, context):
- logging.info(
+ self.logger.info(
'Poking for blob: {self.blob_name}\n'
'in wasb://{self.container_name}'.format(**locals())
)
@@ -62,7 +59,7 @@ class WasbBlobSensor(BaseSensorOperator):
class WasbPrefixSensor(BaseSensorOperator):
"""
Waits for blobs matching a prefix to arrive on Azure Blob Storage.
-
+
:param container_name: Name of the container.
:type container_name: str
:param prefix: Prefix of the blob.
@@ -88,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator):
self.check_options = check_options
def poke(self, context):
- logging.info(
+ self.logger.info(
'Poking for prefix: {self.prefix}\n'
'in wasb://{self.container_name}'.format(**locals())
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index 11c45c1..5d2518d 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -14,7 +14,6 @@
import datetime
import getpass
-import subprocess
import os
import uuid
@@ -73,13 +72,13 @@ class CgroupTaskRunner(BaseTaskRunner):
for path_element in path_split:
name_to_node = {x.name: x for x in node.children}
if path_element not in name_to_node:
- self.logger.debug("Creating cgroup {} in {}"
- .format(path_element, node.path))
+ self.logger.debug("Creating cgroup %s in %s", path_element, node.path)
node = node.create_cgroup(path_element)
else:
- self.logger.debug("Not creating cgroup {} in {} "
- "since it already exists"
- .format(path_element, node.path))
+ self.logger.debug(
+ "Not creating cgroup %s in %s since it already exists",
+ path_element, node.path
+ )
node = name_to_node[path_element]
return node
@@ -95,24 +94,23 @@ class CgroupTaskRunner(BaseTaskRunner):
for path_element in path_split:
name_to_node = {x.name: x for x in node.children}
if path_element not in name_to_node:
- self.logger.warning("Cgroup does not exist: {}"
- .format(path))
+ self.logger.warning("Cgroup does not exist: %s", path)
return
else:
node = name_to_node[path_element]
# node is now the leaf node
parent = node.parent
- self.logger.debug("Deleting cgroup {}/{}".format(parent, node.name))
+ self.logger.debug("Deleting cgroup %s/%s", parent, node.name)
parent.delete_cgroup(node.name)
def start(self):
# Use bash if it's already in a cgroup
cgroups = self._get_cgroup_names()
if cgroups["cpu"] != "/" or cgroups["memory"] != "/":
- self.logger.debug("Already running in a cgroup (cpu: {} memory: {} so "
- "not creating another one"
- .format(cgroups.get("cpu"),
- cgroups.get("memory")))
+ self.logger.debug(
+ "Already running in a cgroup (cpu: %s memory: %s) so not creating another one",
+ cgroups.get("cpu"), cgroups.get("memory")
+ )
self.process = self.run_command(['bash', '-c'], join_args=True)
return
@@ -135,21 +133,27 @@ class CgroupTaskRunner(BaseTaskRunner):
mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
self._created_mem_cgroup = True
if self._mem_mb_limit > 0:
- self.logger.debug("Setting {} with {} MB of memory"
- .format(self.mem_cgroup_name, self._mem_mb_limit))
+ self.logger.debug(
+ "Setting %s with %s MB of memory",
+ self.mem_cgroup_name, self._mem_mb_limit
+ )
mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit * 1024 * 1024
# Create the CPU cgroup
cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
self._created_cpu_cgroup = True
if self._cpu_shares > 0:
- self.logger.debug("Setting {} with {} CPU shares"
- .format(self.cpu_cgroup_name, self._cpu_shares))
+ self.logger.debug(
+ "Setting %s with %s CPU shares",
+ self.cpu_cgroup_name, self._cpu_shares
+ )
cpu_cgroup_node.controller.shares = self._cpu_shares
# Start the process w/ cgroups
- self.logger.debug("Starting task process with cgroups cpu,memory:{}"
- .format(cgroup_name))
+ self.logger.debug(
+ "Starting task process with cgroups cpu,memory: %s",
+ cgroup_name
+ )
self.process = self.run_command(
['cgexec', '-g', 'cpu,memory:{}'.format(cgroup_name)]
)
@@ -165,10 +169,9 @@ class CgroupTaskRunner(BaseTaskRunner):
# we might want to revisit that approach at some other point.
if return_code == 137:
self.logger.warning("Task failed with return code of 137. This may indicate "
- "that it was killed due to excessive memory usage. "
- "Please consider optimizing your task or using the "
- "resources argument to reserve more memory for your "
- "task")
+ "that it was killed due to excessive memory usage. "
+ "Please consider optimizing your task or using the "
+ "resources argument to reserve more memory for your task")
return return_code
def terminate(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index a8cb087..7812f96 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import logging
import sys
from airflow import configuration
@@ -21,6 +19,7 @@ from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
DEFAULT_EXECUTOR = None
@@ -42,14 +41,15 @@ def GetDefaultExecutor():
DEFAULT_EXECUTOR = _get_executor(executor_name)
- logging.info("Using executor " + executor_name)
+ log = LoggingMixin().logger
+ log.info("Using executor %s", executor_name)
return DEFAULT_EXECUTOR
def _get_executor(executor_name):
"""
- Creates a new instance of the named executor. In case the executor name is not know in airflow,
+ Creates a new instance of the named executor. In case the executor name is not know in airflow,
look for it in the plugins
"""
if executor_name == 'LocalExecutor':
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 7a4065e..1197958 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -11,12 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from builtins import range
from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
from airflow.utils.state import State
-from airflow.utils.logging import LoggingMixin
PARALLELISM = configuration.getint('core', 'PARALLELISM')
@@ -47,7 +46,7 @@ class BaseExecutor(LoggingMixin):
def queue_command(self, task_instance, command, priority=1, queue=None):
key = task_instance.key
if key not in self.queued_tasks and key not in self.running:
- self.logger.info("Adding to queue: {}".format(command))
+ self.logger.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, task_instance)
def queue_task_instance(
@@ -100,9 +99,9 @@ class BaseExecutor(LoggingMixin):
else:
open_slots = self.parallelism - len(self.running)
- self.logger.debug("{} running task instances".format(len(self.running)))
- self.logger.debug("{} in queue".format(len(self.queued_tasks)))
- self.logger.debug("{} open slots".format(open_slots))
+ self.logger.debug("%s running task instances", len(self.running))
+ self.logger.debug("%s in queue", len(self.queued_tasks))
+ self.logger.debug("%s open slots", open_slots)
sorted_queue = sorted(
[(k, v) for k, v in self.queued_tasks.items()],
@@ -124,11 +123,12 @@ class BaseExecutor(LoggingMixin):
self.execute_async(key, command=command, queue=queue)
else:
self.logger.debug(
- 'Task is already running, not sending to '
- 'executor: {}'.format(key))
+ 'Task is already running, not sending to executor: %s',
+ key
+ )
# Calling child class sync method
- self.logger.debug("Calling the {} sync method".format(self.__class__))
+ self.logger.debug("Calling the %s sync method", self.__class__)
self.sync()
def change_state(self, key, state):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 17c343b..39c895c 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -13,7 +13,6 @@
# limitations under the License.
from builtins import object
-import logging
import subprocess
import ssl
import time
@@ -25,6 +24,7 @@ from celery import states as celery_states
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -53,7 +53,8 @@ class CeleryConfig(object):
try:
celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
except AirflowConfigException as e:
- logging.warning("Celery Executor will run without SSL")
+ log = LoggingMixin().logger
+ log.warning("Celery Executor will run without SSL")
try:
if celery_ssl_active:
@@ -75,11 +76,12 @@ app = Celery(
@app.task
def execute_command(command):
- logging.info("Executing command in Celery " + command)
+ log = LoggingMixin().logger
+ log.info("Executing command in Celery: %s", command)
try:
subprocess.check_call(command, shell=True)
except subprocess.CalledProcessError as e:
- logging.error(e)
+ log.error(e)
raise AirflowException('Celery command failed')
@@ -92,22 +94,18 @@ class CeleryExecutor(BaseExecutor):
vast amounts of messages, while providing operations with the tools
required to maintain such a system.
"""
-
def start(self):
self.tasks = {}
self.last_state = {}
def execute_async(self, key, command, queue=DEFAULT_QUEUE):
- self.logger.info( "[celery] queuing {key} through celery, "
- "queue={queue}".format(**locals()))
+ self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING
def sync(self):
-
- self.logger.debug(
- "Inquiring about {} celery task(s)".format(len(self.tasks)))
+ self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks))
for key, async in list(self.tasks.items()):
try:
state = async.state
@@ -125,11 +123,11 @@ class CeleryExecutor(BaseExecutor):
del self.tasks[key]
del self.last_state[key]
else:
- self.logger.info("Unexpected state: " + async.state)
+ self.logger.info("Unexpected state: %s", async.state)
self.last_state[key] = async.state
except Exception as e:
- logging.error("Error syncing the celery executor, ignoring "
- "it:\n{}\n".format(e, traceback.format_exc()))
+ self.logger.error("Error syncing the celery executor, ignoring it:")
+ self.logger.exception(e)
def end(self, synchronous=False):
if synchronous:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index d65830a..8a56506 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -13,7 +13,6 @@
# limitations under the License.
import distributed
-
import subprocess
import warnings
@@ -41,8 +40,8 @@ class DaskExecutor(BaseExecutor):
def execute_async(self, key, command, queue=None):
if queue is not None:
warnings.warn(
- 'DaskExecutor does not support queues. All tasks will be run '
- 'in the same cluster')
+ 'DaskExecutor does not support queues. All tasks will be run in the same cluster'
+ )
def airflow_run():
return subprocess.check_call(command, shell=True)
@@ -54,12 +53,11 @@ class DaskExecutor(BaseExecutor):
if future.done():
key = self.futures[future]
if future.exception():
+ self.logger.error("Failed to execute task: %s", repr(future.exception()))
self.fail(key)
- self.logger.error("Failed to execute task: {}".format(
- repr(future.exception())))
elif future.cancelled():
- self.fail(key)
self.logger.error("Failed to execute task")
+ self.fail(key)
else:
self.success(key)
self.futures.pop(future)