Posted to commits@airflow.apache.org by xd...@apache.org on 2020/12/05 06:14:01 UTC

[airflow] branch master updated: Cleanup & improvements around scheduling (#12815)

This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new fbb8a4a  Cleanup & improvements around scheduling (#12815)
fbb8a4a is described below

commit fbb8a4a151e7880a1ee9b56324d405dc04c62ea8
Author: Xiaodong DENG <xd...@gmail.com>
AuthorDate: Sat Dec 5 07:12:59 2020 +0100

    Cleanup & improvements around scheduling (#12815)
    
    * Cleanup & improvements around scheduling
    
    - Remove an unneeded line of code
    - Remove a stale docstring
    - Fix an incorrect docstring
    - Fix a stale doc image link in a docstring
    - Avoid an unnecessary second loop in DagRun.schedule_tis()
      (see the sketch after the diffstat below)
    - Minor improvement to DAG.deactivate_stale_dags(),
      which is invoked inside SchedulerJob
    
    * Revert one change, because we plan to have a dedicated project-wide PR for this issue
    
    * One more fix: dagbag.read_dags_from_db = True in DagFileProcessor.process_file() is no longer needed
---
 airflow/jobs/scheduler_job.py | 14 ++++----------
 airflow/models/dagrun.py      | 19 ++++++++++---------
 2 files changed, 14 insertions(+), 19 deletions(-)
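
For readers skimming the dagrun.py hunks below: the DagRun.schedule_tis()
change replaces a set comprehension plus a second membership-testing pass
with a single loop that partitions the task instances into two lists of
task IDs. A minimal, self-contained sketch of that pattern follows; the
SimpleTI class and its is_dummy flag are stand-ins invented for this
sketch, not Airflow APIs:

    from dataclasses import dataclass
    from typing import List, Tuple

    @dataclass
    class SimpleTI:
        """Stand-in for a task instance; only what this sketch needs."""
        task_id: str
        is_dummy: bool  # DummyOperator task with no on_execute/on_success callbacks

    def partition_tis(tis: List[SimpleTI]) -> Tuple[List[str], List[str]]:
        """One pass over the TIs instead of a build-set-then-filter double pass."""
        dummy_ti_ids: List[str] = []
        schedulable_ti_ids: List[str] = []
        for ti in tis:
            if ti.is_dummy:
                dummy_ti_ids.append(ti.task_id)
            else:
                schedulable_ti_ids.append(ti.task_id)
        return dummy_ti_ids, schedulable_ti_ids

    tis = [SimpleTI("start", True), SimpleTI("extract", False), SimpleTI("end", True)]
    assert partition_tis(tis) == (["start", "end"], ["extract"])

Collecting plain task_id strings up front also lets the second dagrun.py
hunk pass dummy_ti_ids directly into TI.task_id.in_(...) for the bulk
UPDATE that marks those rows as success, instead of rebuilding the ID
list from the set of TI objects on the fly.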

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index ae71c36..f52d2c3 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -103,7 +103,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
 
         # The process that was launched to process the given .
         self._process: Optional[multiprocessing.process.BaseProcess] = None
-        # The result of Scheduler.process_file(file_path).
+        # The result of DagFileProcessor.process_file(file_path).
         self._result: Optional[Tuple[int, int]] = None
         # Whether the process is done running.
         self._done = False
@@ -136,7 +136,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
         :param result_channel: the connection to use for passing back the result
         :type result_channel: multiprocessing.Connection
         :param parent_channel: the parent end of the channel to close in the child
-        :type result_channel: multiprocessing.Connection
+        :type parent_channel: multiprocessing.Connection
         :param file_path: the file to process
         :type file_path: str
         :param pickle_dags: whether to pickle the DAGs found in the file and
@@ -334,7 +334,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
     @property
     def result(self) -> Optional[Tuple[int, int]]:
         """
-        :return: result of running SchedulerJob.process_file()
+        :return: result of running DagFileProcessor.process_file()
         :rtype: tuple[int, int] or None
         """
         if not self.done:
@@ -613,11 +613,6 @@ class DagFileProcessor(LoggingMixin):
         3. For each DAG, see what tasks should run and create appropriate task
         instances in the DB.
         4. Record any errors importing the file into ORM
-        5. Kill (in ORM) any task instances belonging to the DAGs that haven't
-        issued a heartbeat in a while.
-
-        Returns a list of serialized_dag dicts that represent the DAGs found in
-        the file
 
         :param file_path: the path to the Python file that should be executed
         :type file_path: str
@@ -650,7 +645,6 @@ class DagFileProcessor(LoggingMixin):
         self.execute_callbacks(dagbag, callback_requests)
 
         # Save individual DAGs in the ORM
-        dagbag.read_dags_from_db = True
         dagbag.sync_to_db()
 
         if pickle_dags:
@@ -1412,7 +1406,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 break
             if self.processor_agent.done:
                 self.log.info(
-                    "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d "
+                    "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d"
                     " scheduler loops",
                     self.num_times_parse_dags,
                     loop_count,
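
One aside on the logging hunk just above: Python concatenates adjacent
string literals at compile time, so the trailing space removed there
would otherwise have produced a double space in the rendered log
message. A quick stand-alone demonstration:

    # Adjacent string literals are joined into one string at compile time.
    before = "has been reached after %d " " scheduler loops"
    after = "has been reached after %d" " scheduler loops"
    assert before == "has been reached after %d  scheduler loops"  # double space
    assert after == "has been reached after %d scheduler loops"    # single space
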
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 98af00b..307b726 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -718,19 +718,20 @@ class DagRun(Base, LoggingMixin):
         All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
         is the caller's responsibility to call this function only with TIs from a single dag run.
         """
-        # Get list of TIs that do not need to executed, these are
+        # Get list of TI IDs that do not need to be executed; these are
         # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = {
-            ti
-            for ti in schedulable_tis
+        dummy_ti_ids = []
+        schedulable_ti_ids = []
+        for ti in schedulable_tis:
             if (
                 ti.task.inherits_from_dummy_operator
                 and not ti.task.on_execute_callback
                 and not ti.task.on_success_callback
-            )
-        }
+            ):
+                dummy_ti_ids.append(ti.task_id)
+            else:
+                schedulable_ti_ids.append(ti.task_id)
 
-        schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis]
         count = 0
 
         if schedulable_ti_ids:
@@ -745,13 +746,13 @@ class DagRun(Base, LoggingMixin):
             )
 
         # Tasks using DummyOperator should not be executed, mark them as success
-        if dummy_tis:
+        if dummy_ti_ids:
             count += (
                 session.query(TI)
                 .filter(
                     TI.dag_id == self.dag_id,
                     TI.execution_date == self.execution_date,
-                    TI.task_id.in_(ti.task_id for ti in dummy_tis),
+                    TI.task_id.in_(dummy_ti_ids),
                 )
                 .update(
                     {