Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:38 UTC

[40/45] incubator-airflow git commit: [AIRFLOW-900] Double trigger should not kill original task instance

[AIRFLOW-900] Double trigger should not kill original task instance

This updates the tests of the earlier AIRFLOW-900 change.

Closes #2146 from bolkedebruin/AIRFLOW-900
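
For context, the behavior under test: when a task instance is double triggered
(a second LocalTaskJob is started for a task instance that is already running
elsewhere), the new job must detect the clash and kill itself, leaving the
original task instance untouched. Below is a minimal sketch of that guard, not
the actual airflow.jobs source; heartbeat_guard_sketch is an illustrative name,
while _is_descendant_process, AirflowException, State.RUNNING, and
socket.getfqdn() all appear in the tests in the diff that follows:

    import socket

    from airflow import AirflowException
    from airflow.utils.state import State

    def heartbeat_guard_sketch(ti, is_descendant_process):
        """Abort this job if the running TI is owned by another host/process."""
        if ti.state == State.RUNNING:
            if socket.getfqdn() != ti.hostname:
                # Another host owns this task instance: this job is the duplicate.
                raise AirflowException("Hostname of job runner does not match")
            if not is_descendant_process(ti.pid):
                # The recorded pid did not spawn this job: again, the duplicate.
                raise AirflowException("PID of job runner does not match")
        # Otherwise the heartbeat returns None and the original task lives on.

This is the shape test_localtaskjob_heartbeat exercises: a mismatched hostname
raises, a matching hostname with a descendant pid returns None, and a matching
hostname with a non-descendant pid raises again.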


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b26a5d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b26a5d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b26a5d9

Branch: refs/heads/v1-8-stable
Commit: 2b26a5d95ce230b66255c8e7e7388c8013dc6ba6
Parents: 57faa53
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 13:42:58 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:36:07 2017 -0700

----------------------------------------------------------------------
 tests/core.py                     | 58 -----------------------
 tests/dags/sleep_forever_dag.py   | 29 ------------
 tests/dags/test_double_trigger.py | 29 ++++++++++++
 tests/jobs.py                     | 86 ++++++++++++++++++++++++++++++++--
 4 files changed, 112 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 636ad43..870a0cb 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -896,64 +896,6 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
-    def test_run_task_twice(self):
-        """If two copies of a TI run, the new one should die, and old should live"""
-        dagbag = models.DagBag(
-            dag_folder=TEST_DAG_FOLDER,
-            include_examples=False,
-        )
-        TI = models.TaskInstance
-        dag = dagbag.dags.get('sleep_forever_dag')
-        task = dag.task_dict.get('sleeps_forever')
-    
-        ti = TI(task=task, execution_date=DEFAULT_DATE)
-        job1 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job2 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-
-        p1 = multiprocessing.Process(target=job1.run)
-        p2 = multiprocessing.Process(target=job2.run)
-        try:
-            p1.start()
-            start_time = timetime()
-            sleep(5.0) # must wait for session to be created on p1
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            p1pid = ti.pid
-            settings.engine.dispose()
-            p2.start()
-            p2.join(5) # wait 5 seconds until termination
-            self.assertFalse(p2.is_alive())
-            self.assertTrue(p1.is_alive())
-
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            self.assertEqual(p1pid, ti.pid)
-
-            # check changing hostname kills task
-            ti.refresh_from_db(session=session, lock_for_update=True)
-            ti.hostname = 'nonexistenthostname'
-            session.merge(ti)
-            session.commit()
-
-            p1.join(5)
-            self.assertFalse(p1.is_alive())
-        finally:
-            try:
-                p1.terminate()
-            except AttributeError:
-                pass # process already terminated
-            try:
-                p2.terminate()
-            except AttributeError:
-                pass # process already terminated
-            session.close()
-
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
deleted file mode 100644
index b1f810e..0000000
--- a/tests/dags/sleep_forever_dag.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Used for unit tests"""
-import airflow
-from airflow.operators.bash_operator import BashOperator
-from airflow.models import DAG
-
-dag = DAG(
-    dag_id='sleep_forever_dag',
-    schedule_interval=None,
-)
-
-task = BashOperator(
-    task_id='sleeps_forever',
-    dag=dag,
-    bash_command="sleep 10000000000",
-    start_date=airflow.utils.dates.days_ago(2),
-    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/test_double_trigger.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_double_trigger.py b/tests/dags/test_double_trigger.py
new file mode 100644
index 0000000..b58f5c9
--- /dev/null
+++ b/tests/dags/test_double_trigger.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+    'owner': 'airflow',
+    'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_localtaskjob_double_trigger', default_args=args)
+task = DummyOperator(
+    task_id='test_localtaskjob_double_trigger_task',
+    dag=dag)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d208fd4..aee0e9c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,12 +23,13 @@ import os
 import shutil
 import unittest
 import six
-import sys
+import socket
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings, models
 from airflow.bin import cli
-from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.executors import SequentialExecutor
+from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
@@ -36,8 +37,12 @@ from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.dag_processing import SimpleDagBag
+
 from mock import patch
-from tests.executor.test_executor import TestExecutor
+from sqlalchemy.orm.session import make_transient
+from tests.executors.test_executor import TestExecutor
+
+from tests.core import TEST_DAG_FOLDER
 
 from airflow import configuration
 configuration.load_test_config()
@@ -344,6 +349,81 @@ class BackfillJobTest(unittest.TestCase):
                 self.assertEqual(State.NONE, ti.state)
 
 
+class LocalTaskJobTest(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    @patch.object(LocalTaskJob, "_is_descendant_process")
+    def test_localtaskjob_heartbeat(self, is_descendant):
+        session = settings.Session()
+        dag = DAG(
+            'test_localtaskjob_heartbeat',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = "blablabla"
+        session.commit()
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+        is_descendant.return_value = True
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.merge(ti)
+        session.commit()
+
+        ret = job1.heartbeat_callback()
+        self.assertEqual(ret, None)
+
+        is_descendant.return_value = False
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+    def test_localtaskjob_double_trigger(self):
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        dag = dagbag.dags.get('test_localtaskjob_double_trigger')
+        task = dag.get_task('test_localtaskjob_double_trigger_task')
+
+        session = settings.Session()
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.commit()
+
+        ti_run = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.run)
+
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        self.assertEqual(ti.pid, 1)
+        self.assertEqual(ti.state, State.RUNNING)
+
+        session.close()
+
+
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run
     default_scheduler_args = {"file_process_interval": 0,
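
A note on the double-trigger fixture above: the pre-existing task instance is
seeded with ti.hostname = socket.getfqdn() and ti.pid = 1, so the hostname
check passes while pid 1 (init) can never be the freshly started job's own
process; the duplicate path therefore fires deterministically. Condensed, the
contract the test pins down is (all names taken from the diff above; this
mirrors, not replaces, the committed test):

    # The duplicate job dies with an exception...
    job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True,
                        executor=SequentialExecutor())
    self.assertRaises(AirflowException, job1.run)

    # ...while the original task instance's state and pid survive untouched.
    ti = dr.get_task_instance(task_id=task.task_id, session=session)
    self.assertEqual(ti.pid, 1)
    self.assertEqual(ti.state, State.RUNNING)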