You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2020/01/07 01:23:00 UTC
[jira] [Work started] (AIRFLOW-6452) scheduler_job.py - remove excess sleep/log/duration calls
[ https://issues.apache.org/jira/browse/AIRFLOW-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AIRFLOW-6452 started by t oo.
-------------------------------------
> scheduler_job.py - remove excess sleep/log/duration calls
> ---------------------------------------------------------
>
> Key: AIRFLOW-6452
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6452
> Project: Apache Airflow
> Issue Type: Improvement
> Components: scheduler
> Affects Versions: 1.10.7
> Reporter: t oo
> Assignee: t oo
> Priority: Minor
>
> Remove most of the debug calls in the scheduler loop below, guard some of the remaining ones behind a boolean log-level check, and remove the second sleep along with the bookkeeping for the loop start time, end time, and duration, etc.:
> self.log.debug("Starting Loop...")
> loop_start_time = time.time()
> if self.using_sqlite:
> self.processor_agent.heartbeat()
> # For the sqlite case w/ 1 thread, wait until the processor
> # is finished to avoid concurrent access to the DB.
> self.log.debug(
> "Waiting for processors to finish since we're using sqlite")
> self.processor_agent.wait_until_finished()
> self.log.debug("Harvesting DAG parsing results")
> simple_dags = self._get_simple_dags()
> self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
> # Send tasks for execution if available
> simple_dag_bag = SimpleDagBag(simple_dags)
> if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
> continue
> # Heartbeat the scheduler periodically
> time_since_last_heartbeat = (timezone.utcnow() -
> last_self_heartbeat_time).total_seconds()
> if time_since_last_heartbeat > self.heartrate:
> self.log.debug("Heartbeating the scheduler")
> self.heartbeat()
> last_self_heartbeat_time = timezone.utcnow()
> loop_end_time = time.time()
> loop_duration = loop_end_time - loop_start_time
> self.log.debug(
> "Ran scheduling loop in %.2f seconds",
> loop_duration)
> if not is_unit_test:
> self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
> time.sleep(self._processor_poll_interval)
> if self.processor_agent.done:
> self.log.info("Exiting scheduler loop as all files"
> " have been processed {} times".format(self.num_runs))
> break
> if loop_duration < 1 and not is_unit_test:
> sleep_length = 1 - loop_duration
> self.log.debug(
> "Sleeping for {0:.2f} seconds to prevent excessive logging"
> .format(sleep_length))
> sleep(sleep_length)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)