You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2020/01/04 06:16:00 UTC

[jira] [Created] (AIRFLOW-6452) scheduler_job.py - remove excess sleep/log/duration calls

t oo created AIRFLOW-6452:
-----------------------------

             Summary: scheduler_job.py - remove excess sleep/log/duration calls
                 Key: AIRFLOW-6452
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6452
             Project: Apache Airflow
          Issue Type: Improvement
          Components: scheduler
    Affects Versions: 1.10.7
            Reporter: t oo


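Remove many of the debug calls in the scheduler loop below, wrap some of the remaining ones in a boolean check of the log level (e.g. `self.log.isEnabledFor(logging.DEBUG)`), and remove the second sleep together with the bookkeeping for the loop start time, end time, duration, etc.: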


self.log.debug("Starting Loop...")
            loop_start_time = time.time()

            if self.using_sqlite:
                self.processor_agent.heartbeat()
                # For the sqlite case w/ 1 thread, wait until the processor
                # is finished to avoid concurrent access to the DB.
                self.log.debug(
                    "Waiting for processors to finish since we're using sqlite")
                self.processor_agent.wait_until_finished()

            self.log.debug("Harvesting DAG parsing results")
            simple_dags = self._get_simple_dags()
            self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

            # Send tasks for execution if available
            simple_dag_bag = SimpleDagBag(simple_dags)

            if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
                continue

            # Heartbeat the scheduler periodically
            time_since_last_heartbeat = (timezone.utcnow() -
                                         last_self_heartbeat_time).total_seconds()
            if time_since_last_heartbeat > self.heartrate:
                self.log.debug("Heartbeating the scheduler")
                self.heartbeat()
                last_self_heartbeat_time = timezone.utcnow()

            loop_end_time = time.time()
            loop_duration = loop_end_time - loop_start_time
            self.log.debug(
                "Ran scheduling loop in %.2f seconds",
                loop_duration)

            if not is_unit_test:
                self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
                time.sleep(self._processor_poll_interval)

            if self.processor_agent.done:
                self.log.info("Exiting scheduler loop as all files"
                              " have been processed {} times".format(self.num_runs))
                break

            if loop_duration < 1 and not is_unit_test:
                sleep_length = 1 - loop_duration
                self.log.debug(
                    "Sleeping for {0:.2f} seconds to prevent excessive logging"
                    .format(sleep_length))
                sleep(sleep_length)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)