Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/19 08:17:31 UTC
[2/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
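
This commit mechanically renames the LoggingMixin accessor from logger to log and moves the mixin from airflow/utils/log/LoggingMixin.py to the PEP 8-style module name airflow/utils/log/logging_mixin.py. The mixin itself is not part of this patch; as a minimal sketch of what the call sites below rely on (the caching attribute name _log is an assumption for illustration, not taken from the commit), the property could look like:

    import logging

    class LoggingMixin(object):
        """Expose a per-class `log` property (formerly `logger`)."""

        @property
        def log(self):
            # Lazily create and cache one logger per concrete class,
            # e.g. "airflow.models.DagBag" for DagBag(BaseDagBag, LoggingMixin).
            try:
                return self._log
            except AttributeError:
                self._log = logging.getLogger(
                    self.__class__.__module__ + '.' + self.__class__.__name__)
                return self._log

Every call site in the diff then changes from self.logger.<level>(...) to self.log.<level>(...) with arguments unchanged.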
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f690fb4..28dcc04 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -77,7 +77,7 @@ from airflow.utils.operator_resources import Resources
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
Base = declarative_base()
ID_LEN = 250
@@ -184,7 +184,7 @@ class DagBag(BaseDagBag, LoggingMixin):
if executor is None:
executor = GetDefaultExecutor()
dag_folder = dag_folder or settings.DAGS_FOLDER
- self.logger.info("Filling up the DagBag from %s", dag_folder)
+ self.log.info("Filling up the DagBag from %s", dag_folder)
self.dag_folder = dag_folder
self.dags = {}
# the file's last modified timestamp when we last read it
@@ -257,7 +257,7 @@ class DagBag(BaseDagBag, LoggingMixin):
return found_dags
except Exception as e:
- self.logger.exception(e)
+ self.log.exception(e)
return found_dags
mods = []
@@ -269,7 +269,7 @@ class DagBag(BaseDagBag, LoggingMixin):
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
- self.logger.debug("Importing %s", filepath)
+ self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
mod_name = ('unusual_prefix_' +
hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
@@ -283,7 +283,7 @@ class DagBag(BaseDagBag, LoggingMixin):
m = imp.load_source(mod_name, filepath)
mods.append(m)
except Exception as e:
- self.logger.exception("Failed to import: %s", filepath)
+ self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
@@ -294,10 +294,10 @@ class DagBag(BaseDagBag, LoggingMixin):
mod_name, ext = os.path.splitext(mod.filename)
if not head and (ext == '.py' or ext == '.pyc'):
if mod_name == '__init__':
- self.logger.warning("Found __init__.%s at root of %s", ext, filepath)
+ self.log.warning("Found __init__.%s at root of %s", ext, filepath)
if safe_mode:
with zip_file.open(mod.filename) as zf:
- self.logger.debug("Reading %s from %s", mod.filename, filepath)
+ self.log.debug("Reading %s from %s", mod.filename, filepath)
content = zf.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = (
@@ -313,7 +313,7 @@ class DagBag(BaseDagBag, LoggingMixin):
m = importlib.import_module(mod_name)
mods.append(m)
except Exception as e:
- self.logger.exception("Failed to import: %s", filepath)
+ self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
@@ -336,11 +336,11 @@ class DagBag(BaseDagBag, LoggingMixin):
Fails tasks that haven't had a heartbeat in too long
"""
from airflow.jobs import LocalTaskJob as LJ
- self.logger.info("Finding 'running' jobs without a recent heartbeat")
+ self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = TaskInstance
secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
limit_dttm = datetime.now() - timedelta(seconds=secs)
- self.logger.info("Failing jobs without heartbeat after %s", limit_dttm)
+ self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
tis = (
session.query(TI)
@@ -361,7 +361,7 @@ class DagBag(BaseDagBag, LoggingMixin):
task = dag.get_task(ti.task_id)
ti.task = task
ti.handle_failure("{} killed as zombie".format(str(ti)))
- self.logger.info('Marked zombie job %s as failed', ti)
+ self.log.info('Marked zombie job %s as failed', ti)
Stats.incr('zombies_killed')
session.commit()
@@ -381,7 +381,7 @@ class DagBag(BaseDagBag, LoggingMixin):
subdag.parent_dag = dag
subdag.is_subdag = True
self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
- self.logger.debug('Loaded DAG {dag}'.format(**locals()))
+ self.log.debug('Loaded DAG {dag}'.format(**locals()))
def collect_dags(
self,
@@ -439,7 +439,7 @@ class DagBag(BaseDagBag, LoggingMixin):
str([dag.dag_id for dag in found_dags]),
))
except Exception as e:
- self.logger.warning(e)
+ self.log.warning(e)
Stats.gauge(
'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1)
Stats.gauge(
@@ -606,7 +606,7 @@ class Connection(Base, LoggingMixin):
self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_encrypted = True
except AirflowException:
- self.logger.exception("Failed to load fernet while encrypting value, "
+ self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
self._password = value
self.is_encrypted = False
@@ -635,7 +635,7 @@ class Connection(Base, LoggingMixin):
self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_extra_encrypted = True
except AirflowException:
- self.logger.exception("Failed to load fernet while encrypting value, "
+ self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
self._extra = value
self.is_extra_encrypted = False
@@ -706,8 +706,8 @@ class Connection(Base, LoggingMixin):
try:
obj = json.loads(self.extra)
except Exception as e:
- self.logger.exception(e)
- self.logger.error("Failed parsing the json for conn_id %s", self.conn_id)
+ self.log.exception(e)
+ self.log.error("Failed parsing the json for conn_id %s", self.conn_id)
return obj
@@ -1001,7 +1001,7 @@ class TaskInstance(Base, LoggingMixin):
"""
Forces the task instance's state to FAILED in the database.
"""
- self.logger.error("Recording the task instance as FAILED")
+ self.log.error("Recording the task instance as FAILED")
self.state = State.FAILED
session.merge(self)
session.commit()
@@ -1152,7 +1152,7 @@ class TaskInstance(Base, LoggingMixin):
session=session):
failed = True
if verbose:
- self.logger.info(
+ self.log.info(
"Dependencies not met for %s, dependency '%s' FAILED: %s",
self, dep_status.dep_name, dep_status.reason
)
@@ -1161,7 +1161,7 @@ class TaskInstance(Base, LoggingMixin):
return False
if verbose:
- self.logger.info("Dependencies all met for %s", self)
+ self.log.info("Dependencies all met for %s", self)
return True
@@ -1177,7 +1177,7 @@ class TaskInstance(Base, LoggingMixin):
session,
dep_context):
- self.logger.debug(
+ self.log.debug(
"%s dependency '%s' PASSED: %s, %s",
self, dep_status.dep_name, dep_status.passed, dep_status.reason
)
@@ -1354,10 +1354,10 @@ class TaskInstance(Base, LoggingMixin):
"runtime. Attempt {attempt} of {total}. State set to NONE.").format(
attempt=self.try_number + 1,
total=self.max_tries + 1)
- self.logger.warning(hr + msg + hr)
+ self.log.warning(hr + msg + hr)
self.queued_dttm = datetime.now()
- self.logger.info("Queuing into pool %s", self.pool)
+ self.log.info("Queuing into pool %s", self.pool)
session.merge(self)
session.commit()
return False
@@ -1366,12 +1366,12 @@ class TaskInstance(Base, LoggingMixin):
# the current worker process was blocked on refresh_from_db
if self.state == State.RUNNING:
msg = "Task Instance already running {}".format(self)
- self.logger.warning(msg)
+ self.log.warning(msg)
session.commit()
return False
# print status message
- self.logger.info(hr + msg + hr)
+ self.log.info(hr + msg + hr)
self.try_number += 1
if not test_mode:
@@ -1389,10 +1389,10 @@ class TaskInstance(Base, LoggingMixin):
if verbose:
if mark_success:
msg = "Marking success for {} on {}".format(self.task, self.execution_date)
- self.logger.info(msg)
+ self.log.info(msg)
else:
msg = "Executing {} on {}".format(self.task, self.execution_date)
- self.logger.info(msg)
+ self.log.info(msg)
return True
@provide_session
@@ -1434,7 +1434,7 @@ class TaskInstance(Base, LoggingMixin):
def signal_handler(signum, frame):
"""Setting kill signal handler"""
- self.logger.error("Killing subprocess")
+ self.log.error("Killing subprocess")
task_copy.on_kill()
raise AirflowException("Task received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
@@ -1513,8 +1513,8 @@ class TaskInstance(Base, LoggingMixin):
if task.on_success_callback:
task.on_success_callback(context)
except Exception as e3:
- self.logger.error("Failed when executing success callback")
- self.logger.exception(e3)
+ self.log.error("Failed when executing success callback")
+ self.log.exception(e3)
session.commit()
@@ -1559,7 +1559,7 @@ class TaskInstance(Base, LoggingMixin):
task_copy.dry_run()
def handle_failure(self, error, test_mode=False, context=None):
- self.logger.exception(error)
+ self.log.exception(error)
task = self.task
session = settings.Session()
self.end_date = datetime.now()
@@ -1580,20 +1580,20 @@ class TaskInstance(Base, LoggingMixin):
# next task instance try_number exceeds the max_tries.
if task.retries and self.try_number <= self.max_tries:
self.state = State.UP_FOR_RETRY
- self.logger.info('Marking task as UP_FOR_RETRY')
+ self.log.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
self.email_alert(error, is_retry=True)
else:
self.state = State.FAILED
if task.retries:
- self.logger.info('All retries failed; marking task as FAILED')
+ self.log.info('All retries failed; marking task as FAILED')
else:
- self.logger.info('Marking task as FAILED.')
+ self.log.info('Marking task as FAILED.')
if task.email_on_failure and task.email:
self.email_alert(error, is_retry=False)
except Exception as e2:
- self.logger.error('Failed to send email to: %s', task.email)
- self.logger.exception(e2)
+ self.log.error('Failed to send email to: %s', task.email)
+ self.log.exception(e2)
# Handling callbacks pessimistically
try:
@@ -1602,13 +1602,13 @@ class TaskInstance(Base, LoggingMixin):
if self.state == State.FAILED and task.on_failure_callback:
task.on_failure_callback(context)
except Exception as e3:
- self.logger.error("Failed at executing callback")
- self.logger.exception(e3)
+ self.log.error("Failed at executing callback")
+ self.log.exception(e3)
if not test_mode:
session.merge(self)
session.commit()
- self.logger.error(str(error))
+ self.log.error(str(error))
@provide_session
def get_template_context(self, session=None):
@@ -1898,7 +1898,7 @@ class Log(Base):
self.owner = owner or task_owner
-class SkipMixin(object):
+class SkipMixin(LoggingMixin):
def skip(self, dag_run, execution_date, tasks):
"""
Sets tasks instances to skipped from the same dag run.
@@ -1926,7 +1926,7 @@ class SkipMixin(object):
else:
assert execution_date is not None, "Execution date is None and no dag run"
- self.logger.warning("No DAG RUN present this should not happen")
+ self.log.warning("No DAG RUN present this should not happen")
# this is defensive against dag runs that are not complete
for task in tasks:
ti = TaskInstance(task, execution_date=execution_date)
@@ -2121,7 +2121,7 @@ class BaseOperator(LoggingMixin):
self.email_on_failure = email_on_failure
self.start_date = start_date
if start_date and not isinstance(start_date, datetime):
- self.logger.warning("start_date for %s isn't datetime.datetime", self)
+ self.log.warning("start_date for %s isn't datetime.datetime", self)
self.end_date = end_date
if not TriggerRule.is_valid(trigger_rule):
raise AirflowException(
@@ -2137,7 +2137,7 @@ class BaseOperator(LoggingMixin):
self.depends_on_past = True
if schedule_interval:
- self.logger.warning(
+ self.log.warning(
"schedule_interval is used for {}, though it has "
"been deprecated as a task parameter, you need to "
"specify it as a DAG parameter instead",
@@ -2155,7 +2155,7 @@ class BaseOperator(LoggingMixin):
if isinstance(retry_delay, timedelta):
self.retry_delay = retry_delay
else:
- self.logger.debug("Retry_delay isn't timedelta object, assuming secs")
+ self.log.debug("Retry_delay isn't timedelta object, assuming secs")
self.retry_delay = timedelta(seconds=retry_delay)
self.retry_exponential_backoff = retry_exponential_backoff
self.max_retry_delay = max_retry_delay
@@ -2455,7 +2455,7 @@ class BaseOperator(LoggingMixin):
try:
setattr(self, attr, env.loader.get_source(env, content)[0])
except Exception as e:
- self.logger.exception(e)
+ self.log.exception(e)
self.prepare_template()
@property
@@ -2574,12 +2574,12 @@ class BaseOperator(LoggingMixin):
ignore_ti_state=ignore_ti_state)
def dry_run(self):
- self.logger.info('Dry run')
+ self.log.info('Dry run')
for attr in self.template_fields:
content = getattr(self, attr)
if content and isinstance(content, six.string_types):
- self.logger.info('Rendering template for %s', attr)
- self.logger.info(content)
+ self.log.info('Rendering template for %s', attr)
+ self.log.info(content)
def get_direct_relatives(self, upstream=False):
"""
@@ -3517,7 +3517,7 @@ class DAG(BaseDag, LoggingMixin):
d['pickle_len'] = len(pickled)
d['pickling_duration'] = "{}".format(datetime.now() - dttm)
except Exception as e:
- self.logger.debug(e)
+ self.log.debug(e)
d['is_picklable'] = False
d['stacktrace'] = traceback.format_exc()
return d
@@ -3754,7 +3754,7 @@ class DAG(BaseDag, LoggingMixin):
DagModel).filter(DagModel.dag_id == self.dag_id).first()
if not orm_dag:
orm_dag = DagModel(dag_id=self.dag_id)
- self.logger.info("Creating ORM DAG for %s", self.dag_id)
+ self.log.info("Creating ORM DAG for %s", self.dag_id)
orm_dag.fileloc = self.fileloc
orm_dag.is_subdag = self.is_subdag
orm_dag.owners = owner
@@ -3797,11 +3797,11 @@ class DAG(BaseDag, LoggingMixin):
:type expiration_date: datetime
:return: None
"""
- logger = LoggingMixin().logger
+ log = LoggingMixin().log
for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
- logger.info(
+ log.info(
"Deactivating DAG ID %s since it was last touched by the scheduler at %s",
dag.dag_id, dag.last_scheduler_run.isoformat()
)
@@ -3930,7 +3930,7 @@ class Variable(Base, LoggingMixin):
self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_encrypted = True
except AirflowException:
- self.logger.exception(
+ self.log.exception(
"Failed to load fernet while encrypting value, using non-encrypted value."
)
self._val = value
@@ -4052,7 +4052,7 @@ class XCom(Base, LoggingMixin):
try:
value = json.dumps(value).encode('UTF-8')
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4123,7 +4123,7 @@ class XCom(Base, LoggingMixin):
try:
return json.loads(result.value.decode('UTF-8'))
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4173,7 +4173,7 @@ class XCom(Base, LoggingMixin):
try:
result.value = json.loads(result.value.decode('UTF-8'))
except ValueError:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@@ -4229,7 +4229,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not update dag stats for %s", dag_id)
log.exception(e)
@@ -4282,7 +4282,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not update dag stat table")
log.exception(e)
@@ -4306,7 +4306,7 @@ class DagStat(Base):
session.commit()
except Exception as e:
session.rollback()
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.warning("Could not create stat record")
log.exception(e)
@@ -4524,7 +4524,7 @@ class DagRun(Base, LoggingMixin):
tis = self.get_task_instances(session=session)
- self.logger.info("Updating state for %s considering %s task(s)", self, len(tis))
+ self.log.info("Updating state for %s considering %s task(s)", self, len(tis))
for ti in list(tis):
# skip in db?
@@ -4570,18 +4570,18 @@ class DagRun(Base, LoggingMixin):
# if all roots finished and at least on failed, the run failed
if (not unfinished_tasks and
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
- self.logger.info('Marking run %s failed', self)
+ self.log.info('Marking run %s failed', self)
self.state = State.FAILED
# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
- self.logger.info('Marking run %s successful', self)
+ self.log.info('Marking run %s successful', self)
self.state = State.SUCCESS
# if *all tasks* are deadlocked, the run failed
elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
- self.logger.info('Deadlock; marking run %s failed', self)
+ self.log.info('Deadlock; marking run %s failed', self)
self.state = State.FAILED
# finally, if the roots aren't done, the dag is still running
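
Two changes in models.py above are not pure renames: SkipMixin now inherits from LoggingMixin instead of object, so self.log in skip() resolves without depending on another base class in the MRO, and code with no useful instance at hand (the DAG.deactivate_stale_dags classmethod, XCom, DagStat) obtains a logger by instantiating the mixin directly. A hedged sketch of both patterns, reusing names from the diff:

    from airflow.utils.log.logging_mixin import LoggingMixin

    class SkipMixin(LoggingMixin):
        def skip(self, dag_run, execution_date, tasks):
            # With object as the base class, self.log would raise
            # AttributeError here; inheriting the mixin makes skip()
            # self-contained.
            if dag_run is None:
                self.log.warning("No DAG RUN present this should not happen")

    # Classmethods and static helpers have no suitable self, so the diff
    # builds a throwaway instance just to reach the property:
    log = LoggingMixin().log
    log.info("Deactivating DAG ID %s", "example_dag_id")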
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 63321fb..ff2ed51 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -67,7 +67,7 @@ class BashOperator(BaseOperator):
which will be cleaned afterwards
"""
bash_command = self.bash_command
- self.logger.info("Tmp dir root location: \n %s", gettempdir())
+ self.log.info("Tmp dir root location: \n %s", gettempdir())
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
@@ -75,11 +75,11 @@ class BashOperator(BaseOperator):
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
- self.logger.info(
+ self.log.info(
"Temporary script location: %s",
script_location
)
- self.logger.info("Running command: %s", bash_command)
+ self.log.info("Running command: %s", bash_command)
sp = Popen(
['bash', fname],
stdout=PIPE, stderr=STDOUT,
@@ -88,13 +88,13 @@ class BashOperator(BaseOperator):
self.sp = sp
- self.logger.info("Output:")
+ self.log.info("Output:")
line = ''
for line in iter(sp.stdout.readline, b''):
line = line.decode(self.output_encoding).strip()
- self.logger.info(line)
+ self.log.info(line)
sp.wait()
- self.logger.info(
+ self.log.info(
"Command exited with return code %s",
sp.returncode
)
@@ -106,6 +106,6 @@ class BashOperator(BaseOperator):
return line
def on_kill(self):
- self.logger.info('Sending SIGTERM signal to bash process group')
+ self.log.info('Sending SIGTERM signal to bash process group')
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index f263a2c..ff82539 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -71,15 +71,15 @@ class CheckOperator(BaseOperator):
self.sql = sql
def execute(self, context=None):
- self.logger.info('Executing SQL check: %s', self.sql)
+ self.log.info('Executing SQL check: %s', self.sql)
records = self.get_db_hook().get_first(self.sql)
- self.logger.info('Record: %s', records)
+ self.log.info('Record: %s', records)
if not records:
raise AirflowException("The query returned None")
elif not all([bool(r) for r in records]):
exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}"
raise AirflowException(exceptstr.format(q=self.sql, r=records))
- self.logger.info("Success.")
+ self.log.info("Success.")
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
@@ -134,7 +134,7 @@ class ValueCheckOperator(BaseOperator):
self.has_tolerance = self.tol is not None
def execute(self, context=None):
- self.logger.info('Executing SQL check: %s', self.sql)
+ self.log.info('Executing SQL check: %s', self.sql)
records = self.get_db_hook().get_first(self.sql)
if not records:
raise AirflowException("The query returned None")
@@ -208,9 +208,9 @@ class IntervalCheckOperator(BaseOperator):
def execute(self, context=None):
hook = self.get_db_hook()
- self.logger.info('Executing SQL check: %s', self.sql2)
+ self.log.info('Executing SQL check: %s', self.sql2)
row2 = hook.get_first(self.sql2)
- self.logger.info('Executing SQL check: %s', self.sql1)
+ self.log.info('Executing SQL check: %s', self.sql1)
row1 = hook.get_first(self.sql1)
if not row2:
raise AirflowException("The query {q} returned None".format(q=self.sql2))
@@ -230,20 +230,20 @@ class IntervalCheckOperator(BaseOperator):
else:
ratio = float(max(current[m], reference[m])) / \
min(current[m], reference[m])
- self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
+ self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
ratios[m] = ratio
test_results[m] = ratio < self.metrics_thresholds[m]
if not all(test_results.values()):
failed_tests = [it[0] for it in test_results.items() if not it[1]]
j = len(failed_tests)
n = len(self.metrics_sorted)
- self.logger.warning(countstr.format(**locals()))
+ self.log.warning(countstr.format(**locals()))
for k in failed_tests:
- self.logger.warning(
+ self.log.warning(
fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k])
)
raise AirflowException(estr.format(", ".join(failed_tests)))
- self.logger.info("All tests have passed")
+ self.log.info("All tests have passed")
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index bd2862b..3a952cd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -69,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator):
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
- self.logger.info("Creating DagRun %s", dr)
+ self.log.info("Creating DagRun %s", dr)
session.add(dr)
session.commit()
session.close()
else:
- self.logger.info("Criteria not met, moving on")
+ self.log.info("Criteria not met, moving on")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 8a333d6..3011f1c 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -134,7 +134,7 @@ class DockerOperator(BaseOperator):
self.container = None
def execute(self, context):
- self.logger.info('Starting docker container from image %s', self.image)
+ self.log.info('Starting docker container from image %s', self.image)
tls_config = None
if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
@@ -155,10 +155,10 @@ class DockerOperator(BaseOperator):
image = self.image
if self.force_pull or len(self.cli.images(name=image)) == 0:
- self.logger.info('Pulling docker image %s', image)
+ self.log.info('Pulling docker image %s', image)
for l in self.cli.pull(image, stream=True):
output = json.loads(l.decode('utf-8'))
- self.logger.info("%s", output['status'])
+ self.log.info("%s", output['status'])
cpu_shares = int(round(self.cpus * 1024))
@@ -184,7 +184,7 @@ class DockerOperator(BaseOperator):
line = line.strip()
if hasattr(line, 'decode'):
line = line.decode('utf-8')
- self.logger.info(line)
+ self.log.info(line)
exit_code = self.cli.wait(self.container['Id'])
if exit_code != 0:
@@ -202,5 +202,5 @@ class DockerOperator(BaseOperator):
def on_kill(self):
if self.cli is not None:
- self.logger.info('Stopping docker container')
+ self.log.info('Stopping docker container')
self.cli.stop(self.container['Id'])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/generic_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py
index 790749a..c8a2a58 100644
--- a/airflow/operators/generic_transfer.py
+++ b/airflow/operators/generic_transfer.py
@@ -61,15 +61,15 @@ class GenericTransfer(BaseOperator):
def execute(self, context):
source_hook = BaseHook.get_hook(self.source_conn_id)
- self.logger.info("Extracting data from %s", self.source_conn_id)
- self.logger.info("Executing: \n %s", self.sql)
+ self.log.info("Extracting data from %s", self.source_conn_id)
+ self.log.info("Executing: \n %s", self.sql)
results = source_hook.get_records(self.sql)
destination_hook = BaseHook.get_hook(self.destination_conn_id)
if self.preoperator:
- self.logger.info("Running preoperator")
- self.logger.info(self.preoperator)
+ self.log.info("Running preoperator")
+ self.log.info(self.preoperator)
destination_hook.run(self.preoperator)
- self.logger.info("Inserting rows into %s", self.destination_conn_id)
+ self.log.info("Inserting rows into %s", self.destination_conn_id)
destination_hook.insert_rows(table=self.destination_table, rows=results)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 983069b..221feeb 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -93,7 +93,7 @@ class HiveOperator(BaseOperator):
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
def execute(self, context):
- self.logger.info('Executing: %s', self.hql)
+ self.log.info('Executing: %s', self.hql)
self.hook = self.get_hook()
self.hook.run_cli(hql=self.hql, schema=self.schema,
hive_conf=context_to_airflow_vars(context))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_stats_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index 025e427..896547e 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -139,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator):
""".format(**locals())
hook = PrestoHook(presto_conn_id=self.presto_conn_id)
- self.logger.info('Executing SQL check: %s', sql)
+ self.log.info('Executing SQL check: %s', sql)
row = hook.get_first(hql=sql)
- self.logger.info("Record: %s", row)
+ self.log.info("Record: %s", row)
if not row:
raise AirflowException("The query returned None")
part_json = json.dumps(self.partition, sort_keys=True)
- self.logger.info("Deleting rows from previous runs if they exist")
+ self.log.info("Deleting rows from previous runs if they exist")
mysql = MySqlHook(self.mysql_conn_id)
sql = """
SELECT 1 FROM hive_stats
@@ -167,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator):
""".format(**locals())
mysql.run(sql)
- self.logger.info("Pivoting and loading cells into the Airflow db")
+ self.log.info("Pivoting and loading cells into the Airflow db")
rows = [
(self.ds, self.dttm, self.table, part_json) +
(r[0][0], r[0][1], r[1])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index d7b1b82..e420dfd 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -92,7 +92,7 @@ class HiveToDruidTransfer(BaseOperator):
def execute(self, context):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
- self.logger.info("Extracting data from Hive")
+ self.log.info("Extracting data from Hive")
hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
sql = self.sql.strip().strip(';')
tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
@@ -107,7 +107,7 @@ class HiveToDruidTransfer(BaseOperator):
AS
{sql}
""".format(**locals())
- self.logger.info("Running command:\n %s", hql)
+ self.log.info("Running command:\n %s", hql)
hive.run_cli(hql)
m = HiveMetastoreHook(self.metastore_conn_id)
@@ -131,13 +131,13 @@ class HiveToDruidTransfer(BaseOperator):
columns=columns,
)
- self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path)
+ self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
druid.submit_indexing_job(index_spec)
- self.logger.info("Load seems to have succeeded!")
+ self.log.info("Load seems to have succeeded!")
finally:
- self.logger.info(
+ self.log.info(
"Cleaning up by dropping the temp Hive table %s",
hive_table
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index e82a099..d2d9d0c 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -77,7 +77,7 @@ class HiveToMySqlTransfer(BaseOperator):
def execute(self, context):
hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
- self.logger.info("Extracting data from Hive: %s", self.sql)
+ self.log.info("Extracting data from Hive: %s", self.sql)
if self.bulk_load:
tmpfile = NamedTemporaryFile()
@@ -88,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator):
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
- self.logger.info("Running MySQL preoperator")
+ self.log.info("Running MySQL preoperator")
mysql.run(self.mysql_preoperator)
- self.logger.info("Inserting rows into MySQL")
+ self.log.info("Inserting rows into MySQL")
if self.bulk_load:
mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
@@ -100,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator):
mysql.insert_rows(table=self.mysql_table, rows=results)
if self.mysql_postoperator:
- self.logger.info("Running MySQL postoperator")
+ self.log.info("Running MySQL postoperator")
mysql.run(self.mysql_postoperator)
- self.logger.info("Done.")
+ self.log.info("Done.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_samba_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index d6e6dec..93ebec1 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -53,7 +53,7 @@ class Hive2SambaOperator(BaseOperator):
samba = SambaHook(samba_conn_id=self.samba_conn_id)
hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
tmpfile = tempfile.NamedTemporaryFile()
- self.logger.info("Fetching file from Hive")
+ self.log.info("Fetching file from Hive")
hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name)
- self.logger.info("Pushing to samba")
+ self.log.info("Pushing to samba")
samba.push_from_local(self.destination_filepath, tmpfile.name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d92c931..63b892c 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -74,7 +74,7 @@ class SimpleHttpOperator(BaseOperator):
def execute(self, context):
http = HttpHook(self.method, http_conn_id=self.http_conn_id)
- self.logger.info("Calling HTTP method")
+ self.log.info("Calling HTTP method")
response = http.run(self.endpoint,
self.data,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py
index 942e312..4ec2fa0 100644
--- a/airflow/operators/jdbc_operator.py
+++ b/airflow/operators/jdbc_operator.py
@@ -55,6 +55,6 @@ class JdbcOperator(BaseOperator):
self.autocommit = autocommit
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 58f7e67..a1e2a0c 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -32,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
# If the DAG Run is externally triggered, then return without
# skipping downstream tasks
if context['dag_run'] and context['dag_run'].external_trigger:
- self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.")
+ self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
return
now = datetime.datetime.now()
left_window = context['dag'].following_schedule(
context['execution_date'])
right_window = context['dag'].following_schedule(left_window)
- self.logger.info(
+ self.log.info(
'Checking latest only with left_window: %s right_window: %s now: %s',
left_window, right_window, now
)
if not left_window < now <= right_window:
- self.logger.info('Not latest execution, skipping downstream.')
+ self.log.info('Not latest execution, skipping downstream.')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'],
context['ti'].execution_date,
downstream_tasks)
- self.logger.info('Done.')
+ self.log.info('Done.')
else:
- self.logger.info('Latest, allowing execution to proceed.')
+ self.log.info('Latest, allowing execution to proceed.')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py
index bc0822f..3455232 100644
--- a/airflow/operators/mssql_operator.py
+++ b/airflow/operators/mssql_operator.py
@@ -44,7 +44,7 @@ class MsSqlOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id,
schema=self.database)
hook.run(self.sql, autocommit=self.autocommit,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 719ddd2..c2c858d 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -102,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
- self.logger.info("Dumping Microsoft SQL Server query results to local file")
+ self.log.info("Dumping Microsoft SQL Server query results to local file")
conn = mssql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -118,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- self.logger.info("Loading file into Hive")
+ self.log.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 923eaf8..20f1b7e 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -46,7 +46,7 @@ class MySqlOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
hook.run(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index fde92b5..cd472a8 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -110,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
- self.logger.info("Dumping MySQL query results to local file")
+ self.log.info("Dumping MySQL query results to local file")
conn = mysql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
@@ -123,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator):
f.flush()
cursor.close()
conn.close()
- self.logger.info("Loading file into Hive")
+ self.log.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/oracle_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py
index f87bbf9..9a35267 100644
--- a/airflow/operators/oracle_operator.py
+++ b/airflow/operators/oracle_operator.py
@@ -42,7 +42,7 @@ class OracleOperator(BaseOperator):
self.parameters = parameters
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
hook.run(
self.sql,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/pig_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py
index cdce48a..a4e4e5c 100644
--- a/airflow/operators/pig_operator.py
+++ b/airflow/operators/pig_operator.py
@@ -59,7 +59,7 @@ class PigOperator(BaseOperator):
"(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)
def execute(self, context):
- self.logger.info('Executing: %s', self.pig)
+ self.log.info('Executing: %s', self.pig)
self.hook = self.get_hook()
self.hook.run_cli(pig=self.pig)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/postgres_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index 55c1573..c93dc7b 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -49,7 +49,7 @@ class PostgresOperator(BaseOperator):
self.database = database
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
schema=self.database)
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/presto_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py
index 48158ca..d0c323a 100644
--- a/airflow/operators/presto_to_mysql.py
+++ b/airflow/operators/presto_to_mysql.py
@@ -61,14 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator):
def execute(self, context):
presto = PrestoHook(presto_conn_id=self.presto_conn_id)
- self.logger.info("Extracting data from Presto: %s", self.sql)
+ self.log.info("Extracting data from Presto: %s", self.sql)
results = presto.get_records(self.sql)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
- self.logger.info("Running MySQL preoperator")
- self.logger.info(self.mysql_preoperator)
+ self.log.info("Running MySQL preoperator")
+ self.log.info(self.mysql_preoperator)
mysql.run(self.mysql_preoperator)
- self.logger.info("Inserting rows into MySQL")
+ self.log.info("Inserting rows into MySQL")
mysql.insert_rows(table=self.mysql_table, rows=results)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 56837ec..718c88f 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -87,7 +87,7 @@ class PythonOperator(BaseOperator):
self.op_kwargs = context
return_value = self.execute_callable()
- self.logger.info("Done. Returned value was: %s", return_value)
+ self.log.info("Done. Returned value was: %s", return_value)
return return_value
def execute_callable(self):
@@ -115,17 +115,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
"""
def execute(self, context):
branch = super(BranchPythonOperator, self).execute(context)
- self.logger.info("Following branch %s", branch)
- self.logger.info("Marking other directly downstream tasks as skipped")
+ self.log.info("Following branch %s", branch)
+ self.log.info("Marking other directly downstream tasks as skipped")
downstream_tasks = context['task'].downstream_list
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
skip_tasks = [t for t in downstream_tasks if t.task_id != branch]
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks)
- self.logger.info("Done.")
+ self.log.info("Done.")
class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -142,21 +142,21 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
"""
def execute(self, context):
condition = super(ShortCircuitOperator, self).execute(context)
- self.logger.info("Condition result is %s", condition)
+ self.log.info("Condition result is %s", condition)
if condition:
- self.logger.info('Proceeding with downstream tasks...')
+ self.log.info('Proceeding with downstream tasks...')
return
- self.logger.info('Skipping downstream tasks...')
+ self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
- self.logger.debug("Downstream task_ids %s", downstream_tasks)
+ self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
- self.logger.info("Done.")
+ self.log.info("Done.")
class PythonVirtualenvOperator(PythonOperator):
"""
@@ -233,7 +233,7 @@ class PythonVirtualenvOperator(PythonOperator):
# generate filenames
input_filename = os.path.join(tmp_dir, 'script.in')
output_filename = os.path.join(tmp_dir, 'script.out')
- string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+ string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
script_filename = os.path.join(tmp_dir, 'script.py')
# set up virtualenv
@@ -261,12 +261,12 @@ class PythonVirtualenvOperator(PythonOperator):
def _execute_in_subprocess(self, cmd):
try:
- self.logger.info("Executing cmd\n{}".format(cmd))
+ self.log.info("Executing cmd\n{}".format(cmd))
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if output:
- self.logger.info("Got output\n{}".format(output))
+ self.log.info("Got output\n{}".format(output))
except subprocess.CalledProcessError as e:
- self.logger.info("Got error output\n{}".format(e.output))
+ self.log.info("Got error output\n{}".format(e.output))
raise
def _write_string_args(self, filename):
@@ -294,14 +294,14 @@ class PythonVirtualenvOperator(PythonOperator):
else:
return pickle.load(f)
except ValueError:
- self.logger.error("Error deserializing result. Note that result deserialization "
+ self.log.error("Error deserializing result. Note that result deserialization "
"is not supported across major Python versions.")
raise
def _write_script(self, script_filename):
with open(script_filename, 'w') as f:
python_code = self._generate_python_code()
- self.logger.debug('Writing code to file\n{}'.format(python_code))
+ self.log.debug('Writing code to file\n{}'.format(python_code))
f.write(python_code)
def _generate_virtualenv_cmd(self, tmp_dir):
@@ -323,7 +323,7 @@ class PythonVirtualenvOperator(PythonOperator):
def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename):
# direct path alleviates need to activate
return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename]
-
+
def _generate_python_code(self):
if self.use_dill:
pickling_library = 'dill'
@@ -354,3 +354,5 @@ class PythonVirtualenvOperator(PythonOperator):
python_callable_name=fn.__name__,
pickling_library=pickling_library)
+ self.log.info("Done.")
+
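
A note on the two formatting styles visible above: most converted call sites keep logging's lazy percent-style arguments (self.log.info('Executing: %s', self.sql)), which defer string interpolation until a handler actually emits the record, while a few, such as PythonVirtualenvOperator._execute_in_subprocess, format eagerly with str.format. Both log the same text; only the cost when the level is disabled differs:

    import logging

    log = logging.getLogger(__name__)
    cmd = ['bash', 'script.sh']

    log.info("Executing cmd\n%s", cmd)         # lazy: formatted only if emitted
    log.info("Executing cmd\n{}".format(cmd))  # eager: formatted unconditionally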
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index e25d613..683ff9c 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -70,7 +70,7 @@ class RedshiftToS3Transfer(BaseOperator):
a_key, s_key = self.s3.get_credentials()
unload_options = '\n\t\t\t'.join(self.unload_options)
- self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table)
+ self.log.info("Retrieving headers from %s.%s...", self.schema, self.table)
columns_query = """SELECT column_name
FROM information_schema.columns
@@ -99,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator):
""".format(column_names, column_castings, self.schema, self.table,
self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
- self.logger.info('Executing UNLOAD command...')
+ self.log.info('Executing UNLOAD command...')
self.hook.run(unload_query, self.autocommit)
- self.logger.info("UNLOAD command complete...")
+ self.log.info("UNLOAD command complete...")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 5de5127..68c733c 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -74,12 +74,12 @@ class S3FileTransformOperator(BaseOperator):
def execute(self, context):
source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
- self.logger.info("Downloading source S3 file %s", self.source_s3_key)
+ self.log.info("Downloading source S3 file %s", self.source_s3_key)
if not source_s3.check_for_key(self.source_s3_key):
raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
source_s3_key_object = source_s3.get_key(self.source_s3_key)
with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
- self.logger.info(
+ self.log.info(
"Dumping S3 file %s contents to local file %s",
self.source_s3_key, f_source.name
)
@@ -90,20 +90,20 @@ class S3FileTransformOperator(BaseOperator):
[self.transform_script, f_source.name, f_dest.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
- self.logger.info("Transform script stdout %s", transform_script_stdoutdata)
+ self.log.info("Transform script stdout %s", transform_script_stdoutdata)
if transform_script_process.returncode > 0:
raise AirflowException("Transform script failed %s", transform_script_stderrdata)
else:
- self.logger.info(
+ self.log.info(
"Transform script successful. Output temporarily located at %s",
f_dest.name
)
- self.logger.info("Uploading transformed file to S3")
+ self.log.info("Uploading transformed file to S3")
f_dest.flush()
dest_s3.load_file(
filename=f_dest.name,
key=self.dest_s3_key,
replace=self.replace
)
- self.logger.info("Upload successful")
+ self.log.info("Upload successful")
dest_s3.connection.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 68fe903..2b4aceb 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator):
# Downloading file from S3
self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
- self.logger.info("Downloading S3 file")
+ self.log.info("Downloading S3 file")
if self.wildcard_match:
if not self.s3.check_for_wildcard_key(self.s3_key):
@@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator):
NamedTemporaryFile(mode="w",
dir=tmp_dir,
suffix=file_ext) as f:
- self.logger.info("Dumping S3 key {0} contents to local file {1}"
- .format(s3_key_object.key, f.name))
+ self.log.info("Dumping S3 key {0} contents to local file {1}"
+ .format(s3_key_object.key, f.name))
s3_key_object.get_contents_to_file(f)
f.flush()
self.s3.connection.close()
if not self.headers:
- self.logger.info("Loading file %s into Hive", f.name)
+ self.log.info("Loading file %s into Hive", f.name)
self.hive.load_file(
f.name,
self.hive_table,
@@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator):
else:
# Decompressing file
if self.input_compressed:
- self.logger.info("Uncompressing file %s", f.name)
+ self.log.info("Uncompressing file %s", f.name)
fn_uncompressed = uncompress_file(f.name,
file_ext,
tmp_dir)
- self.logger.info("Uncompressed to %s", fn_uncompressed)
+ self.log.info("Uncompressed to %s", fn_uncompressed)
# uncompressed file available now so deleting
# compressed file to save disk space
f.close()
@@ -178,19 +178,19 @@ class S3ToHiveTransfer(BaseOperator):
# Testing if header matches field_dict
if self.check_headers:
- self.logger.info("Matching file header against field_dict")
+ self.log.info("Matching file header against field_dict")
header_list = self._get_top_row_as_list(fn_uncompressed)
if not self._match_headers(header_list):
raise AirflowException("Header check failed")
# Deleting top header row
- self.logger.info("Removing header from file %s", fn_uncompressed)
+ self.log.info("Removing header from file %s", fn_uncompressed)
headless_file = (
self._delete_top_row_and_compress(fn_uncompressed,
file_ext,
tmp_dir))
- self.logger.info("Headless file %s", headless_file)
- self.logger.info("Loading file %s into Hive", headless_file)
+ self.log.info("Headless file %s", headless_file)
+ self.log.info("Loading file %s into Hive", headless_file)
self.hive.load_file(headless_file,
self.hive_table,
field_dict=self.field_dict,
@@ -211,7 +211,7 @@ class S3ToHiveTransfer(BaseOperator):
raise AirflowException("Unable to retrieve header row from file")
field_names = self.field_dict.keys()
if len(field_names) != len(header_list):
- self.logger.warning("Headers count mismatch"
+ self.log.warning("Headers count mismatch"
"File headers:\n {header_list}\n"
"Field names: \n {field_names}\n"
"".format(**locals()))
@@ -219,7 +219,7 @@ class S3ToHiveTransfer(BaseOperator):
test_field_match = [h1.lower() == h2.lower()
for h1, h2 in zip(header_list, field_names)]
if not all(test_field_match):
- self.logger.warning("Headers do not match field names"
+ self.log.warning("Headers do not match field names"
"File headers:\n {header_list}\n"
"Field names: \n {field_names}\n"
"".format(**locals()))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index ea301dc..b5c43c2 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -15,7 +15,7 @@
from __future__ import print_function
from future import standard_library
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
from builtins import str
@@ -82,7 +82,7 @@ class BaseSensorOperator(BaseOperator):
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
- self.logger.info("Success criteria met. Exiting.")
+ self.log.info("Success criteria met. Exiting.")
class SqlSensor(BaseSensorOperator):
@@ -108,7 +108,7 @@ class SqlSensor(BaseSensorOperator):
def poke(self, context):
hook = BaseHook.get_connection(self.conn_id).get_hook()
- self.logger.info('Poking: %s', self.sql)
+ self.log.info('Poking: %s', self.sql)
records = hook.get_records(self.sql)
if not records:
return False
@@ -237,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator):
serialized_dttm_filter = ','.join(
[datetime.isoformat() for datetime in dttm_filter])
- self.logger.info(
+ self.log.info(
'Poking for '
'{self.external_dag_id}.'
'{self.external_task_id} on '
@@ -313,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
schema, table, partition = self.parse_partition_name(partition)
- self.logger.info(
+ self.log.info(
'Poking for {schema}.{table}/{partition}'.format(**locals())
)
return self.hook.check_for_named_partition(
@@ -371,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator):
def poke(self, context):
if '.' in self.table:
self.schema, self.table = self.table.split('.')
- self.logger.info(
+ self.log.info(
'Poking for table {self.schema}.{self.table}, '
'partition {self.partition}'.format(**locals()))
if not hasattr(self, 'hook'):
@@ -417,7 +417,7 @@ class HdfsSensor(BaseSensorOperator):
:return: (bool) depending on the matching criteria
"""
if size:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
size *= settings.MEGABYTE
result = [x for x in result if x['length'] >= size]
@@ -435,7 +435,7 @@ class HdfsSensor(BaseSensorOperator):
:return: (list) of dicts which were not removed
"""
if ignore_copying:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
ignored_extentions_regex = re.compile(regex_builder)
log.debug(
@@ -448,20 +448,20 @@ class HdfsSensor(BaseSensorOperator):
def poke(self, context):
sb = self.hook(self.hdfs_conn_id).get_conn()
- self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
try:
# IMOO it's not right here, as there no raise of any kind.
# if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
# it's not correct as the directory exists and sb does not raise any error
# here is a quick fix
result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
- self.logger.debug('HdfsSensor.poke: result is %s', result)
+ self.log.debug('HdfsSensor.poke: result is %s', result)
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
result = self.filter_for_filesize(result, self.file_size)
return bool(result)
except:
e = sys.exc_info()
- self.logger.debug("Caught an exception !: %s", str(e))
+ self.log.debug("Caught an exception !: %s", str(e))
return False
@@ -484,7 +484,7 @@ class WebHdfsSensor(BaseSensorOperator):
def poke(self, context):
from airflow.hooks.webhdfs_hook import WebHDFSHook
c = WebHDFSHook(self.webhdfs_conn_id)
- self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
return c.check_for_path(hdfs_path=self.filepath)
@@ -535,7 +535,7 @@ class S3KeySensor(BaseSensorOperator):
from airflow.hooks.S3_hook import S3Hook
hook = S3Hook(s3_conn_id=self.s3_conn_id)
full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
- self.logger.info('Poking for key : {full_url}'.format(**locals()))
+ self.log.info('Poking for key : {full_url}'.format(**locals()))
if self.wildcard_match:
return hook.check_for_wildcard_key(self.bucket_key,
self.bucket_name)
@@ -577,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator):
self.s3_conn_id = s3_conn_id
def poke(self, context):
- self.logger.info('Poking for prefix : {self.prefix}\n'
+ self.log.info('Poking for prefix : {self.prefix}\n'
'in bucket s3://{self.bucket_name}'.format(**locals()))
from airflow.hooks.S3_hook import S3Hook
hook = S3Hook(s3_conn_id=self.s3_conn_id)
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
self.target_time = target_time
def poke(self, context):
- self.logger.info('Checking if the time (%s) has come', self.target_time)
+ self.log.info('Checking if the time (%s) has come', self.target_time)
return datetime.now().time() > self.target_time
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
dag = context['dag']
target_dttm = dag.following_schedule(context['execution_date'])
target_dttm += self.delta
- self.logger.info('Checking if the time (%s) has come', target_dttm)
+ self.log.info('Checking if the time (%s) has come', target_dttm)
return datetime.now() > target_dttm
@@ -679,7 +679,7 @@ class HttpSensor(BaseSensorOperator):
http_conn_id=http_conn_id)
def poke(self, context):
- self.logger.info('Poking: %s', self.endpoint)
+ self.log.info('Poking: %s', self.endpoint)
try:
response = self.hook.run(self.endpoint,
data=self.request_params,
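The sensor hunks above are representative of the whole rename: anything inheriting LoggingMixin (all operators do, via BaseOperator) now logs through self.log. A minimal sketch of a downstream sensor written against the renamed attribute — the class name and poke logic are hypothetical, not part of this commit:
from airflow.hooks.base_hook import BaseHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class RecordCountSensor(BaseSensorOperator):
    # Hypothetical sensor: succeeds once a query returns any rows.
    @apply_defaults
    def __init__(self, conn_id, sql, *args, **kwargs):
        super(RecordCountSensor, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql
    def poke(self, context):
        hook = BaseHook.get_connection(self.conn_id).get_hook()
        # Renamed attribute: self.log, not self.logger.
        self.log.info('Poking: %s', self.sql)
        return bool(hook.get_records(self.sql))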
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 4f2d7bc..8b21211 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -67,7 +67,7 @@ class SlackAPIOperator(BaseOperator):
rc = sc.api_call(self.method, **self.api_params)
if not rc['ok']:
msg = "Slack API call failed (%s)".format(rc['error'])
- self.logger.error(msg)
+ self.log.error(msg)
raise AirflowException(msg)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 7c85847..b32837d 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -41,6 +41,6 @@ class SqliteOperator(BaseOperator):
self.parameters = parameters or []
def execute(self, context):
- self.logger.info('Executing: %s', self.sql)
+ self.log.info('Executing: %s', self.sql)
hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
hook.run(self.sql, parameters=self.parameters)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7c1d246..d5c3407 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,9 +25,9 @@ import re
import sys
from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
class AirflowPluginException(Exception):
pass
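Module-level code with no class to mix into gets the same handle by instantiating the mixin once, as plugins_manager does above — a minimal usage sketch (the helper function is hypothetical):
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
def report_import_error(name, exc):
    # Hypothetical helper: shared module-level handle, renamed from .logger to .log.
    log.error('Failed to import plugin %s: %s', name, exc)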
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index a9687b3..7a169b2 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -24,7 +24,7 @@ from airflow import configuration, LoggingMixin
NEED_KRB181_WORKAROUND = None
-log = LoggingMixin().logger
+log = LoggingMixin().log
def renew_from_kt():
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index cf1eca4..1e5e614 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -27,9 +27,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
from airflow import configuration as conf
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
class DummyStatsLogger(object):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 7794f4a..6a07db2 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -19,7 +19,7 @@ import json
import subprocess
import threading
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow import configuration as conf
from tempfile import mkstemp
@@ -39,7 +39,7 @@ class BaseTaskRunner(LoggingMixin):
"""
# Pass task instance context into log handlers to set up the logger.
self._task_instance = local_task_job.task_instance
- self.set_logger_contexts(self._task_instance)
+ self.set_log_contexts(self._task_instance)
popen_prepend = []
cfg_path = None
@@ -54,7 +54,7 @@ class BaseTaskRunner(LoggingMixin):
# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
if self.run_as_user and (self.run_as_user != getpass.getuser()):
- self.logger.debug("Planning to run as the %s user", self.run_as_user)
+ self.log.debug("Planning to run as the %s user", self.run_as_user)
cfg_dict = conf.as_dict(display_sensitive=True)
cfg_subset = {
'core': cfg_dict.get('core', {}),
@@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin):
line = line.decode('utf-8')
if len(line) == 0:
break
- self.logger.info('Subtask: %s', line.rstrip('\n'))
+ self.log.info('Subtask: %s', line.rstrip('\n'))
def run_command(self, run_with, join_args=False):
"""
@@ -112,7 +112,7 @@ class BaseTaskRunner(LoggingMixin):
"""
cmd = [" ".join(self._command)] if join_args else self._command
full_cmd = run_with + cmd
- self.logger.info('Running: %s', full_cmd)
+ self.log.info('Running: %s', full_cmd)
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py
index b73e258..109b44c 100644
--- a/airflow/task_runner/bash_task_runner.py
+++ b/airflow/task_runner/bash_task_runner.py
@@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner):
def terminate(self):
if self.process and psutil.pid_exists(self.process.pid):
- kill_process_tree(self.logger, self.process.pid)
+ kill_process_tree(self.log, self.process.pid)
def on_finish(self):
super(BashTaskRunner, self).on_finish()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6497fcc..cc64f68 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -27,7 +27,7 @@ from datetime import datetime
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
class SimpleDag(BaseDag):
@@ -205,7 +205,7 @@ def list_py_file_paths(directory, safe_mode=True):
file_paths.append(file_path)
except Exception:
- log = LoggingMixin().logger
+ log = LoggingMixin().log
log.exception("Error while examining %s", f)
return file_paths
@@ -443,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin):
if file_path in new_file_paths:
filtered_processors[file_path] = processor
else:
- self.logger.warning("Stopping processor for %s", file_path)
+ self.log.warning("Stopping processor for %s", file_path)
processor.stop()
self._processors = filtered_processors
@@ -519,7 +519,7 @@ class DagFileProcessorManager(LoggingMixin):
os.symlink(log_directory, latest_log_directory_path)
elif (os.path.isdir(latest_log_directory_path) or
os.path.isfile(latest_log_directory_path)):
- self.logger.warning(
+ self.log.warning(
"%s already exists as a dir/file. Skip creating symlink.",
latest_log_directory_path
)
@@ -558,7 +558,7 @@ class DagFileProcessorManager(LoggingMixin):
for file_path, processor in self._processors.items():
if processor.done:
- self.logger.info("Processor for %s finished", file_path)
+ self.log.info("Processor for %s finished", file_path)
now = datetime.now()
finished_processors[file_path] = processor
self._last_runtime[file_path] = (now -
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
simple_dags = []
for file_path, processor in finished_processors.items():
if processor.result is None:
- self.logger.warning(
+ self.log.warning(
"Processor for %s exited with return code %s. See %s for details.",
processor.file_path, processor.exit_code, processor.log_file
)
@@ -606,12 +606,12 @@ class DagFileProcessorManager(LoggingMixin):
set(files_paths_at_run_limit))
for file_path, processor in self._processors.items():
- self.logger.debug(
+ self.log.debug(
"File path %s is still being processed (started: %s)",
processor.file_path, processor.start_time.isoformat()
)
- self.logger.debug(
+ self.log.debug(
"Queuing the following files for processing:\n\t%s",
"\n\t".join(files_paths_to_queue)
)
@@ -626,7 +626,7 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._processor_factory(file_path, log_file_path)
processor.start()
- self.logger.info(
+ self.log.info(
"Started a process (PID: %s) to generate tasks for %s - logging into %s",
processor.pid, file_path, log_file_path
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index c7e58e7..ef2560f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -25,9 +25,9 @@ from sqlalchemy import event, exc
from sqlalchemy.pool import Pool
from airflow import settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().logger
+log = LoggingMixin().log
def provide_session(func):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/email.py
----------------------------------------------------------------------
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index f252d55..fadd4d5 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -31,7 +31,7 @@ from email.utils import formatdate
from airflow import configuration
from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
@@ -88,7 +88,7 @@ def send_email_smtp(to, subject, html_content, files=None, dryrun=False, cc=None
def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
- log = LoggingMixin().logger
+ log = LoggingMixin().log
SMTP_HOST = configuration.get('smtp', 'SMTP_HOST')
SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/LoggingMixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/LoggingMixin.py b/airflow/utils/log/LoggingMixin.py
deleted file mode 100644
index 4572d63..0000000
--- a/airflow/utils/log/LoggingMixin.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import logging
-from builtins import object
-
-
-class LoggingMixin(object):
- """
- Convenience super-class to have a logger configured with the class name
- """
-
- @property
- def logger(self):
- try:
- return self._logger
- except AttributeError:
- self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
- return self._logger
-
- def set_logger_contexts(self, task_instance):
- """
- Set the context for all handlers of current logger.
- """
- for handler in self.logger.handlers:
- try:
- handler.set_context(task_instance)
- except AttributeError:
- pass
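The renamed replacement, airflow/utils/log/logging_mixin.py, does not appear in this part of the diff; judging from the call sites above (self.log, set_log_contexts), it presumably mirrors the deleted class with only the names changed — a sketch, not the committed file:
import logging
from builtins import object
class LoggingMixin(object):
    """
    Convenience super-class to have a logger configured with the class name
    """
    @property
    def log(self):
        try:
            return self._log
        except AttributeError:
            self._log = logging.root.getChild(
                self.__class__.__module__ + '.' + self.__class__.__name__)
            return self._log
    def set_log_contexts(self, task_instance):
        """
        Set the context for all handlers of the current logger.
        """
        for handler in self.log.handlers:
            try:
                handler.set_context(task_instance)
            except AttributeError:
                pass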
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 0bc0b5e..dcdaf6d 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -15,7 +15,7 @@ import os
from airflow import configuration
from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
google_cloud_storage_conn_id=remote_conn_id
)
except:
- self.logger.error(
+ self.log.error(
'Could not create a GoogleCloudStorageHook with connection id '
'"%s". Please make sure that airflow[gcp_api] is installed '
'and the GCS connection exists.', remote_conn_id
@@ -137,7 +137,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
# return error if needed
if return_error:
msg = 'Could not read logs from {}'.format(remote_log_location)
- self.logger.error(msg)
+ self.log.error(msg)
return msg
def gcs_write(self, log, remote_log_location, append=True):
@@ -167,7 +167,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
tmpfile.flush()
self.hook.upload(bkt, blob, tmpfile.name)
except:
- self.logger.error('Could not write logs to %s', remote_log_location)
+ self.log.error('Could not write logs to %s', remote_log_location)
def parse_gcs_url(self, gsurl):
"""