You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:20:20 UTC

[GitHub] stale[bot] closed pull request #2113: [AIRFLOW-920] Allow marking tasks in zoomed in subdags

stale[bot] closed pull request #2113: [AIRFLOW-920] Allow marking tasks in zoomed in subdags
URL: https://github.com/apache/incubator-airflow/pull/2113
 
 
   

This is a PR from a forked repository that was closed without being merged.
As GitHub may hide the original diff of such a foreign pull request, the
diff is displayed below for the sake of provenance (it won't show otherwise
due to GitHub magic):

diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 0ddbf987bf..938b856393 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -49,6 +49,72 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
     return drs
 
 
+def _verify_tree(dag, dates, state):
+    """
+    Go through the tree of dags and create any missing dag runs for sub dags
+    :param dag: top of the tree to start looking
+    :param dates: dates for which to create the runs
+    :return: list of confirmed dates for which dag runs existed
+    """
+    root = dag
+    while root.is_subdag:
+        root = root.parent_dag
+
+    confirmed_dates = []
+
+    drs = DagRun.find(dag_id=root.dag_id, execution_date=dates)
+    for dr in drs:
+        dr.dag = root
+        dr.verify_integrity()
+        confirmed_dates.append(dr.execution_date)
+
+    dags = [root]
+    while len(dags) > 0:
+        current_dag = dags.pop(0)
+        for task_id in current_dag.task_ids:
+            task = current_dag.get_task(task_id)
+            if isinstance(task, SubDagOperator):
+                _create_dagruns(task.subdag,
+                                execution_dates=confirmed_dates,
+                                state=state,
+                                run_id_template=BackfillJob.ID_FORMAT_PREFIX)
+                dags.append(task.subdag)
+
+    return confirmed_dates
+
+
+def _walk_subdags(dag, task_ids, execution_dates, state, commit, session):
+    dags = [dag]
+    sub_dag_ids = []
+    while len(dags) > 0:
+        current_dag = dags.pop()
+        for task_id in task_ids:
+            if not current_dag.has_task(task_id):
+                continue
+
+            current_task = current_dag.get_task(task_id)
+            if isinstance(current_task, SubDagOperator):
+                # this works as a kind of integrity check
+                # it creates missing dag runs for subdagoperators,
+                # maybe this should be moved to dagrun.verify_integrity
+                drs = _create_dagruns(current_task.subdag,
+                                      execution_dates=execution_dates,
+                                      state=state,
+                                      run_id_template=BackfillJob.ID_FORMAT_PREFIX)
+
+                for dr in drs:
+                    dr.dag = current_task.subdag
+                    dr.verify_integrity()
+                    if commit:
+                        dr.state = state
+                        session.merge(dr)
+
+                dags.append(current_task.subdag)
+                sub_dag_ids.append(current_task.subdag.dag_id)
+
+    return sub_dag_ids
+
+
 def set_state(task, execution_date, upstream=False, downstream=False,
               future=False, past=False, state=State.SUCCESS, commit=False):
     """
@@ -78,7 +144,6 @@ def set_state(task, execution_date, upstream=False, downstream=False,
     dag = task.dag
 
     latest_execution_date = dag.latest_execution_date
-    assert latest_execution_date is not None
 
     # determine date range of dag runs and tasks to consider
     end_date = latest_execution_date if future else execution_date
@@ -109,44 +174,15 @@ def set_state(task, execution_date, upstream=False, downstream=False,
     # verify the integrity of the dag runs in case a task was added or removed
     # set the confirmed execution dates as they might be different
     # from what was provided
-    confirmed_dates = []
-    drs = DagRun.find(dag_id=dag.dag_id, execution_date=dates)
-    for dr in drs:
-        dr.dag = dag
-        dr.verify_integrity()
-        confirmed_dates.append(dr.execution_date)
+    confirmed_dates = _verify_tree(dag, dates, state=State.RUNNING)
 
     # go through subdagoperators and create dag runs. We will only work
-    # within the scope of the subdag. We wont propagate to the parent dag,
-    # but we will propagate from parent to subdag.
+    # within the scope of the subdag.
     session = Session()
-    dags = [dag]
-    sub_dag_ids = []
-    while len(dags) > 0:
-        current_dag = dags.pop()
-        for task_id in task_ids:
-            if not current_dag.has_task(task_id):
-                continue
 
-            current_task = current_dag.get_task(task_id)
-            if isinstance(current_task, SubDagOperator):
-                # this works as a kind of integrity check
-                # it creates missing dag runs for subdagoperators,
-                # maybe this should be moved to dagrun.verify_integrity
-                drs = _create_dagruns(current_task.subdag,
-                                      execution_dates=confirmed_dates,
-                                      state=State.RUNNING,
-                                      run_id_template=BackfillJob.ID_FORMAT_PREFIX)
-
-                for dr in drs:
-                    dr.dag = current_task.subdag
-                    dr.verify_integrity()
-                    if commit:
-                        dr.state = state
-                        session.merge(dr)
-
-                dags.append(current_task.subdag)
-                sub_dag_ids.append(current_task.subdag.dag_id)
+    sub_dag_ids = _walk_subdags(dag, task_ids,
+                                confirmed_dates, State.RUNNING,
+                                commit, session)
 
     # now look for the task instances that are affected
     TI = TaskInstance


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services