You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2023/07/06 09:08:01 UTC

[airflow] branch fix_prev_dagrun_dep_for_dynamic_tasks created (now 91a48c0ce0)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a change to branch fix_prev_dagrun_dep_for_dynamic_tasks
in repository https://gitbox.apache.org/repos/asf/airflow.git


      at 91a48c0ce0 Rewrite list-building into iterator

This branch includes the following new commits:

     new 91a48c0ce0 Rewrite list-building into iterator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[airflow] 01/01: Rewrite list-building into iterator

Posted by ur...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch fix_prev_dagrun_dep_for_dynamic_tasks
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 91a48c0ce0131bd68566995addb417cc6cdd057a
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Thu Jul 6 17:07:11 2023 +0800

    Rewrite list-building into iterator
---
 airflow/ti_deps/deps/prev_dagrun_dep.py | 34 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index d8f359e40a..9365db2de8 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -112,11 +112,8 @@ class PrevDagrunDep(BaseTIDep):
             )
             return
 
-        unfinished_previous_tis = [
-            ti
-            for ti in previous_tis
-            if ti.state not in {TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS}
-        ]
+        finished_states = {TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS}
+        unfinished_previous_tis = [ti for ti in previous_tis if ti.state not in finished_states]
         if unfinished_previous_tis:
             ti_str = ",".join(str(ti) for ti in unfinished_previous_tis)
             reason = (
@@ -126,19 +123,20 @@ class PrevDagrunDep(BaseTIDep):
             yield self._failing_status(reason=reason)
             return
 
-        previous_tis_dependents_not_done = []
-        for previous_ti in previous_tis:
-            previous_ti.task = ti.task
-            if not previous_ti.are_dependents_done(session=session):
-                previous_tis_dependents_not_done.append(previous_ti)
+        if ti.task.wait_for_downstream:
 
-        if ti.task.wait_for_downstream and previous_tis_dependents_not_done:
-            ti_str = ",".join(str(ti) for ti in previous_tis_dependents_not_done)
-            yield self._failing_status(
-                reason=(
-                    f"The tasks downstream of the previous task instance(s) {ti_str} haven't completed "
-                    f"(and wait_for_downstream is True)."
+            def _are_depdendencies_done(prev_ti: TI) -> bool:
+                prev_ti.task = ti.task
+                return prev_ti.are_dependents_done(session=session)
+
+            previous_tis_dependents_not_done = [ti for ti in previous_tis if not _are_depdendencies_done(ti)]
+            if previous_tis_dependents_not_done:
+                ti_str = ",".join(str(ti) for ti in previous_tis_dependents_not_done)
+                yield self._failing_status(
+                    reason=(
+                        f"The tasks downstream of the previous task instance(s) {ti_str} haven't completed "
+                        f"(and wait_for_downstream is True)."
+                    )
                 )
-            )
-            return
+                return
         self._push_past_deps_met_xcom_if_needed(ti, dep_context)