You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2016/06/17 21:31:08 UTC

incubator-airflow git commit: [AIRFLOW-234] make task that aren't `running` self-terminate

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d243c003b -> 7c0f8373f


[AIRFLOW-234] make task that aren't `running` self-terminate

Closes #1585 from mistercrunch/undeads


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c0f8373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c0f8373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c0f8373

Branch: refs/heads/master
Commit: 7c0f8373f59b0554d5ba15bb0e5e8669f0830313
Parents: d243c00
Author: Maxime Beauchemin <ma...@apache.org>
Authored: Fri Jun 17 14:30:55 2016 -0700
Committer: Maxime Beauchemin <ma...@gmail.com>
Committed: Fri Jun 17 14:30:55 2016 -0700

----------------------------------------------------------------------
 airflow/example_dags/docker_copy_data.py        | 13 ++++++
 airflow/example_dags/example_bash_operator.py   | 13 ++++++
 airflow/example_dags/example_branch_operator.py | 13 ++++++
 airflow/example_dags/example_docker_operator.py | 13 ++++++
 airflow/example_dags/example_http_operator.py   | 13 ++++++
 airflow/example_dags/example_python_operator.py | 13 ++++++
 .../example_short_circuit_operator.py           | 13 ++++++
 airflow/example_dags/example_subdag_operator.py | 13 ++++++
 .../example_trigger_controller_dag.py           | 14 +++++-
 .../example_dags/example_trigger_target_dag.py  | 13 ++++++
 airflow/example_dags/example_xcom.py            | 13 ++++++
 airflow/example_dags/test_utils.py              | 29 ++++++++++++
 airflow/jobs.py                                 | 46 ++++++++++++--------
 airflow/models.py                               | 27 +++++++++---
 tests/core.py                                   | 42 +++++++++++++++++-
 15 files changed, 263 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/docker_copy_data.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py
index ccf84c1..f0789b1 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 '''
 This sample "listen to directory". move the new file and print it, using docker-containers.
 The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 4ab9144..c759f4d 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from builtins import range
 from airflow.operators import BashOperator, DummyOperator
 from airflow.models import DAG

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index f576d20..edd177a 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import BranchPythonOperator, DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_docker_operator.py b/airflow/example_dags/example_docker_operator.py
index e014fe5..6bb71ff 100644
--- a/airflow/example_dags/example_docker_operator.py
+++ b/airflow/example_dags/example_docker_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """
 from airflow import DAG
 from airflow.operators import BashOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 4501825..41ea385 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """
 ### Example HTTP operator and sensor
 """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 0fc2180..a2f8abd 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from __future__ import print_function
 from builtins import range
 from airflow.operators import PythonOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 967c65e..907cf51 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import ShortCircuitOperator, DummyOperator
 from airflow.models import DAG
 import airflow.utils.helpers

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 120f333..57a62c6 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from datetime import datetime
 
 from airflow.models import DAG

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_controller_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 4b66ad1..b754d64 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -1,4 +1,16 @@
-
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """This example illustrates the use of the TriggerDagRunOperator. There are 2
 entities at work in this scenario:
 1. The Controller DAG - the DAG that conditionally executes the trigger

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_target_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 172003f..41a3e36 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import BashOperator, PythonOperator
 from airflow.models import DAG
 from datetime import datetime

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 2d9c087..71cd44e 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from __future__ import print_function
 import airflow
 from datetime import datetime, timedelta

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
new file mode 100644
index 0000000..38e50d0
--- /dev/null
+++ b/airflow/example_dags/test_utils.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+from airflow.operators import BashOperator
+from airflow.models import DAG
+from datetime import datetime
+
+dag = DAG(
+    dag_id='test_utils',
+    schedule_interval=None,
+)
+
+task = BashOperator(
+    task_id='sleeps_forever',
+    dag=dag,
+    bash_command="sleep 10000000000",
+    start_date=datetime(2016, 1, 1),
+    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 1e583ac..0713bbe 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -19,7 +19,7 @@ from __future__ import unicode_literals
 
 from past.builtins import basestring
 from collections import defaultdict, Counter
-from datetime import datetime
+from datetime import datetime, timedelta
 import getpass
 import logging
 import socket
@@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin):
         '''
         pass
 
-    def heartbeat_callback(self):
+    def heartbeat_callback(self, session=None):
         pass
 
     def heartbeat(self):
@@ -139,7 +139,7 @@ class BaseJob(Base, LoggingMixin):
         sleep at all.
         '''
         session = settings.Session()
-        job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
+        job = session.query(BaseJob).filter_by(id=self.id).one()
 
         if job.state == State.SHUTDOWN:
             self.kill()
@@ -154,9 +154,9 @@ class BaseJob(Base, LoggingMixin):
 
         session.merge(job)
         session.commit()
-        session.close()
 
-        self.heartbeat_callback()
+        self.heartbeat_callback(session=session)
+        session.close()
         self.logger.debug('[heart] Boom.')
 
     def run(self):
@@ -378,7 +378,8 @@ class SchedulerJob(BaseJob):
                 filename=filename, stacktrace=stacktrace))
         session.commit()
 
-    def schedule_dag(self, dag):
+    @provide_session
+    def schedule_dag(self, dag, session=None):
         """
         This method checks whether a new DagRun needs to be created
         for a DAG based on scheduling interval
@@ -386,7 +387,6 @@ class SchedulerJob(BaseJob):
         """
         if dag.schedule_interval:
             DagRun = models.DagRun
-            session = settings.Session()
             active_runs = DagRun.find(
                 dag_id=dag.dag_id,
                 state=State.RUNNING,
@@ -799,10 +799,10 @@ class SchedulerJob(BaseJob):
             finally:
                 settings.Session.remove()
         executor.end()
-
         session.close()
 
-    def heartbeat_callback(self):
+    @provide_session
+    def heartbeat_callback(self, session=None):
         Stats.gauge('scheduler_heartbeat', 1, 1)
 
 
@@ -1093,6 +1093,15 @@ class LocalTaskJob(BaseJob):
         self.pool = pool
         self.pickle_id = pickle_id
         self.mark_success = mark_success
+
+        # terminating state is used so that a job doesn't try to
+        # terminate multiple times
+        self.terminating = False
+
+        # Keeps track of the fact that the task instance has been observed
+        # as running at least once
+        self.was_running = False
+
         super(LocalTaskJob, self).__init__(*args, **kwargs)
 
     def _execute(self):
@@ -1115,23 +1124,26 @@ class LocalTaskJob(BaseJob):
     def on_kill(self):
         self.process.terminate()
 
-    """
-    def heartbeat_callback(self):
-        if datetime.now() - self.start_date < timedelta(seconds=300):
+    @provide_session
+    def heartbeat_callback(self, session=None):
+        """Self destruct task if state has been moved away from running externally"""
+
+        if self.terminating:
+            # task is already terminating, let it breathe
             return
+
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
-        session = settings.Session()
         state = session.query(TI.state).filter(
             TI.dag_id==ti.dag_id, TI.task_id==ti.task_id,
             TI.execution_date==ti.execution_date).scalar()
-        session.commit()
-        session.close()
-        if state != State.RUNNING:
+        if state == State.RUNNING:
+            self.was_running = True
+        elif self.was_running and hasattr(self, 'process'):
             logging.warning(
                 "State of this instance has been externally set to "
                 "{self.task_instance.state}. "
                 "Taking the poison pill. So long.".format(**locals()))
             self.process.terminate()
-    """
+            self.terminating = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 38359f7..09d880e 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1179,17 +1179,32 @@ class TaskInstance(Base):
     def run(
             self,
             verbose=True,
-            ignore_dependencies=False,  # Doesn't check for deps, just runs
-            ignore_depends_on_past=False,   # Ignore depends_on_past but respect
-                                            # other deps
-            force=False,  # Disregards previous successes
-            mark_success=False,  # Don't run the task, act as if it succeeded
-            test_mode=False,  # Doesn't record success or failure in the DB
+            ignore_dependencies=False,
+            ignore_depends_on_past=False,
+            force=False,
+            mark_success=False,
+            test_mode=False,
             job_id=None,
             pool=None,
             session=None):
         """
         Runs the task instance.
+
+        :param verbose: whether to turn on more verbose logging
+        :type verbose: boolean
+        :param ignore_dependencies: Doesn't check for deps, just runs
+        :type ignore_dependencies: boolean
+        :param ignore_depends_on_past: Ignore depends_on_past but respect
+            other dependencies
+        :type ignore_depends_on_past: boolean
+        :param force: Forces a run regardless of previous success
+        :type force: boolean
+        :param mark_success: Don't run the task, mark its state as success
+        :type mark_success: boolean
+        :param test_mode: Doesn't record success or failure in the DB
+        :type test_mode: boolean
+        :param pool: specifies the pool to use to run the task instance
+        :type pool: str
         """
         task = self.task
         self.pool = pool or task.pool

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 5e6a4fd..2ab14ea 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -19,6 +19,7 @@ import json
 import os
 import re
 import unittest
+import multiprocessing
 import mock
 import tempfile
 from datetime import datetime, time, timedelta
@@ -49,7 +50,7 @@ from airflow.configuration import AirflowConfigException
 
 import six
 
-NUM_EXAMPLE_DAGS = 15
+NUM_EXAMPLE_DAGS = 16
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
@@ -629,6 +630,45 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
+    def test_terminate_task(self):
+        """If a task instance's db state gets deleted, it should fail"""
+        TI = models.TaskInstance
+        dag = self.dagbag.dags.get('test_utils')
+        task = dag.task_dict.get('sleeps_forever')
+
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job = jobs.LocalTaskJob(
+            task_instance=ti, force=True, executor=SequentialExecutor())
+
+        # Running task instance asynchronously
+        p = multiprocessing.Process(target=job.run)
+        p.start()
+        sleep(5)
+        settings.engine.dispose()
+        session = settings.Session()
+        ti.refresh_from_db(session=session)
+        # making sure it's actually running
+        assert State.RUNNING == ti.state
+        ti = (
+            session.query(TI)
+            .filter_by(
+                dag_id=task.dag_id,
+                task_id=task.task_id,
+                execution_date=DEFAULT_DATE)
+            .one()
+        )
+        # deleting the instance should result in a failure
+        session.delete(ti)
+        session.commit()
+        # waiting for the async task to finish
+        p.join()
+
+        # making sure that the task ended up as failed
+        ti.refresh_from_db(session=session)
+        assert State.FAILED == ti.state
+        session.close()
+
+
 class CliTests(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()