Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 10:48:24 UTC
[airflow] 04/05: [AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 38e27948289860e53e51b60b74bd2d0310a4076a
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Feb 24 16:15:38 2020 +0100
[AIRFLOW-6897] Simplify DagFileProcessorManager (#7521)
(cherry picked from commit 83d826b9925ce0eb2bd1fe403f5151fbef310b63)
---
airflow/utils/dag_processing.py | 129 ++++++++++++++++++++--------------------
1 file changed, 63 insertions(+), 66 deletions(-)
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 3aac8fd..e4ecc29 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -763,7 +763,7 @@ class DagFileProcessorManager(LoggingMixin):
# Map from file path to the processor
self._processors = {}
- self._heartbeat_count = 0
+ self._num_run = 0
# Map from file path to stats about the file
self._file_stats = {} # type: dict(str, DagFileStat)
@@ -843,11 +843,24 @@ class DagFileProcessorManager(LoggingMixin):
# are told to (as that would open another connection to the
# SQLite DB which isn't a good practice
continue
-
+ # pylint: enable=no-else-break
self._refresh_dag_dir()
- self._find_zombies()
+ self._find_zombies() # pylint: disable=no-value-for-parameter
+
+ self._kill_timed_out_processors()
+ simple_dags = self.collect_results()
+
+ # Generate more file paths to process if we processed all the files
+ # already.
+ if not self._file_path_queue:
+ self.emit_metrics()
+ self.prepare_file_path_queue()
+
+ self.start_new_processes()
+
+ # Update the number of loop iterations.
+ self._num_run += 1
- simple_dags = self.heartbeat()
for simple_dag in simple_dags:
self._signal_conn.send(simple_dag)
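Read together, the added lines above reduce the manager loop body to roughly the following (a condensed sketch assembled from the "+" lines of this hunk; the enclosing while-loop, the SQLite/async handling, and the agent signalling details are omitted):

    self._refresh_dag_dir()
    self._find_zombies()

    self._kill_timed_out_processors()
    simple_dags = self.collect_results()

    # Refill the queue only once the previous batch has been consumed.
    if not self._file_path_queue:
        self.emit_metrics()
        self.prepare_file_path_queue()

    self.start_new_processes()

    # One more loop iteration completed.
    self._num_run += 1

    for simple_dag in simple_dags:
        self._signal_conn.send(simple_dag)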
@@ -1197,65 +1210,11 @@ class DagFileProcessorManager(LoggingMixin):
return simple_dags
- def heartbeat(self):
- """
- This should be periodically called by the manager loop. This method will
- kick off new processes to process DAG definition files and read the
- results from the finished processors.
-
- :return: a list of SimpleDags that were produced by processors that
- have finished since the last time this was called
- :rtype: list[airflow.utils.dag_processing.SimpleDag]
+ def start_new_processes(self):
+ """"
+ Start more processors if we have enough slots and files to process
"""
- simple_dags = self.collect_results()
-
- # Generate more file paths to process if we processed all the files
- # already.
- if len(self._file_path_queue) == 0:
- self.emit_metrics()
-
- self._parsing_start_time = timezone.utcnow()
- # If the file path is already being processed, or if a file was
- # processed recently, wait until the next batch
- file_paths_in_progress = self._processors.keys()
- now = timezone.utcnow()
- file_paths_recently_processed = []
- for file_path in self._file_paths:
- last_finish_time = self.get_last_finish_time(file_path)
- if (last_finish_time is not None and
- (now - last_finish_time).total_seconds() <
- self._file_process_interval):
- file_paths_recently_processed.append(file_path)
-
- files_paths_at_run_limit = [file_path
- for file_path, stat in self._file_stats.items()
- if stat.run_count == self._max_runs]
-
- files_paths_to_queue = list(set(self._file_paths) -
- set(file_paths_in_progress) -
- set(file_paths_recently_processed) -
- set(files_paths_at_run_limit))
-
- for file_path, processor in self._processors.items():
- self.log.debug(
- "File path %s is still being processed (started: %s)",
- processor.file_path, processor.start_time.isoformat()
- )
-
- self.log.debug(
- "Queuing the following files for processing:\n\t%s",
- "\n\t".join(files_paths_to_queue)
- )
-
- for file_path in files_paths_to_queue:
- if file_path not in self._file_stats:
- self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
-
- self._file_path_queue.extend(files_paths_to_queue)
-
- # Start more processors if we have enough slots and files to process
- while (self._parallelism - len(self._processors) > 0 and
- len(self._file_path_queue) > 0):
+ while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path, self._zombies)
Stats.incr('dag_processing.processes')
@@ -1267,10 +1226,48 @@ class DagFileProcessorManager(LoggingMixin):
)
self._processors[file_path] = processor
- # Update heartbeat count.
- self._heartbeat_count += 1
+ def prepare_file_path_queue(self):
+ """
+ Generate more file paths to process. Results are saved in _file_path_queue.
+ """
+ self._parsing_start_time = timezone.utcnow()
+ # If the file path is already being processed, or if a file was
+ # processed recently, wait until the next batch
+ file_paths_in_progress = self._processors.keys()
+ now = timezone.utcnow()
+ file_paths_recently_processed = []
+ for file_path in self._file_paths:
+ last_finish_time = self.get_last_finish_time(file_path)
+ if (last_finish_time is not None and
+ (now - last_finish_time).total_seconds() <
+ self._file_process_interval):
+ file_paths_recently_processed.append(file_path)
+
+ files_paths_at_run_limit = [file_path
+ for file_path, stat in self._file_stats.items()
+ if stat.run_count == self._max_runs]
+
+ files_paths_to_queue = list(set(self._file_paths) -
+ set(file_paths_in_progress) -
+ set(file_paths_recently_processed) -
+ set(files_paths_at_run_limit))
- return simple_dags
+ for file_path, processor in self._processors.items():
+ self.log.debug(
+ "File path %s is still being processed (started: %s)",
+ processor.file_path, processor.start_time.isoformat()
+ )
+
+ self.log.debug(
+ "Queuing the following files for processing:\n\t%s",
+ "\n\t".join(files_paths_to_queue)
+ )
+
+ for file_path in files_paths_to_queue:
+ if file_path not in self._file_stats:
+ self._file_stats[file_path] = DagFileStat(0, 0, None, None, 0)
+
+ self._file_path_queue.extend(files_paths_to_queue)
@provide_session
def _find_zombies(self, session):
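The queueing logic in prepare_file_path_queue() above is essentially a set difference over file paths: everything known minus what is in progress, recently finished, or already at its run limit. A self-contained sketch of the same filtering idea, using plain sets and made-up sample data (nothing here is Airflow API):

    # Hypothetical stand-alone illustration of the set-difference filtering
    # performed by prepare_file_path_queue(); names and data are invented.
    all_files = {"a.py", "b.py", "c.py", "d.py"}
    in_progress = {"a.py"}            # currently held by a processor
    recently_processed = {"b.py"}     # finished within file_process_interval
    at_run_limit = {"c.py"}           # run_count already equals max_runs

    to_queue = list(all_files - in_progress - recently_processed - at_run_limit)
    print(to_queue)  # -> ['d.py']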
@@ -1338,7 +1335,7 @@ class DagFileProcessorManager(LoggingMixin):
for stat in self._file_stats.values():
if stat.run_count < self._max_runs:
return False
- if self._heartbeat_count < self._max_runs:
+ if self._num_run < self._max_runs:
return False
return True
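With the rename, the termination check in the last hunk reads: the manager is finished only after every file's run_count has reached max_runs and the loop itself has iterated at least max_runs times. A minimal sketch of that predicate (the enclosing method name is not visible in the hunk, so max_runs_reached is an assumption):

    def max_runs_reached(self):
        # Assumed method name; the body mirrors the last hunk above.
        for stat in self._file_stats.values():
            if stat.run_count < self._max_runs:
                return False
        if self._num_run < self._max_runs:
            return False
        return True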