Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/27 19:17:31 UTC

incubator-airflow git commit: [AIRFLOW-1142] Do not reset orphaned state for backfills

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c2472ffa1 -> 4e79b830e


[AIRFLOW-1142] Do not reset orphaned state for backfills

The scheduler could interfere with backfills when it resets the state
of tasks that were considered orphaned. This patch prevents the
scheduler from doing so and adds a guard in the backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142
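
In short, the patch has two parts: the scheduler's reset of orphaned task
instances now skips dag runs created by a backfill, and the backfill loop
itself re-schedules any of its task instances whose state was cleared
externally. Backfill dag runs are recognized by their run_id prefix. A
minimal, self-contained sketch of that convention (the literal prefix value
below is an assumption; in Airflow it comes from
airflow.jobs.BackfillJob.ID_PREFIX):

    # Hypothetical stand-alone illustration of the run_id prefix check
    # that the new no_backfills filter relies on.
    BACKFILL_ID_PREFIX = 'backfill_'  # assumed value of BackfillJob.ID_PREFIX

    def is_backfill_run(run_id):
        """Return True if the dag run was created by a backfill job."""
        return run_id.startswith(BACKFILL_ID_PREFIX)

    print(is_backfill_run('backfill_2017-04-27T00:00:00'))    # True
    print(is_backfill_run('scheduled__2017-04-27T00:00:00'))  # False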


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4e79b830
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4e79b830
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4e79b830

Branch: refs/heads/master
Commit: 4e79b830e3261b9d54fdbc7c9dcb510d36565986
Parents: c2472ff
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Apr 27 21:17:25 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Apr 27 21:17:25 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py   | 10 +++++++++-
 airflow/models.py | 10 +++++++++-
 tests/jobs.py     | 42 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 02449c5..e9b6094 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1358,7 +1358,8 @@ class SchedulerJob(BaseJob):
         active_runs = DagRun.find(
             state=State.RUNNING,
             external_trigger=False,
-            session=session
+            session=session,
+            no_backfills=True,
         )
         for dr in active_runs:
             self.logger.info("Resetting {} {}".format(dr.dag_id,
@@ -1855,6 +1856,13 @@ class BackfillJob(BaseJob):
                     self.logger.debug("Task instance to run {} state {}"
                                       .format(ti, ti.state))
 
+                    # guard against externally modified task instances or
+                    # in case max concurrency has been reached at task runtime
+                    if ti.state == State.NONE:
+                        self.logger.warning("FIXME: task instance {} state was set to "
+                                            "None externally. This should not happen".format(ti))
+                        ti.set_state(State.SCHEDULED, session=session)
+
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d2e41cf..51beab8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4037,7 +4037,8 @@ class DagRun(Base):
     @staticmethod
     @provide_session
     def find(dag_id=None, run_id=None, execution_date=None,
-             state=None, external_trigger=None, session=None):
+             state=None, external_trigger=None, no_backfills=False,
+             session=None):
         """
         Returns a set of dag runs for the given search criteria.
         :param dag_id: the dag_id to find dag runs for
@@ -4050,6 +4051,9 @@ class DagRun(Base):
         :type state: State
         :param external_trigger: whether this dag run is externally triggered
         :type external_trigger: bool
+        :param no_backfills: if True, exclude backfill dag runs from the
+            results; if False, return all dag runs. Defaults to False
+        :type no_backfills: bool
         :param session: database session
         :type session: Session
         """
@@ -4069,6 +4073,10 @@ class DagRun(Base):
             qry = qry.filter(DR.state == state)
         if external_trigger is not None:
             qry = qry.filter(DR.external_trigger == external_trigger)
+        if no_backfills:
+            # in order to prevent a circular dependency
+            from airflow.jobs import BackfillJob
+            qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'))
 
         dr = qry.order_by(DR.execution_date).all()
 

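With the new no_backfills flag, callers that only care about scheduler-managed
dag runs can exclude backfill runs at query time, which is exactly what the
scheduler's orphan reset now does. A hedged usage sketch (import path for
State assumed; a session is supplied by the provide_session decorator when
omitted):

    # Sketch of calling the extended DagRun.find(); with no_backfills=True,
    # dag runs whose run_id starts with BackfillJob.ID_PREFIX are filtered out.
    from airflow.models import DagRun
    from airflow.utils.state import State

    running_non_backfill = DagRun.find(
        state=State.RUNNING,
        external_trigger=False,
        no_backfills=True,
    )
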
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 26a105b..9ebea15 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -612,6 +612,48 @@ class SchedulerJobTest(unittest.TestCase):
 
         session.close()
 
+    def test_execute_helper_reset_orphaned_tasks(self):
+        session = settings.Session()
+        dag = DAG(
+            'test_execute_helper_reset_orphaned_tasks',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        dr2 = dag.create_dagrun(run_id=BackfillJob.ID_PREFIX,
+                                state=State.RUNNING,
+                                execution_date=DEFAULT_DATE + datetime.timedelta(1),
+                                start_date=DEFAULT_DATE,
+                                session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.SCHEDULED
+        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+        ti2.state = State.SCHEDULED
+        session.commit()
+
+        processor = mock.MagicMock()
+        processor.get_last_finish_time.return_value = None
+
+        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        executor = TestExecutor()
+        scheduler.executor = executor
+
+        scheduler._execute_helper(processor_manager=processor)
+
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        self.assertEqual(ti.state, State.NONE)
+
+        ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
+        self.assertEqual(ti2.state, State.SCHEDULED)
+
     @provide_session
     def evaluate_dagrun(
             self,