You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2021/07/06 09:04:25 UTC
[airflow] branch main updated: Remove AbstractDagFileProcessorProcess from dag processing (#16816)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 81fde58 Remove AbstractDagFileProcessorProcess from dag processing (#16816)
81fde58 is described below
commit 81fde5844de37e90917deaaff9576914cb2637ee
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 6 10:04:02 2021 +0100
Remove AbstractDagFileProcessorProcess from dag processing (#16816)
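This change removes the AbstractDagFileProcessorProcess abstract base class from DAG processing. DagFileProcessorProcess no longer inherits from it, and manager.py now imports DagFileProcessorProcess from processor.py and uses it directly in its type annotations.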
---
airflow/dag_processing/manager.py | 102 ++++--------------------------------
airflow/dag_processing/processor.py | 3 +-
2 files changed, 12 insertions(+), 93 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 5af49e8..ba264be 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -26,12 +26,11 @@ import random
import signal
import sys
import time
-from abc import ABCMeta, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta
from importlib import import_module
from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Union, cast
from setproctitle import setproctitle
from sqlalchemy import or_
@@ -39,6 +38,7 @@ from tabulate import tabulate
import airflow.models
from airflow.configuration import conf
+from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
@@ -58,86 +58,6 @@ if TYPE_CHECKING:
import pathlib
-class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
- """Processes a DAG file. See SchedulerJob.process_file() for more details."""
-
- @abstractmethod
- def start(self) -> None:
- """Launch the process to process the file"""
- raise NotImplementedError()
-
- @abstractmethod
- def terminate(self, sigkill: bool = False):
- """Terminate (and then kill) the process launched to process the file"""
- raise NotImplementedError()
-
- @abstractmethod
- def kill(self) -> None:
- """Kill the process launched to process the file, and ensure consistent state."""
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def pid(self) -> int:
- """:return: the PID of the process launched to process the given file"""
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def exit_code(self) -> Optional[int]:
- """
- After the process is finished, this can be called to get the return code
- :return: the exit code of the process
- :rtype: int
- """
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def done(self) -> bool:
- """
- Check if the process launched to process this file is done.
- :return: whether the process is finished running
- :rtype: bool
- """
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def result(self) -> Optional[Tuple[int, int]]:
- """
- A list of simple dags found, and the number of import errors
-
- :return: result of running SchedulerJob.process_file() if available. Otherwise, none
- :rtype: Optional[Tuple[int, int]]
- """
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def start_time(self) -> datetime:
- """
- :return: When this started to process the file
- :rtype: datetime
- """
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def file_path(self) -> str:
- """
- :return: the path to the file that this is processing
- :rtype: unicode
- """
- raise NotImplementedError()
-
- @property
- @abstractmethod
- def waitable_handle(self):
- """A "waitable" handle that can be passed to ``multiprocessing.connection.wait()``"""
- raise NotImplementedError()
-
-
class DagParsingStat(NamedTuple):
"""Information on processing progress"""
@@ -181,7 +101,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
:param processor_factory: function that creates processors for DAG
definition files. Arguments are (dag_definition_path, log_file_path)
:type processor_factory: ([str, List[CallbackRequest], Optional[List[str]], bool]) -> (
- AbstractDagFileProcessorProcess
+ DagFileProcessorProcess
)
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
@@ -198,7 +118,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
dag_directory: str,
max_runs: int,
processor_factory: Callable[
- [str, List[CallbackRequest], Optional[List[str]], bool], AbstractDagFileProcessorProcess
+ [str, List[CallbackRequest], Optional[List[str]], bool], DagFileProcessorProcess
],
processor_timeout: timedelta,
dag_ids: Optional[List[str]],
@@ -215,7 +135,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
self._pickle_dags = pickle_dags
self._async_mode = async_mode
# Map from file path to the processor
- self._processors: Dict[str, AbstractDagFileProcessorProcess] = {}
+ self._processors: Dict[str, DagFileProcessorProcess] = {}
# Pipe for communicating signals
self._process: Optional[multiprocessing.process.BaseProcess] = None
self._done: bool = False
@@ -330,7 +250,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
def _run_processor_manager(
dag_directory: str,
max_runs: int,
- processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
+ processor_factory: Callable[[str, List[CallbackRequest]], DagFileProcessorProcess],
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: Optional[List[str]],
@@ -479,7 +399,7 @@ class DagFileProcessorManager(LoggingMixin):
:type max_runs: int
:param processor_factory: function that creates processors for DAG
definition files. Arguments are (dag_definition_path)
- :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessorProcess)
+ :type processor_factory: (unicode, unicode, list) -> (DagFileProcessorProcess)
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
:param signal_conn: connection to communicate signal with processor agent.
@@ -496,7 +416,7 @@ class DagFileProcessorManager(LoggingMixin):
self,
dag_directory: Union[str, "pathlib.Path"],
max_runs: int,
- processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
+ processor_factory: Callable[[str, List[CallbackRequest]], DagFileProcessorProcess],
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: Optional[List[str]],
@@ -544,7 +464,7 @@ class DagFileProcessorManager(LoggingMixin):
# Should store dag file source in a database?
self.store_dag_code = STORE_DAG_CODE
# Map from file path to the processor
- self._processors: Dict[str, AbstractDagFileProcessorProcess] = {}
+ self._processors: Dict[str, DagFileProcessorProcess] = {}
self._num_run = 0
@@ -569,7 +489,7 @@ class DagFileProcessorManager(LoggingMixin):
self._log = logging.getLogger('airflow.processor_manager')
- self.waitables: Dict[Any, Union[MultiprocessingConnection, AbstractDagFileProcessorProcess]] = {
+ self.waitables: Dict[Any, Union[MultiprocessingConnection, DagFileProcessorProcess]] = {
self._signal_conn: self._signal_conn,
}
@@ -1003,7 +923,7 @@ class DagFileProcessorManager(LoggingMixin):
for sentinel in ready:
if sentinel is self._signal_conn:
continue
- processor = cast(AbstractDagFileProcessorProcess, self.waitables[sentinel])
+ processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
self.waitables.pop(processor.waitable_handle)
self._processors.pop(processor.file_path)
self._collect_results_from_processor(processor)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 65e7d54..4a24c1a 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@ from sqlalchemy.orm.session import Session
from airflow import models, settings
from airflow.configuration import conf
-from airflow.dag_processing.manager import AbstractDagFileProcessorProcess
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.models import DAG, DagModel, SlaMiss, errors
from airflow.models.dagbag import DagBag
@@ -53,7 +52,7 @@ from airflow.utils.state import State
TI = models.TaskInstance
-class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, MultiprocessingStartMethodMixin):
+class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
"""Runs DAG processing in a separate process using DagFileProcessor
:param file_path: a Python file containing Airflow DAG definitions