You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/26 18:10:22 UTC

[airflow] 02/02: [AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit edae056edd6b6fae94533ca8a4cc220ebfbadc68
Author: amichai07 <am...@bluevine.com>
AuthorDate: Fri Jun 5 13:38:29 2020 +0300

    [AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit
---
 airflow/models/dagrun.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index a70be6f..61ca6bd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -275,12 +275,16 @@ class DagRun(Base, LoggingMixin):
         # small speed up
         if unfinished_tasks and none_depends_on_past and none_task_concurrency:
             scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
-
-            self.log.debug("number of scheduleable tasks for %s: %s task(s)", self, len(scheduleable_tasks))
+            self.log.debug(
+                "number of scheduleable tasks for %s: %s task(s)",
+                self, len(scheduleable_tasks))
             ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
             self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
-            are_runnable_tasks = ready_tis or self._are_premature_tis(
-                unfinished_tasks, finished_tasks, session) or changed_tis
+            if none_depends_on_past and none_task_concurrency:
+                # small speed up
+                are_runnable_tasks = ready_tis or self._are_premature_tis(
+                    unfinished_tasks, finished_tasks, session) or changed_tis
+
         duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)