You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/13 20:22:21 UTC
[5/7] incubator-airflow git commit: [AIRFLOW-747] Fix retry_delay not honoured
[AIRFLOW-747] Fix retry_delay not honoured
DAG runs were marked as deadlocked even though a task was
still up for retry and within its retry_delay period. In addition,
_execute_task_instances was picking up tasks in the
UP_FOR_RETRY state directly from the database, whereas
tasks that pass the dependency check are set to SCHEDULED.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/68f484cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/68f484cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/68f484cd
Branch: refs/heads/v1-8-test
Commit: 68f484cd6073351ccd01d60279a982cfa272cfa5
Parents: e0f5c0c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jan 13 12:25:26 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 13 20:52:07 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 11 +++++++++--
airflow/models.py | 10 ++++++++--
airflow/ti_deps/dep_context.py | 4 ++++
airflow/ti_deps/deps/not_in_retry_period_dep.py | 6 ++++++
tests/jobs.py | 10 ++++++++--
5 files changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 819d107..6906625 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -875,6 +875,7 @@ class SchedulerJob(BaseJob):
.query(models.TaskInstance)
.filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
.filter(models.TaskInstance.state.in_(old_states))
+ .with_for_update()
.all()
)
""":type: list[TaskInstance]"""
@@ -1050,6 +1051,13 @@ class SchedulerJob(BaseJob):
.format(task_instance.key, priority, queue))
# Set the state to queued
+ task_instance.refresh_from_db(lock_for_update=True, session=session)
+ if task_instance.state not in states:
+ self.logger.info("Task {} was set to {} outside this scheduler."
+ .format(task_instance.key, task_instance.state))
+ session.commit()
+ continue
+
self.logger.info("Setting state of {} to {}".format(
task_instance.key, State.QUEUED))
task_instance.state = State.QUEUED
@@ -1393,8 +1401,7 @@ class SchedulerJob(BaseJob):
State.NONE)
self._execute_task_instances(simple_dag_bag,
- (State.SCHEDULED,
- State.UP_FOR_RETRY))
+ (State.SCHEDULED,))
# Call hearbeats
self.logger.info("Heartbeating the executor")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0bd744e..85abdfe 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3693,6 +3693,7 @@ class DagRun(Base):
ID_PREFIX = 'scheduled__'
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
+ DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True)
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
@@ -3879,8 +3880,13 @@ class DagRun(Base):
# small speed up
if unfinished_tasks and none_depends_on_past:
# todo: this can actually get pretty slow: one task costs between 0.01-015s
- no_dependencies_met = all(not t.are_dependencies_met(session=session)
- for t in unfinished_tasks)
+ no_dependencies_met = all(
+ # Use a special dependency context that ignores the retry-period
+ # dependency, since a task that is up for retry is not necessarily
+ # deadlocked.
+ not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT,
+ session=session)
+ for t in unfinished_tasks)
duration = (datetime.now() - start_dttm).total_seconds() * 1000
Stats.timing("dagrun.dependency-check.{}.{}".
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/dep_context.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index 583099d..01e01dd 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -49,6 +49,8 @@ class DepContext(object):
:param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
Backfills)
:type ignore_depends_on_past: boolean
+ :param ignore_in_retry_period: Ignore the retry period for task instances
+ :type ignore_in_retry_period: boolean
:param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
trigger rule
:type ignore_task_deps: boolean
@@ -61,12 +63,14 @@ class DepContext(object):
flag_upstream_failed=False,
ignore_all_deps=False,
ignore_depends_on_past=False,
+ ignore_in_retry_period=False,
ignore_task_deps=False,
ignore_ti_state=False):
self.deps = deps or set()
self.flag_upstream_failed = flag_upstream_failed
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
+ self.ignore_in_retry_period = ignore_in_retry_period
self.ignore_task_deps = ignore_task_deps
self.ignore_ti_state = ignore_ti_state
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 8305094..05dceac 100644
--- a/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -25,6 +25,12 @@ class NotInRetryPeriodDep(BaseTIDep):
@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
+ if dep_context.ignore_in_retry_period:
+ yield self._passing_status(
+ reason="The context specified that being in a retry period was "
+ "permitted.")
+ raise StopIteration
+
if ti.state != State.UP_FOR_RETRY:
yield self._passing_status(
reason="The task instance was not marked for retrying.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 32c615d..3d70415 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -913,7 +913,8 @@ class SchedulerJobTest(unittest.TestCase):
dag = DAG(
dag_id='test_retry_still_in_executor',
- start_date=DEFAULT_DATE)
+ start_date=DEFAULT_DATE,
+ schedule_interval="@once")
dag_task1 = BashOperator(
task_id='test_retry_handling_op',
bash_command='exit 1',
@@ -963,11 +964,16 @@ class SchedulerJobTest(unittest.TestCase):
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.try_number, 1)
+ ti.refresh_from_db(lock_for_update=True, session=session)
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.commit()
+
# do not schedule
do_schedule()
self.assertTrue(executor.has_task(ti))
ti.refresh_from_db()
- self.assertEqual(ti.state, State.UP_FOR_RETRY)
+ self.assertEqual(ti.state, State.SCHEDULED)
# now the executor has cleared and it should be allowed the re-queue
executor.queued_tasks.clear()