You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/03 13:22:52 UTC

[1/2] incubator-airflow git commit: [AIRFLOW-695] Retries do not execute because dagrun is in FAILED state

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7fa86f72c -> e9fe64af0


[AIRFLOW-695] Retries do not execute because dagrun is in FAILED state

The scheduler checks the task instances without taking into account
whether the executor has already reported back. In this case the executor
reports back several iterations later, but the task is queued nevertheless.
Because tasks will not enter the queue while they are considered
running, the task state will be "queued" indefinitely, in limbo
between the scheduler and the executor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2e166b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2e166b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2e166b79

Branch: refs/heads/master
Commit: 2e166b7928c5f66735c687a830f82ff9e1a733b6
Parents: 937142d
Author: root <bo...@xs4all.nl>
Authored: Sun Dec 18 20:19:58 2016 +0000
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 3 14:21:43 2017 +0100

----------------------------------------------------------------------
 airflow/executors/base_executor.py |  9 ++++
 airflow/jobs.py                    |  5 +++
 tests/jobs.py                      | 76 +++++++++++++++++++++++++++++++++
 3 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index d702ff2..7a4065e 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -76,6 +76,15 @@ class BaseExecutor(LoggingMixin):
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def has_task(self, task_instance):
+        """
+        Checks if a task is either queued or running in this executor
+        :param task_instance: TaskInstance
+        :return: True if the task is known to this executor, False otherwise
+        """
+        key = task_instance.key
+        return key in self.queued_tasks or key in self.running
+
     def sync(self):
         """
         Sync will get called periodically by the heartbeat method.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 81c77a8..819d107 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -989,6 +989,11 @@ class SchedulerJob(BaseJob):
                     # Can't schedule any more since there are no more open slots.
                     break
 
+                if self.executor.has_task(task_instance):
+                    self.logger.debug("Not handling task {} as the executor reports it is running"
+                                      .format(task_instance.key))
+                    continue
+ 
                 if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
                     self.logger.info("Not executing queued {} since {} is paused"
                                      .format(task_instance, task_instance.dag_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 62e88e5..d7dfbe7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -21,6 +21,7 @@ import datetime
 import logging
 import os
 import unittest
+import six
 
 from airflow import AirflowException, settings
 from airflow import models
@@ -29,6 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
@@ -899,6 +901,80 @@ class SchedulerJobTest(unittest.TestCase):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    def test_retry_still_in_executor(self):
+        """
+        Checks that the scheduler does not put a task in limbo when the task is
+        retried while it is still present in the executor.
+        """
+        executor = TestExecutor()
+        dagbag = DagBag(executor=executor)
+        dagbag.dags.clear()
+        dagbag.executor = executor
+
+        dag = DAG(
+            dag_id='test_retry_still_in_executor',
+            start_date=DEFAULT_DATE)
+        dag_task1 = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            dag=dag,
+            owner='airflow')
+
+        dag.clear()
+        dag.is_subdag = False
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+        dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
+
+        @mock.patch('airflow.models.DagBag', return_value=dagbag)
+        @mock.patch('airflow.models.DagBag.collect_dags')
+        def do_schedule(function, function2):
+            # Use an empty file since the above mock will return the
+            # expected DAGs. Also specify only a single file so that it doesn't
+            # try to schedule the above DAG repeatedly.
+            scheduler = SchedulerJob(num_runs=1,
+                                     executor=executor,
+                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                                         "no_dags.py"))
+            scheduler.heartrate = 0
+            scheduler.run()
+
+        do_schedule()
+        self.assertEqual(1, len(executor.queued_tasks))
+
+        def run_with_error(task):
+            try:
+                task.run()
+            except AirflowException:
+                pass  # the failure is expected: bash_command is 'exit 1'
+
+        ti_tuple = six.next(six.itervalues(executor.queued_tasks))
+        (_, _, _, ti) = ti_tuple  # (command, priority, queue, ti)
+        ti.task = dag_task1
+
+        # fail execution so the task instance goes up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # still known to the executor, so the scheduler must not re-queue it
+        do_schedule()
+        self.assertTrue(executor.has_task(ti))
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
+        # now the executor has been cleared, so the task may be re-queued
+        executor.queued_tasks.clear()
+        do_schedule()
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.QUEUED)
+
     def test_scheduler_run_duration(self):
         """
         Verifies that the scheduler run duration limit is followed.


[2/2] incubator-airflow git commit: Merge remote-tracking branch 'apache/master'

Posted by bo...@apache.org.
Merge remote-tracking branch 'apache/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9fe64af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9fe64af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9fe64af

Branch: refs/heads/master
Commit: e9fe64af0c178030ffd5d97cde3a9b1ab875955f
Parents: 2e166b7 7fa86f7
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 3 14:22:42 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 3 14:22:42 2017 +0100

----------------------------------------------------------------------
 scripts/perf/dags/perf_dag_1.py       |  45 +++++++
 scripts/perf/dags/perf_dag_2.py       |  45 +++++++
 scripts/perf/scheduler_ops_metrics.py | 187 +++++++++++++++++++++++++++++
 3 files changed, 277 insertions(+)
----------------------------------------------------------------------