Posted to commits@airflow.apache.org by "Tomasz Urbaszek (Jira)" <ji...@apache.org> on 2020/01/09 12:32:00 UTC

[jira] [Resolved] (AIRFLOW-6452) scheduler_job.py - remove excess sleep/log/duration calls

     [ https://issues.apache.org/jira/browse/AIRFLOW-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomasz Urbaszek resolved AIRFLOW-6452.
--------------------------------------
    Fix Version/s: 1.10.8
       Resolution: Done

> scheduler_job.py - remove excess sleep/log/duration calls
> ---------------------------------------------------------
>
>                 Key: AIRFLOW-6452
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6452
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Assignee: t oo
>            Priority: Minor
>             Fix For: 1.10.8
>
>
> Remove many of these debug calls, wrap some of them in a check on the log level, and remove the second sleep along with the code that computes the loop start time, end time and duration:
>             self.log.debug("Starting Loop...")
>             loop_start_time = time.time()
>             if self.using_sqlite:
>                 self.processor_agent.heartbeat()
>                 # For the sqlite case w/ 1 thread, wait until the processor
>                 # is finished to avoid concurrent access to the DB.
>                 self.log.debug(
>                     "Waiting for processors to finish since we're using sqlite")
>                 self.processor_agent.wait_until_finished()
>             self.log.debug("Harvesting DAG parsing results")
>             simple_dags = self._get_simple_dags()
>             self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
>             # Send tasks for execution if available
>             simple_dag_bag = SimpleDagBag(simple_dags)
>             if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
>                 continue
>             # Heartbeat the scheduler periodically
>             time_since_last_heartbeat = (timezone.utcnow() -
>                                          last_self_heartbeat_time).total_seconds()
>             if time_since_last_heartbeat > self.heartrate:
>                 self.log.debug("Heartbeating the scheduler")
>                 self.heartbeat()
>                 last_self_heartbeat_time = timezone.utcnow()
>             loop_end_time = time.time()
>             loop_duration = loop_end_time - loop_start_time
>             self.log.debug(
>                 "Ran scheduling loop in %.2f seconds",
>                 loop_duration)
>             if not is_unit_test:
>                 self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
>                 time.sleep(self._processor_poll_interval)
>             if self.processor_agent.done:
>                 self.log.info("Exiting scheduler loop as all files"
>                               " have been processed {} times".format(self.num_runs))
>                 break
>             if loop_duration < 1 and not is_unit_test:
>                 sleep_length = 1 - loop_duration
>                 self.log.debug(
>                     "Sleeping for {0:.2f} seconds to prevent excessive logging"
>                     .format(sleep_length))
>                 sleep(sleep_length)
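A minimal sketch of what the proposal could look like, written as standalone Python rather than as a patch to scheduler_job.py. It is not the change that was actually merged for 1.10.8. The names scheduler_loop, harvest and CountingArg and the logger name are hypothetical stand-ins: harvest() plays the role of self._get_simple_dags(), and sleep is injectable so the loop can run without real delays. The loop keeps a single sleep per iteration, drops the start/end/duration bookkeeping, and guards the debug call with the standard library's Logger.isEnabledFor(logging.DEBUG).

```python
import logging
import time

# Hypothetical logger name; Airflow's scheduler logs through its own LoggingMixin.
log = logging.getLogger("scheduler_loop_sketch")


def harvest():
    """Hypothetical stand-in for self._get_simple_dags(); returns fake DAG ids."""
    return ["dag_a", "dag_b"]


def scheduler_loop(num_runs, poll_interval, sleep=time.sleep):
    """Simplified loop: one sleep per iteration, no duration bookkeeping.

    `sleep` is injectable so the loop can be exercised without real delays.
    Returns the total number of harvested items across all iterations.
    """
    total = 0
    for _ in range(num_runs):
        simple_dags = harvest()
        total += len(simple_dags)
        # Only evaluate and log debug details when DEBUG is actually enabled.
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Harvested %d SimpleDAGs", len(simple_dags))
        # Single sleep per iteration; the second, duration-based sleep is gone.
        sleep(poll_interval)
    return total


class CountingArg:
    """Demo helper that counts how often it is converted to a string."""

    def __init__(self):
        self.calls = 0

    def __str__(self):
        self.calls += 1
        return "arg"
```

For example, scheduler_loop(3, 0.5, sleep=sleeps.append) with sleeps = [] records three 0.5-second sleeps instead of waiting. With the standard logging module, %-style arguments passed to log.debug are only formatted if the record is actually emitted, whereas the str.format calls in the quoted code build the message on every iteration; CountingArg makes that difference visible. The isEnabledFor guard additionally skips evaluating the arguments themselves, such as len(simple_dags).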



--
This message was sent by Atlassian Jira
(v8.3.4#803005)