Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/13 20:22:21 UTC

[5/7] incubator-airflow git commit: [AIRFLOW-747] Fix retry_delay not honoured

[AIRFLOW-747] Fix retry_delay not honoured

DAG runs were being marked deadlocked even though a task was
still up for retry and within its retry_delay period. In addition,
_execute_task_instances was picking up tasks in the
UP_FOR_RETRY state directly from the database, whereas
tasks that pass the dependency check are set to SCHEDULED.
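
In outline: a task in UP_FOR_RETRY should only reach the executor after its
dependencies (including the retry-period check) pass and it has been promoted
to SCHEDULED; the scheduler then executes SCHEDULED tasks alone. A minimal
sketch of that promotion step, assuming Airflow's State enum and a SQLAlchemy
session (illustrative only, not the actual SchedulerJob code):

    from airflow.utils.state import State

    def promote_ready_retries(task_instances, session):
        # Tasks whose deps pass (retry_delay elapsed, upstream done, ...)
        # move to SCHEDULED; _execute_task_instances only looks at SCHEDULED.
        for ti in task_instances:
            if ti.state == State.UP_FOR_RETRY and \
                    ti.are_dependencies_met(session=session):
                ti.state = State.SCHEDULED
                session.merge(ti)
        session.commit()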


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/68f484cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/68f484cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/68f484cd

Branch: refs/heads/v1-8-test
Commit: 68f484cd6073351ccd01d60279a982cfa272cfa5
Parents: e0f5c0c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jan 13 12:25:26 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 13 20:52:07 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py                                 | 11 +++++++++--
 airflow/models.py                               | 10 ++++++++--
 airflow/ti_deps/dep_context.py                  |  4 ++++
 airflow/ti_deps/deps/not_in_retry_period_dep.py |  6 ++++++
 tests/jobs.py                                   | 10 ++++++++--
 5 files changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 819d107..6906625 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -875,6 +875,7 @@ class SchedulerJob(BaseJob):
             .query(models.TaskInstance)
             .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
             .filter(models.TaskInstance.state.in_(old_states))
+            .with_for_update()
             .all()
         )
         """:type: list[TaskInstance]"""
@@ -1050,6 +1051,13 @@ class SchedulerJob(BaseJob):
                                  .format(task_instance.key, priority, queue))
 
                 # Set the state to queued
+                task_instance.refresh_from_db(lock_for_update=True, session=session)
+                if task_instance.state not in states:
+                    self.logger.info("Task {} was set to {} outside this scheduler."
+                                     .format(task_instance.key, task_instance.state))
+                    session.commit()
+                    continue
+
                 self.logger.info("Setting state of {} to {}".format(
                     task_instance.key, State.QUEUED))
                 task_instance.state = State.QUEUED
@@ -1393,8 +1401,7 @@ class SchedulerJob(BaseJob):
                                                           State.NONE)
 
                 self._execute_task_instances(simple_dag_bag,
-                                             (State.SCHEDULED,
-                                              State.UP_FOR_RETRY))
+                                             (State.SCHEDULED,))
 
             # Call heartbeats
             self.logger.info("Heartbeating the executor")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0bd744e..85abdfe 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3693,6 +3693,7 @@ class DagRun(Base):
 
     ID_PREFIX = 'scheduled__'
     ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
+    DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True)
 
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
@@ -3879,8 +3880,13 @@ class DagRun(Base):
         # small speed up
         if unfinished_tasks and none_depends_on_past:
             # todo: this can actually get pretty slow: one task costs between 0.01-0.15s
-            no_dependencies_met = all(not t.are_dependencies_met(session=session)
-                                      for t in unfinished_tasks)
+            no_dependencies_met = all(
+                # Use a special dependency context that ignores the task's
+                # up-for-retry dependency, since a task that is up for retry
+                # is not necessarily deadlocked.
+                not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT,
+                                           session=session)
+                for t in unfinished_tasks)
 
         duration = (datetime.now() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}.{}".

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/dep_context.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index 583099d..01e01dd 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -49,6 +49,8 @@ class DepContext(object):
     :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
         Backfills)
     :type ignore_depends_on_past: boolean
+    :param ignore_in_retry_period: Ignore the retry period for task instances
+    :type ignore_in_retry_period: boolean
     :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
         trigger rule
     :type ignore_task_deps: boolean
@@ -61,12 +63,14 @@ class DepContext(object):
             flag_upstream_failed=False,
             ignore_all_deps=False,
             ignore_depends_on_past=False,
+            ignore_in_retry_period=False,
             ignore_task_deps=False,
             ignore_ti_state=False):
         self.deps = deps or set()
         self.flag_upstream_failed = flag_upstream_failed
         self.ignore_all_deps = ignore_all_deps
         self.ignore_depends_on_past = ignore_depends_on_past
+        self.ignore_in_retry_period = ignore_in_retry_period
         self.ignore_task_deps = ignore_task_deps
         self.ignore_ti_state = ignore_ti_state
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 8305094..05dceac 100644
--- a/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -25,6 +25,12 @@ class NotInRetryPeriodDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
+        if dep_context.ignore_in_retry_period:
+            yield self._passing_status(
+                reason="The context specified that being in a retry period was "
+                       "permitted.")
+            raise StopIteration
+
         if ti.state != State.UP_FOR_RETRY:
             yield self._passing_status(
                 reason="The task instance was not marked for retrying.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 32c615d..3d70415 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -913,7 +913,8 @@ class SchedulerJobTest(unittest.TestCase):
 
         dag = DAG(
             dag_id='test_retry_still_in_executor',
-            start_date=DEFAULT_DATE)
+            start_date=DEFAULT_DATE,
+            schedule_interval="@once")
         dag_task1 = BashOperator(
             task_id='test_retry_handling_op',
             bash_command='exit 1',
@@ -963,11 +964,16 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.UP_FOR_RETRY)
         self.assertEqual(ti.try_number, 1)
 
+        ti.refresh_from_db(lock_for_update=True, session=session)
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        session.commit()
+
         # do not schedule
         do_schedule()
         self.assertTrue(executor.has_task(ti))
         ti.refresh_from_db()
-        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.state, State.SCHEDULED)
 
         # now the executor has cleared and it should be allowed to re-queue
         executor.queued_tasks.clear()