You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/11 09:58:17 UTC

[GitHub] Fokko closed pull request #4408: [AIRFLOW-3589] Visualize reschedule state in all views

Fokko closed pull request #4408: [AIRFLOW-3589] Visualize reschedule state in all views
URL: https://github.com/apache/airflow/pull/4408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub does not display
its diff once the pull request is merged, so the diff is supplied below:

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8771405c48..e896afe40b 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -939,7 +939,8 @@ def _process_task_instances(self, dag, queue, session=None):
             self.log.debug("Examining active DAG run: %s", run)
             # this needs a fresh session sometimes tis get detached
             tis = run.get_task_instances(state=(State.NONE,
-                                                State.UP_FOR_RETRY))
+                                                State.UP_FOR_RETRY,
+                                                State.UP_FOR_RESCHEDULE))
 
             # this loop is quite slow as it uses are_dependencies_met for
             # every task (in ti.is_runnable). This is also called in
@@ -1585,12 +1586,13 @@ def _execute_helper(self):
                     self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                               [State.UP_FOR_RETRY],
                                                               State.FAILED)
-                    # If a task instance is scheduled or queued, but the corresponding
-                    # DAG run isn't running, set the state to NONE so we don't try to
-                    # re-run it.
+                    # If a task instance is scheduled or queued or up for reschedule,
+                    # but the corresponding DAG run isn't running, set the state to
+                    # NONE so we don't try to re-run it.
                     self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                               [State.QUEUED,
-                                                               State.SCHEDULED],
+                                                               State.SCHEDULED,
+                                                               State.UP_FOR_RESCHEDULE],
                                                               State.NONE)
 
                     self._execute_task_instances(simple_dag_bag,
@@ -1948,6 +1950,11 @@ def _update_counters(self, ti_status):
                 self.log.warning("Task instance %s is up for retry", ti)
                 ti_status.running.pop(key)
                 ti_status.to_run[key] = ti
+            # special case: if the task needs to be rescheduled put it back
+            elif ti.state == State.UP_FOR_RESCHEDULE:
+                self.log.warning("Task instance %s is up for reschedule", ti)
+                ti_status.running.pop(key)
+                ti_status.to_run[key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
             # is changed externally, e.g. by clearing tasks from the ui. We need to cover
@@ -2227,7 +2234,7 @@ def _process_backfill_task_instances(self,
                             session=session,
                             verbose=self.verbose):
                         ti.refresh_from_db(lock_for_update=True, session=session)
-                        if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
+                        if ti.state in (State.SCHEDULED, State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE):
                             if executor.has_task(ti):
                                 self.log.debug(
                                     "Task Instance %s already in executor "
@@ -2276,6 +2283,16 @@ def _process_backfill_task_instances(self,
                         ti_status.to_run[key] = ti
                         continue
 
+                    # special case: reschedule period not expired yet, put the task back in to_run
+                    if ti.state == State.UP_FOR_RESCHEDULE:
+                        self.log.debug(
+                            "Task instance %s reschedule period not "
+                            "expired yet", ti)
+                        if key in ti_status.running:
+                            ti_status.running.pop(key)
+                        ti_status.to_run[key] = ti
+                        continue
+
                     # all remaining tasks
                     self.log.debug('Adding %s to not_ready', ti)
                     ti_status.not_ready.add(key)
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index b2561e2fdb..a8573063ad 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -1502,7 +1502,7 @@ def _handle_reschedule(self, reschedule_exception, test_mode=False, context=None
                     reschedule_exception.reschedule_date))
 
         # set state
-        self.state = State.NONE
+        self.state = State.UP_FOR_RESCHEDULE
 
         # Decrement try_number so subsequent runs will use the same try number and write
         # to same log file.
@@ -1510,7 +1510,7 @@ def _handle_reschedule(self, reschedule_exception, test_mode=False, context=None
 
         session.merge(self)
         session.commit()
-        self.log.info('Rescheduling task, marking task as NONE')
+        self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
 
     @provide_session
     def handle_failure(self, error, test_mode=False, context=None, session=None):
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index 60d4118d84..cd3c272078 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -95,6 +95,7 @@ def __init__(
     State.SKIPPED,
     State.UPSTREAM_FAILED,
     State.UP_FOR_RETRY,
+    State.UP_FOR_RESCHEDULE,
 }
 
 # Context to get the dependencies that need to be met in order for a task instance to
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow/ti_deps/deps/ready_to_reschedule.py
index e0f5f8fdfe..dc1c92c654 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -27,6 +27,7 @@ class ReadyToRescheduleDep(BaseTIDep):
     NAME = "Ready To Reschedule"
     IGNOREABLE = True
     IS_TASK_DEP = True
+    RESCHEDULEABLE_STATES = {State.UP_FOR_RESCHEDULE, State.NONE}
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
@@ -43,9 +44,9 @@ def _get_dep_statuses(self, ti, session, dep_context):
                        "permitted.")
             return
 
-        if ti.state != State.NONE:
+        if ti.state not in self.RESCHEDULEABLE_STATES:
             yield self._passing_status(
-                reason="The task instance is not in NONE state.")
+                reason="The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.")
             return
 
         # Lazy import to avoid circular dependency
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index b73aaa12a0..320b996d5d 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -43,6 +43,7 @@ class State(object):
     SHUTDOWN = "shutdown"  # External request to shut down
     FAILED = "failed"
     UP_FOR_RETRY = "up_for_retry"
+    UP_FOR_RESCHEDULE = "up_for_reschedule"
     UPSTREAM_FAILED = "upstream_failed"
     SKIPPED = "skipped"
 
@@ -53,6 +54,7 @@ class State(object):
         UPSTREAM_FAILED,
         SKIPPED,
         UP_FOR_RETRY,
+        UP_FOR_RESCHEDULE,
         QUEUED,
         NONE,
         SCHEDULED,
@@ -71,6 +73,7 @@ class State(object):
         SHUTDOWN: 'blue',
         FAILED: 'red',
         UP_FOR_RETRY: 'gold',
+        UP_FOR_RESCHEDULE: 'turquoise',
         UPSTREAM_FAILED: 'orange',
         SKIPPED: 'pink',
         REMOVED: 'lightgrey',
@@ -114,5 +117,6 @@ def unfinished(cls):
             cls.QUEUED,
             cls.RUNNING,
             cls.SHUTDOWN,
-            cls.UP_FOR_RETRY
+            cls.UP_FOR_RETRY,
+            cls.UP_FOR_RESCHEDULE
         ]
diff --git a/airflow/www/static/graph.css b/airflow/www/static/graph.css
index f1d3480061..a40abf3ed7 100644
--- a/airflow/www/static/graph.css
+++ b/airflow/www/static/graph.css
@@ -31,11 +31,12 @@ g.node.success rect {
 g.node.up_for_retry rect {
     stroke: gold;
 }
-
 g.node.queued rect {
     stroke: grey;
 }
-
+g.node.up_for_reschedule rect{
+    stroke: turquoise;
+}
 g.node.running rect{
     stroke: lime;
 }
diff --git a/airflow/www/static/main.css b/airflow/www/static/main.css
index 147695c4a9..66f449acee 100644
--- a/airflow/www/static/main.css
+++ b/airflow/www/static/main.css
@@ -75,6 +75,9 @@ span.success{
 span.up_for_retry{
     background-color: gold;
 }
+span.up_for_reschedule{
+    background-color: turquoise;
+}
 span.started{
     background-color: lime;
 }
diff --git a/airflow/www/static/tree.css b/airflow/www/static/tree.css
index 9304bb1c55..17ff748b59 100644
--- a/airflow/www/static/tree.css
+++ b/airflow/www/static/tree.css
@@ -62,6 +62,9 @@ rect.upstream_failed {
 rect.up_for_retry {
     fill: gold;
 }
+rect.up_for_reschedule {
+    fill: turquoise;
+}
 rect.skipped {
     fill: pink;
 }
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 5b3c0b7919..59049789b3 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -402,7 +402,7 @@ <h2>DAGs</h2>
             states = json[dag_id];
             g = d3.select('svg#task-run-' + dag_id)
               .attr('height', diameter + (stroke_width_hover * 2))
-              .attr('width', '270px')
+              .attr('width', '300px')
               .selectAll("g")
               .data(states)
               .enter()
diff --git a/airflow/www/templates/airflow/graph.html b/airflow/www/templates/airflow/graph.html
index 1fede359a6..ac25f4efc9 100644
--- a/airflow/www/templates/airflow/graph.html
+++ b/airflow/www/templates/airflow/graph.html
@@ -66,6 +66,7 @@
     <div class="legend_item state" style="border-color:white;">no status</div>
     <div class="legend_item state" style="border-color:grey;">queued</div>
     <div class="legend_item state" style="border-color:gold;">retry</div>
+    <div class="legend_item state" style="border-color:turquoise;">rescheduled</div>
     <div class="legend_item state" style="border-color:pink;">skipped</div>
     <div class="legend_item state" style="border-color:red;">failed</div>
     <div class="legend_item state" style="border-color:lime;">running</div>
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index abdc337c83..483d0ad98e 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -47,6 +47,8 @@
     <div class="square" style="background: grey;"></div>
     <div class="legend_item" style="border: none;">retry</div>
     <div class="square" style="background: gold;"></div>
+    <div class="legend_item" style="border: none;">rescheduled</div>
+    <div class="square" style="background: turquoise;"></div>
     <div class="legend_item" style="border: none;">skipped</div>
     <div class="square" style="background: pink;"></div>
     <div class="legend_item" style="border: none;">failed</div>
diff --git a/airflow/www_rbac/static/css/graph.css b/airflow/www_rbac/static/css/graph.css
index 6edca96169..12de132d38 100644
--- a/airflow/www_rbac/static/css/graph.css
+++ b/airflow/www_rbac/static/css/graph.css
@@ -54,11 +54,12 @@ g.node.success rect {
 g.node.up_for_retry rect {
     stroke: gold;
 }
-
+g.node.up_for_reschedule rect{
+    stroke: turquoise;
+}
 g.node.queued rect {
     stroke: grey;
 }
-
 g.node.running rect{
     stroke: lime;
 }
diff --git a/airflow/www_rbac/static/css/main.css b/airflow/www_rbac/static/css/main.css
index d3d198356e..bc9bb25daa 100644
--- a/airflow/www_rbac/static/css/main.css
+++ b/airflow/www_rbac/static/css/main.css
@@ -78,6 +78,9 @@ span.success{
 span.up_for_retry{
     background-color: gold;
 }
+span.up_for_reschedule{
+    background-color: turquoise;
+}
 span.started{
     background-color: lime;
 }
diff --git a/airflow/www_rbac/static/css/tree.css b/airflow/www_rbac/static/css/tree.css
index 9304bb1c55..17ff748b59 100644
--- a/airflow/www_rbac/static/css/tree.css
+++ b/airflow/www_rbac/static/css/tree.css
@@ -62,6 +62,9 @@ rect.upstream_failed {
 rect.up_for_retry {
     fill: gold;
 }
+rect.up_for_reschedule {
+    fill: turquoise;
+}
 rect.skipped {
     fill: pink;
 }
diff --git a/airflow/www_rbac/templates/airflow/dags.html b/airflow/www_rbac/templates/airflow/dags.html
index 64883ba0ed..07253d5bdf 100644
--- a/airflow/www_rbac/templates/airflow/dags.html
+++ b/airflow/www_rbac/templates/airflow/dags.html
@@ -403,7 +403,7 @@ <h2>DAGs</h2>
             states = json[dag_id];
             g = d3.select('svg#task-run-' + dag_id)
               .attr('height', diameter + (stroke_width_hover * 2))
-              .attr('width', '270px')
+              .attr('width', '300px')
               .selectAll("g")
               .data(states)
               .enter()
diff --git a/airflow/www_rbac/templates/airflow/graph.html b/airflow/www_rbac/templates/airflow/graph.html
index c73d2cbea7..3b8d4914cc 100644
--- a/airflow/www_rbac/templates/airflow/graph.html
+++ b/airflow/www_rbac/templates/airflow/graph.html
@@ -62,6 +62,7 @@
     <div class="legend_item state" style="border-color:white;">no status</div>
     <div class="legend_item state" style="border-color:grey;">queued</div>
     <div class="legend_item state" style="border-color:gold;">retry</div>
+    <div class="legend_item state" style="border-color:turquoise;">rescheduled</div>
     <div class="legend_item state" style="border-color:pink;">skipped</div>
     <div class="legend_item state" style="border-color:red;">failed</div>
     <div class="legend_item state" style="border-color:lime;">running</div>
diff --git a/airflow/www_rbac/templates/airflow/tree.html b/airflow/www_rbac/templates/airflow/tree.html
index 0fe09c1b56..e0fb57f56e 100644
--- a/airflow/www_rbac/templates/airflow/tree.html
+++ b/airflow/www_rbac/templates/airflow/tree.html
@@ -48,6 +48,8 @@
     <div class="square" style="background: grey;"></div>
     <div class="legend_item" style="border: none;">retry</div>
     <div class="square" style="background: gold;"></div>
+    <div class="legend_item" style="border: none;">rescheduled</div>
+    <div class="square" style="background: turquoise;"></div>
     <div class="legend_item" style="border: none;">skipped</div>
     <div class="square" style="background: pink;"></div>
     <div class="legend_item" style="border: none;">failed</div>
diff --git a/tests/sensors/test_base_sensor.py b/tests/sensors/test_base_sensor.py
index 353f4447b1..f9186db149 100644
--- a/tests/sensors/test_base_sensor.py
+++ b/tests/sensors/test_base_sensor.py
@@ -182,7 +182,7 @@ def test_ok_with_reschedule(self):
         for ti in tis:
             if ti.task_id == SENSOR_OP:
                 # verify task is re-scheduled, i.e. state set to NONE
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify one row in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 1)
@@ -201,7 +201,7 @@ def test_ok_with_reschedule(self):
         for ti in tis:
             if ti.task_id == SENSOR_OP:
                 # verify task is re-scheduled, i.e. state set to NONE
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify two rows in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 2)
@@ -239,7 +239,7 @@ def test_fail_with_reschedule(self):
         self.assertEquals(len(tis), 2)
         for ti in tis:
             if ti.task_id == SENSOR_OP:
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
             if ti.task_id == DUMMY_OP:
                 self.assertEquals(ti.state, State.NONE)
 
@@ -273,7 +273,7 @@ def test_soft_fail_with_reschedule(self):
         self.assertEquals(len(tis), 2)
         for ti in tis:
             if ti.task_id == SENSOR_OP:
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
             if ti.task_id == DUMMY_OP:
                 self.assertEquals(ti.state, State.NONE)
 
@@ -305,7 +305,7 @@ def test_ok_with_reschedule_and_retry(self):
         self.assertEquals(len(tis), 2)
         for ti in tis:
             if ti.task_id == SENSOR_OP:
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify one row in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 1)
@@ -337,7 +337,7 @@ def test_ok_with_reschedule_and_retry(self):
         self.assertEquals(len(tis), 2)
         for ti in tis:
             if ti.task_id == SENSOR_OP:
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify one row in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 1)
@@ -393,7 +393,7 @@ def test_ok_with_custom_reschedule_exception(self):
         for ti in tis:
             if ti.task_id == SENSOR_OP:
                 # verify task is re-scheduled, i.e. state set to NONE
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify one row in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 1)
@@ -410,7 +410,7 @@ def test_ok_with_custom_reschedule_exception(self):
         for ti in tis:
             if ti.task_id == SENSOR_OP:
                 # verify task is re-scheduled, i.e. state set to NONE
-                self.assertEquals(ti.state, State.NONE)
+                self.assertEquals(ti.state, State.UP_FOR_RESCHEDULE)
                 # verify two rows in task_reschedule table
                 task_reschedules = TaskReschedule.find_for_task_instance(ti)
                 self.assertEquals(len(task_reschedules), 2)
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 9d67753c35..6119f54e08 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -37,6 +37,7 @@
 import six
 import sqlalchemy
 from mock import Mock, patch, MagicMock, PropertyMock
+from parameterized import parameterized
 
 from airflow.utils.db import create_session
 from airflow import AirflowException, settings, models
@@ -258,6 +259,46 @@ def test_backfill_conf(self):
 
         self.assertEqual(conf, dr[0].conf)
 
+    def test_backfill_run_rescheduled(self):
+        dag = DAG(
+            dag_id='test_backfill_run_rescheduled',
+            start_date=DEFAULT_DATE,
+            schedule_interval='@daily')
+
+        with dag:
+            DummyOperator(
+                task_id='test_backfill_run_rescheduled_task-1',
+                dag=dag,
+            )
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          )
+        job.run()
+
+        ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'),
+                execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        ti.set_state(State.UP_FOR_RESCHEDULE)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          rerun_failed_tasks=True
+                          )
+        job.run()
+        ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'),
+                execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        self.assertEquals(ti.state, State.SUCCESS)
+
     def test_backfill_rerun_failed_tasks(self):
         dag = DAG(
             dag_id='test_backfill_rerun_failed',
@@ -1056,8 +1097,31 @@ def test_update_counters(self):
 
         ti_status.failed.clear()
 
+        # test for retry
+        ti.set_state(State.UP_FOR_RETRY, session)
+        ti_status.running[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.running) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 0)
+        self.assertTrue(len(ti_status.skipped) == 0)
+        self.assertTrue(len(ti_status.failed) == 0)
+        self.assertTrue(len(ti_status.to_run) == 1)
+
+        ti_status.to_run.clear()
+
         # test for reschedule
-        # test for failed
+        ti.set_state(State.UP_FOR_RESCHEDULE, session)
+        ti_status.running[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.running) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 0)
+        self.assertTrue(len(ti_status.skipped) == 0)
+        self.assertTrue(len(ti_status.failed) == 0)
+        self.assertTrue(len(ti_status.to_run) == 1)
+
+        ti_status.to_run.clear()
+
+        # test for none
         ti.set_state(State.NONE, session)
         ti_status.running[ti.key] = ti
         job._update_counters(ti_status=ti_status)
@@ -1067,6 +1131,8 @@ def test_update_counters(self):
         self.assertTrue(len(ti_status.failed) == 0)
         self.assertTrue(len(ti_status.to_run) == 1)
 
+        ti_status.to_run.clear()
+
         session.close()
 
     def test_dag_get_run_dates(self):
@@ -2123,6 +2189,51 @@ def test_execute_helper_reset_orphaned_tasks(self):
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
         self.assertEqual(ti2.state, State.SCHEDULED)
 
+    @parameterized.expand([
+        [State.UP_FOR_RETRY, State.FAILED],
+        [State.QUEUED, State.NONE],
+        [State.SCHEDULED, State.NONE],
+        [State.UP_FOR_RESCHEDULE, State.NONE],
+    ])
+    def test_execute_helper_should_change_state_for_tis_without_dagrun(
+            self, initial_task_state, expected_task_state):
+        session = settings.Session()
+        dag = DAG(
+            'test_execute_helper_should_change_state_for_tis_without_dagrun',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+                               state=State.FAILED,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = initial_task_state
+        session.commit()
+
+        # Create scheduler and mock calls to processor. Run duration is set
+        # to a high value to ensure loop is entered. Poll interval is 0 to
+        # avoid sleep. Done flag is set to true to exit the loop immediately.
+        scheduler = SchedulerJob(num_runs=0, processor_poll_interval=0)
+        executor = TestExecutor()
+        executor.queued_tasks
+        scheduler.executor = executor
+        processor = mock.MagicMock()
+        processor.harvest_simple_dags.return_value = [dag]
+        processor.done = True
+        scheduler.processor_agent = processor
+
+        scheduler._execute_helper()
+
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        self.assertEqual(ti.state, expected_task_state)
+
     @provide_session
     def evaluate_dagrun(
             self,
@@ -2363,7 +2474,14 @@ def test_scheduler_dagrun_once(self):
         dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
-    def test_scheduler_process_task_instances(self):
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+            timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+            timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_scheduler_process_task_instances(self, state, start_date, end_date):
         """
         Test if _process_task_instances puts the right task instances into the
         queue.
@@ -2376,17 +2494,22 @@ def test_scheduler_process_task_instances(self):
             dag=dag,
             owner='airflow')
 
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
 
         scheduler = SchedulerJob()
         dag.clear()
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
         queue = Mock()
         scheduler._process_task_instances(dag, queue=queue)
 
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 898850f8b7..9bae349898 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -44,7 +44,7 @@ def _get_task_reschedule(self, reschedule_date):
         return tr
 
     def test_should_pass_if_ignore_in_reschedule_period_is_set(self):
-        ti = self._get_task_instance(State.NONE)
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         dep_context = DepContext(ignore_in_reschedule_period=True)
         self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context))
 
@@ -62,7 +62,7 @@ def test_should_pass_after_reschedule_date_one(self, find_for_task_instance):
         find_for_task_instance.return_value = [
             self._get_task_reschedule(utcnow() - timedelta(minutes=1)),
         ]
-        ti = self._get_task_instance(State.NONE)
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))
 
     @patch('airflow.models.TaskReschedule.find_for_task_instance')
@@ -72,7 +72,7 @@ def test_should_pass_after_reschedule_date_multiple(self, find_for_task_instance
             self._get_task_reschedule(utcnow() - timedelta(minutes=11)),
             self._get_task_reschedule(utcnow() - timedelta(minutes=1)),
         ]
-        ti = self._get_task_instance(State.NONE)
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))
 
     @patch('airflow.models.TaskReschedule.find_for_task_instance')
@@ -80,7 +80,7 @@ def test_should_fail_before_reschedule_date_one(self, find_for_task_instance):
         find_for_task_instance.return_value = [
             self._get_task_reschedule(utcnow() + timedelta(minutes=1)),
         ]
-        ti = self._get_task_instance(State.NONE)
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         self.assertFalse(ReadyToRescheduleDep().is_met(ti=ti))
 
     @patch('airflow.models.TaskReschedule.find_for_task_instance')
@@ -90,5 +90,5 @@ def test_should_fail_before_reschedule_date_multiple(self, find_for_task_instanc
             self._get_task_reschedule(utcnow() - timedelta(minutes=9)),
             self._get_task_reschedule(utcnow() + timedelta(minutes=1)),
         ]
-        ti = self._get_task_instance(State.NONE)
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         self.assertFalse(ReadyToRescheduleDep().is_met(ti=ti))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services