Posted to commits@airflow.apache.org by ur...@apache.org on 2021/07/06 09:04:25 UTC

[airflow] branch main updated: Remove AbstractDagFileProcessorProcess from dag processing (#16816)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 81fde58  Remove AbstractDagFileProcessorProcess from dag processing (#16816)
81fde58 is described below

commit 81fde5844de37e90917deaaff9576914cb2637ee
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 6 10:04:02 2021 +0100

    Remove AbstractDagFileProcessorProcess from dag processing (#16816)
    
    This change removes AbstractDagFileProcessorProcess from dag processing
---
 airflow/dag_processing/manager.py   | 102 ++++--------------------------------
 airflow/dag_processing/processor.py |   3 +-
 2 files changed, 12 insertions(+), 93 deletions(-)
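
The shape of the change is worth noting before reading the diff: every type
annotation that previously referenced the abstract base class now references
the concrete DagFileProcessorProcess, and the base class itself is deleted.
This also reverses the import direction between the two modules; manager.py
now imports from processor.py rather than the other way around. A minimal
before/after sketch, abbreviated from the diff below (the two halves cannot
run in one file, since the "before" import no longer exists):

    # Before: processor typing went through the abstract base defined
    # in manager.py, which processor.py in turn had to import.
    from typing import Dict
    from airflow.dag_processing.manager import AbstractDagFileProcessorProcess

    _processors: Dict[str, AbstractDagFileProcessorProcess] = {}

    # After: the concrete class is referenced directly.
    from typing import Dict
    from airflow.dag_processing.processor import DagFileProcessorProcess

    _processors: Dict[str, DagFileProcessorProcess] = {}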

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 5af49e8..ba264be 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -26,12 +26,11 @@ import random
 import signal
 import sys
 import time
-from abc import ABCMeta, abstractmethod
 from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Union, cast
 
 from setproctitle import setproctitle
 from sqlalchemy import or_
@@ -39,6 +38,7 @@ from tabulate import tabulate
 
 import airflow.models
 from airflow.configuration import conf
+from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models import DagModel, errors
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -58,86 +58,6 @@ if TYPE_CHECKING:
     import pathlib
 
 
-class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
-    """Processes a DAG file. See SchedulerJob.process_file() for more details."""
-
-    @abstractmethod
-    def start(self) -> None:
-        """Launch the process to process the file"""
-        raise NotImplementedError()
-
-    @abstractmethod
-    def terminate(self, sigkill: bool = False):
-        """Terminate (and then kill) the process launched to process the file"""
-        raise NotImplementedError()
-
-    @abstractmethod
-    def kill(self) -> None:
-        """Kill the process launched to process the file, and ensure consistent state."""
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def pid(self) -> int:
-        """:return: the PID of the process launched to process the given file"""
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def exit_code(self) -> Optional[int]:
-        """
-        After the process is finished, this can be called to get the return code
-        :return: the exit code of the process
-        :rtype: int
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def done(self) -> bool:
-        """
-        Check if the process launched to process this file is done.
-        :return: whether the process is finished running
-        :rtype: bool
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def result(self) -> Optional[Tuple[int, int]]:
-        """
-        A list of simple dags found, and the number of import errors
-
-        :return: result of running SchedulerJob.process_file() if available. Otherwise, none
-        :rtype: Optional[Tuple[int, int]]
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def start_time(self) -> datetime:
-        """
-        :return: When this started to process the file
-        :rtype: datetime
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def file_path(self) -> str:
-        """
-        :return: the path to the file that this is processing
-        :rtype: unicode
-        """
-        raise NotImplementedError()
-
-    @property
-    @abstractmethod
-    def waitable_handle(self):
-        """A "waitable" handle that can be passed to ``multiprocessing.connection.wait()``"""
-        raise NotImplementedError()
-
-
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
@@ -181,7 +101,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     :param processor_factory: function that creates processors for DAG
         definition files. Arguments are (dag_definition_path, log_file_path)
     :type processor_factory: ([str, List[CallbackRequest], Optional[List[str]], bool]) -> (
-        AbstractDagFileProcessorProcess
+        DagFileProcessorProcess
     )
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :type processor_timeout: timedelta
@@ -198,7 +118,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         dag_directory: str,
         max_runs: int,
         processor_factory: Callable[
-            [str, List[CallbackRequest], Optional[List[str]], bool], AbstractDagFileProcessorProcess
+            [str, List[CallbackRequest], Optional[List[str]], bool], DagFileProcessorProcess
         ],
         processor_timeout: timedelta,
         dag_ids: Optional[List[str]],
@@ -215,7 +135,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         self._pickle_dags = pickle_dags
         self._async_mode = async_mode
         # Map from file path to the processor
-        self._processors: Dict[str, AbstractDagFileProcessorProcess] = {}
+        self._processors: Dict[str, DagFileProcessorProcess] = {}
         # Pipe for communicating signals
         self._process: Optional[multiprocessing.process.BaseProcess] = None
         self._done: bool = False
@@ -330,7 +250,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     def _run_processor_manager(
         dag_directory: str,
         max_runs: int,
-        processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
+        processor_factory: Callable[[str, List[CallbackRequest]], DagFileProcessorProcess],
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
         dag_ids: Optional[List[str]],
@@ -479,7 +399,7 @@ class DagFileProcessorManager(LoggingMixin):
     :type max_runs: int
     :param processor_factory: function that creates processors for DAG
         definition files. Arguments are (dag_definition_path)
-    :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessorProcess)
+    :type processor_factory: (unicode, unicode, list) -> (DagFileProcessorProcess)
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :type processor_timeout: timedelta
     :param signal_conn: connection to communicate signal with processor agent.
@@ -496,7 +416,7 @@ class DagFileProcessorManager(LoggingMixin):
         self,
         dag_directory: Union[str, "pathlib.Path"],
         max_runs: int,
-        processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
+        processor_factory: Callable[[str, List[CallbackRequest]], DagFileProcessorProcess],
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
         dag_ids: Optional[List[str]],
@@ -544,7 +464,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Should store dag file source in a database?
         self.store_dag_code = STORE_DAG_CODE
         # Map from file path to the processor
-        self._processors: Dict[str, AbstractDagFileProcessorProcess] = {}
+        self._processors: Dict[str, DagFileProcessorProcess] = {}
 
         self._num_run = 0
 
@@ -569,7 +489,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         self._log = logging.getLogger('airflow.processor_manager')
 
-        self.waitables: Dict[Any, Union[MultiprocessingConnection, AbstractDagFileProcessorProcess]] = {
+        self.waitables: Dict[Any, Union[MultiprocessingConnection, DagFileProcessorProcess]] = {
             self._signal_conn: self._signal_conn,
         }
 
@@ -1003,7 +923,7 @@ class DagFileProcessorManager(LoggingMixin):
         for sentinel in ready:
             if sentinel is self._signal_conn:
                 continue
-            processor = cast(AbstractDagFileProcessorProcess, self.waitables[sentinel])
+            processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
             self.waitables.pop(processor.waitable_handle)
             self._processors.pop(processor.file_path)
             self._collect_results_from_processor(processor)
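
The last manager.py hunk is the loop that harvests finished processors:
multiprocessing.connection.wait() returns whichever registered handles are
ready, the signal pipe is skipped, and everything else is cast to a processor
and collected via its waitable_handle. A self-contained sketch of that
pattern using plain multiprocessing.Process objects (an illustrative
analogue, not Airflow code; Process.sentinel plays the role of
waitable_handle here):

    import multiprocessing
    from multiprocessing.connection import wait

    def parse_stub(n: int) -> None:
        # Stand-in for the DAG-file parsing done in a child process.
        _ = n * n

    if __name__ == "__main__":
        procs = [multiprocessing.Process(target=parse_stub, args=(i,)) for i in range(3)]
        for p in procs:
            p.start()
        # Map each waitable handle back to its owner, as the manager does
        # with self.waitables.
        by_sentinel = {p.sentinel: p for p in procs}
        while by_sentinel:
            for sentinel in wait(list(by_sentinel), timeout=1.0):
                proc = by_sentinel.pop(sentinel)
                proc.join()
                print("collected pid", proc.pid, "exit code", proc.exitcode)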
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 65e7d54..4a24c1a 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@ from sqlalchemy.orm.session import Session
 
 from airflow import models, settings
 from airflow.configuration import conf
-from airflow.dag_processing.manager import AbstractDagFileProcessorProcess
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models import DAG, DagModel, SlaMiss, errors
 from airflow.models.dagbag import DagBag
@@ -53,7 +52,7 @@ from airflow.utils.state import State
 TI = models.TaskInstance
 
 
-class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, MultiprocessingStartMethodMixin):
+class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     """Runs DAG processing in a separate process using DagFileProcessor
 
     :param file_path: a Python file containing Airflow DAG definitions
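
With the abstract base gone, nothing enforces the processor interface at
class-definition time; the manager simply relies on the attributes the
concrete class provides (start, terminate, kill, pid, exit_code, done,
result, start_time, file_path, waitable_handle, i.e. exactly what the
deleted base class declared above). If a static contract were still wanted
without a runtime base class, a typing.Protocol could express it. The sketch
below mirrors the deleted ABC and is purely illustrative, not something this
commit or Airflow defines (typing.Protocol requires Python 3.8+):

    from datetime import datetime
    from typing import Optional, Protocol, Tuple

    class DagFileProcessorLike(Protocol):
        # Hypothetical structural type; the names are copied from the
        # deleted AbstractDagFileProcessorProcess.
        def start(self) -> None: ...
        def terminate(self, sigkill: bool = False) -> None: ...
        def kill(self) -> None: ...
        @property
        def pid(self) -> int: ...
        @property
        def exit_code(self) -> Optional[int]: ...
        @property
        def done(self) -> bool: ...
        @property
        def result(self) -> Optional[Tuple[int, int]]: ...
        @property
        def start_time(self) -> datetime: ...
        @property
        def file_path(self) -> str: ...
        @property
        def waitable_handle(self): ...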