You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2020/12/05 06:14:01 UTC
[airflow] branch master updated: Cleanup & improvements around scheduling (#12815)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new fbb8a4a Cleanup & improvements around scheduling (#12815)
fbb8a4a is described below
commit fbb8a4a151e7880a1ee9b56324d405dc04c62ea8
Author: Xiaodong DENG <xd...@gmail.com>
AuthorDate: Sat Dec 5 07:12:59 2020 +0100
Cleanup & improvements around scheduling (#12815)
* Cleanup & improvement around scheduling
- Remove unneeded code line
- Remove stale docstring
- Fix wrong docstring
- Fix stale doc image link in docstring
- Avoid unnecessary loop in DagRun.schedule_tis()
- Minor improvement on DAG.deactivate_stale_dags()
which is invoked inside SchedulerJob
* Revert one change, because we plan to have a dedicated project-wide PR for this issue
* One more fix: dagbag.read_dags_from_db = True in DagFileProcessor.process_file() is not needed anymore
---
airflow/jobs/scheduler_job.py | 14 ++++----------
airflow/models/dagrun.py | 19 ++++++++++---------
2 files changed, 14 insertions(+), 19 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index ae71c36..f52d2c3 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -103,7 +103,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
# The process that was launched to process the given .
self._process: Optional[multiprocessing.process.BaseProcess] = None
- # The result of Scheduler.process_file(file_path).
+ # The result of DagFileProcessor.process_file(file_path).
self._result: Optional[Tuple[int, int]] = None
# Whether the process is done running.
self._done = False
@@ -136,7 +136,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
:param result_channel: the connection to use for passing back the result
:type result_channel: multiprocessing.Connection
:param parent_channel: the parent end of the channel to close in the child
- :type result_channel: multiprocessing.Connection
+ :type parent_channel: multiprocessing.Connection
:param file_path: the file to process
:type file_path: str
:param pickle_dags: whether to pickle the DAGs found in the file and
@@ -334,7 +334,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
@property
def result(self) -> Optional[Tuple[int, int]]:
"""
- :return: result of running SchedulerJob.process_file()
+ :return: result of running DagFileProcessor.process_file()
:rtype: tuple[int, int] or None
"""
if not self.done:
@@ -613,11 +613,6 @@ class DagFileProcessor(LoggingMixin):
3. For each DAG, see what tasks should run and create appropriate task
instances in the DB.
4. Record any errors importing the file into ORM
- 5. Kill (in ORM) any task instances belonging to the DAGs that haven't
- issued a heartbeat in a while.
-
- Returns a list of serialized_dag dicts that represent the DAGs found in
- the file
:param file_path: the path to the Python file that should be executed
:type file_path: str
@@ -650,7 +645,6 @@ class DagFileProcessor(LoggingMixin):
self.execute_callbacks(dagbag, callback_requests)
# Save individual DAGs in the ORM
- dagbag.read_dags_from_db = True
dagbag.sync_to_db()
if pickle_dags:
@@ -1412,7 +1406,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
break
if self.processor_agent.done:
self.log.info(
- "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d "
+ "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d"
" scheduler loops",
self.num_times_parse_dags,
loop_count,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 98af00b..307b726 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -718,19 +718,20 @@ class DagRun(Base, LoggingMixin):
All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
is the caller's responsibility to call this function only with TIs from a single dag run.
"""
- # Get list of TIs that do not need to executed, these are
+ # Get list of TI IDs that do not need to executed, these are
# tasks using DummyOperator and without on_execute_callback / on_success_callback
- dummy_tis = {
- ti
- for ti in schedulable_tis
+ dummy_ti_ids = []
+ schedulable_ti_ids = []
+ for ti in schedulable_tis:
if (
ti.task.inherits_from_dummy_operator
and not ti.task.on_execute_callback
and not ti.task.on_success_callback
- )
- }
+ ):
+ dummy_ti_ids.append(ti.task_id)
+ else:
+ schedulable_ti_ids.append(ti.task_id)
- schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis]
count = 0
if schedulable_ti_ids:
@@ -745,13 +746,13 @@ class DagRun(Base, LoggingMixin):
)
# Tasks using DummyOperator should not be executed, mark them as success
- if dummy_tis:
+ if dummy_ti_ids:
count += (
session.query(TI)
.filter(
TI.dag_id == self.dag_id,
TI.execution_date == self.execution_date,
- TI.task_id.in_(ti.task_id for ti in dummy_tis),
+ TI.task_id.in_(dummy_ti_ids),
)
.update(
{