You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2020/01/04 06:16:00 UTC
[jira] [Created] (AIRFLOW-6452) scheduler_job.py - remove excess
sleep/log/duration calls
t oo created AIRFLOW-6452:
-----------------------------
Summary: scheduler_job.py - remove excess sleep/log/duration calls
Key: AIRFLOW-6452
URL: https://issues.apache.org/jira/browse/AIRFLOW-6452
Project: Apache Airflow
Issue Type: Improvement
Components: scheduler
Affects Versions: 1.10.7
Reporter: t oo
remove a lot of these debug calls, wrap some in boolean of loglevel, remove the 2nd sleep and stuff about getting duration/start/end.etc:
self.log.debug("Starting Loop...")
loop_start_time = time.time()
if self.using_sqlite:
self.processor_agent.heartbeat()
# For the sqlite case w/ 1 thread, wait until the processor
# is finished to avoid concurrent access to the DB.
self.log.debug(
"Waiting for processors to finish since we're using sqlite")
self.processor_agent.wait_until_finished()
self.log.debug("Harvesting DAG parsing results")
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
continue
# Heartbeat the scheduler periodically
time_since_last_heartbeat = (timezone.utcnow() -
last_self_heartbeat_time).total_seconds()
if time_since_last_heartbeat > self.heartrate:
self.log.debug("Heartbeating the scheduler")
self.heartbeat()
last_self_heartbeat_time = timezone.utcnow()
loop_end_time = time.time()
loop_duration = loop_end_time - loop_start_time
self.log.debug(
"Ran scheduling loop in %.2f seconds",
loop_duration)
if not is_unit_test:
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)
if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
break
if loop_duration < 1 and not is_unit_test:
sleep_length = 1 - loop_duration
self.log.debug(
"Sleeping for {0:.2f} seconds to prevent excessive logging"
.format(sleep_length))
sleep(sleep_length)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)