Posted to commits@airflow.apache.org by da...@apache.org on 2018/01/29 20:02:22 UTC

incubator-airflow git commit: [AIRFLOW-2023] Add debug logging around number of queued files

Repository: incubator-airflow
Updated Branches:
  refs/heads/master da0e628fa -> 61ff29e57


[AIRFLOW-2023] Add debug logging around number of queued files

Add debug logging around the number of queued files to process in the
scheduler. This makes it easy to see when there are bottlenecks due to
parallelism and how long it takes for all files to be processed.

Closes #2968 from aoen/ddavydov--add_more_scheduler_metrics
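
The new messages are emitted at DEBUG level, so they stay hidden at the
scheduler's default INFO level. A minimal sketch of how to surface them,
assuming the logging_level option under [core] in airflow.cfg (option name
taken from contemporary Airflow configs; verify it against your version):

    # airflow.cfg -- sketch; confirm the option name for your Airflow version
    [core]
    logging_level = DEBUG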


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61ff29e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61ff29e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61ff29e5

Branch: refs/heads/master
Commit: 61ff29e578d1121ab4606fe122fb4e2db8f075b9
Parents: da0e628
Author: Dan Davydov <da...@airbnb.com>
Authored: Mon Jan 29 12:01:58 2018 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon Jan 29 12:02:02 2018 -0800

----------------------------------------------------------------------
 airflow/utils/dag_processing.py | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61ff29e5/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index dc0c7a6..b4abd34 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -476,6 +476,12 @@ class DagFileProcessorManager(LoggingMixin):
                 running_processors[file_path] = processor
         self._processors = running_processors
 
+        self.log.debug("%s/%s scheduler processes running",
+                       len(self._processors), self._parallelism)
+
+        self.log.debug("%s file paths queued for processing",
+                       len(self._file_path_queue))
+
         # Collect all the DAGs that were found in the processed files
         simple_dags = []
         for file_path, processor in finished_processors.items():
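
For reference, a minimal standalone sketch of the lazy %-style logging
pattern the commit uses, built on Python's stdlib logging rather than
Airflow's LoggingMixin (the FileQueueMonitor class and its attributes are
hypothetical stand-ins for DagFileProcessorManager state, not Airflow code):

    import logging

    logging.basicConfig(level=logging.DEBUG)

    class FileQueueMonitor:
        """Hypothetical stand-in for DagFileProcessorManager."""

        def __init__(self, parallelism):
            self.log = logging.getLogger(self.__class__.__name__)
            self._parallelism = parallelism
            self._processors = {}       # file_path -> running processor
            self._file_path_queue = []  # files waiting for a processor slot

        def heartbeat(self):
            # %-style arguments are interpolated only if DEBUG is enabled,
            # so these calls cost almost nothing at INFO and above.
            self.log.debug("%s/%s scheduler processes running",
                           len(self._processors), self._parallelism)
            self.log.debug("%s file paths queued for processing",
                           len(self._file_path_queue))

    monitor = FileQueueMonitor(parallelism=2)
    monitor._file_path_queue = ["/dags/a.py", "/dags/b.py"]
    monitor.heartbeat()
    # DEBUG:FileQueueMonitor:0/2 scheduler processes running
    # DEBUG:FileQueueMonitor:2 file paths queued for processing

Comparing the two counts across successive heartbeats shows whether the
queue drains as processor slots free up or keeps growing, which indicates
a parallelism bottleneck.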