You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/03 13:28:33 UTC

[1/3] incubator-airflow git commit: [AIRFLOW-695] Retries do not execute because dagrun is in FAILED state

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e9fe64af0 -> f939d7832


[AIRFLOW-695] Retries do not execute because dagrun is in FAILED state

The scheduler checks the task instances without taking into account
whether the executor has already reported back. In this case the executor
reports back several iterations later, but the task is queued nevertheless.
Because tasks will not enter the queue when the task is considered
running, the task state will be "queued" indefinitely and in limbo
between the scheduler and the executor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c8a4eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c8a4eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c8a4eb34

Branch: refs/heads/master
Commit: c8a4eb34550749464759c6789c8d735d82dfdeb7
Parents: 937142d
Author: root <bo...@xs4all.nl>
Authored: Sun Dec 18 20:19:58 2016 +0000
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jan 2 14:18:22 2017 +0100

----------------------------------------------------------------------
 airflow/executors/base_executor.py |  9 ++++
 airflow/jobs.py                    |  5 +++
 tests/jobs.py                      | 76 +++++++++++++++++++++++++++++++++
 3 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index d702ff2..7a4065e 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -76,6 +76,15 @@ class BaseExecutor(LoggingMixin):
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def has_task(self, task_instance):
+        """
+        Checks if a task is either queued or running in this executor
+        :param task_instance: TaskInstance
+        :return: True if the task is known to this executor
+        """
+        if task_instance.key in self.queued_tasks or task_instance.key in self.running:
+            return True
+
     def sync(self):
         """
         Sync will get called periodically by the heartbeat method.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 81c77a8..819d107 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -989,6 +989,11 @@ class SchedulerJob(BaseJob):
                     # Can't schedule any more since there are no more open slots.
                     break
 
+                if self.executor.has_task(task_instance):
+                    self.logger.debug("Not handling task {} as the executor reports it is running"
+                                      .format(task_instance.key))
+                    continue
+ 
                 if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
                     self.logger.info("Not executing queued {} since {} is paused"
                                      .format(task_instance, task_instance.dag_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c8a4eb34/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 62e88e5..d7dfbe7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -21,6 +21,7 @@ import datetime
 import logging
 import os
 import unittest
+import six
 
 from airflow import AirflowException, settings
 from airflow import models
@@ -29,6 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
@@ -899,6 +901,80 @@ class SchedulerJobTest(unittest.TestCase):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    def test_retry_still_in_executor(self):
+        """
+        Checks if the scheduler does not put a task in limbo, when a task is retried
+        but is still present in the executor.
+        """
+        executor = TestExecutor()
+        dagbag = DagBag(executor=executor)
+        dagbag.dags.clear()
+        dagbag.executor = executor
+
+        dag = DAG(
+            dag_id='test_retry_still_in_executor',
+            start_date=DEFAULT_DATE)
+        dag_task1 = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            dag=dag,
+            owner='airflow')
+
+        dag.clear()
+        dag.is_subdag = False
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
+
+        @mock.patch('airflow.models.DagBag', return_value=dagbag)
+        @mock.patch('airflow.models.DagBag.collect_dags')
+        def do_schedule(function, function2):
+            # Use a empty file since the above mock will return the
+            # expected DAGs. Also specify only a single file so that it doesn't
+            # try to schedule the above DAG repeatedly.
+            scheduler = SchedulerJob(num_runs=1,
+                                     executor=executor,
+                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                                         "no_dags.py"))
+            scheduler.heartrate = 0
+            scheduler.run()
+
+        do_schedule()
+        self.assertEquals(1, len(executor.queued_tasks))
+
+        def run_with_error(task):
+            try:
+                task.run()
+            except AirflowException:
+                pass
+
+        ti_tuple = six.next(six.itervalues(executor.queued_tasks))
+        (command, priority, queue, ti) = ti_tuple
+        ti.task = dag_task1
+
+        # fail execution
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # do not schedule
+        do_schedule()
+        self.assertTrue(executor.has_task(ti))
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
+        # now the executor has cleared and it should be allowed the re-queue
+        executor.queued_tasks.clear()
+        do_schedule()
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.QUEUED)
+
     def test_scheduler_run_duration(self):
         """
         Verifies that the scheduler run duration limit is followed.


[2/3] incubator-airflow git commit: [AIRFLOW-727] try_number is not increased

Posted by bo...@apache.org.
[AIRFLOW-727] try_number is not increased

A dag that has retries enabled will retry indefinitely
because try_number gets reset to 0 in LocalTaskJob: the
task_instance is not fully populated, but is nevertheless
saved to the database.

This was caused by a commit in
https://github.com/apache/incubator-airflow/pull/1939


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6948e40c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6948e40c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6948e40c

Branch: refs/heads/master
Commit: 6948e40cb82295ef30c7bd05b216d2201523f9e2
Parents: c8a4eb3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Jan 2 21:55:01 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 3 10:28:33 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                    |  1 +
 tests/dags/test_retry_handling_job.py | 36 ++++++++++++++++++++++++++++++
 tests/jobs.py                         | 23 +++++++++++++++++++
 3 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index eab4a30..fc5a242 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -355,6 +355,7 @@ def run(args, dag=None):
     task = dag.get_task(task_id=args.task_id)
 
     ti = TaskInstance(task, args.execution_date)
+    ti.refresh_from_db()
 
     if args.local:
         print("Logging into: " + filename)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/dags/test_retry_handling_job.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_retry_handling_job.py b/tests/dags/test_retry_handling_job.py
new file mode 100644
index 0000000..111dfd4
--- /dev/null
+++ b/tests/dags/test_retry_handling_job.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2016,10,5,19),
+    'email': ['airflow@airflow.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 4,
+    'retry_delay': timedelta(seconds=0),
+}
+
+dag = DAG('test_retry_handling_job', default_args=default_args, schedule_interval='@once')
+
+task1 = BashOperator(
+    task_id='test_retry_handling_op',
+    bash_command='exit 1',
+    dag=dag)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d7dfbe7..32c615d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -975,6 +975,29 @@ class SchedulerJobTest(unittest.TestCase):
         ti.refresh_from_db()
         self.assertEqual(ti.state, State.QUEUED)
 
+    @unittest.skipUnless("INTEGRATION" in os.environ, "Can only run end to end")
+    def test_retry_handling_job(self):
+        """
+        Integration test of the scheduler not accidentally resetting
+        the try_numbers for a task
+        """
+        dag = self.dagbag.get_dag('test_retry_handling_job')
+        dag_task1 = dag.get_task("test_retry_handling_op")
+        dag.clear()
+
+        scheduler = SchedulerJob(dag_id=dag.dag_id,
+                                 num_runs=1)
+        scheduler.heartrate = 0
+        scheduler.run()
+
+        session = settings.Session()
+        ti = session.query(TI).filter(TI.dag_id==dag.dag_id,
+                                      TI.task_id==dag_task1.task_id).first()
+
+        # make sure the counter has increased
+        self.assertEqual(ti.try_number, 2)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
     def test_scheduler_run_duration(self):
         """
         Verifies that the scheduler run duration limit is followed.


[3/3] incubator-airflow git commit: Merge branch 'master' into temp_fix

Posted by bo...@apache.org.
Merge branch 'master' into temp_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f939d783
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f939d783
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f939d783

Branch: refs/heads/master
Commit: f939d78328c926d8e2b29dbe0a3e39ca394b3da0
Parents: 6948e40 e9fe64a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 3 14:27:12 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 3 14:27:12 2017 +0100

----------------------------------------------------------------------
 scripts/perf/dags/perf_dag_1.py       |  45 +++++++
 scripts/perf/dags/perf_dag_2.py       |  45 +++++++
 scripts/perf/scheduler_ops_metrics.py | 187 +++++++++++++++++++++++++++++
 3 files changed, 277 insertions(+)
----------------------------------------------------------------------