You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:28 UTC
[24/28] incubator-airflow git commit: [AIRFLOW-900] Double trigger
should not kill original task instance
[AIRFLOW-900] Double trigger should not kill original task instance
This updates the tests from an earlier AIRFLOW-900 change.
Closes #2146 from bolkedebruin/AIRFLOW-900
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b26a5d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b26a5d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b26a5d9
Branch: refs/heads/v1-8-test
Commit: 2b26a5d95ce230b66255c8e7e7388c8013dc6ba6
Parents: 57faa53
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 13:42:58 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:36:07 2017 -0700
----------------------------------------------------------------------
tests/core.py | 58 -----------------------
tests/dags/sleep_forever_dag.py | 29 ------------
tests/dags/test_double_trigger.py | 29 ++++++++++++
tests/jobs.py | 86 ++++++++++++++++++++++++++++++++--
4 files changed, 112 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 636ad43..870a0cb 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -896,64 +896,6 @@ class CoreTest(unittest.TestCase):
trigger_rule="non_existant",
dag=self.dag)
- def test_run_task_twice(self):
- """If two copies of a TI run, the new one should die, and old should live"""
- dagbag = models.DagBag(
- dag_folder=TEST_DAG_FOLDER,
- include_examples=False,
- )
- TI = models.TaskInstance
- dag = dagbag.dags.get('sleep_forever_dag')
- task = dag.task_dict.get('sleeps_forever')
-
- ti = TI(task=task, execution_date=DEFAULT_DATE)
- job1 = jobs.LocalTaskJob(
- task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
- job2 = jobs.LocalTaskJob(
- task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-
- p1 = multiprocessing.Process(target=job1.run)
- p2 = multiprocessing.Process(target=job2.run)
- try:
- p1.start()
- start_time = timetime()
- sleep(5.0) # must wait for session to be created on p1
- settings.engine.dispose()
- session = settings.Session()
- ti.refresh_from_db(session=session)
- self.assertEqual(State.RUNNING, ti.state)
- p1pid = ti.pid
- settings.engine.dispose()
- p2.start()
- p2.join(5) # wait 5 seconds until termination
- self.assertFalse(p2.is_alive())
- self.assertTrue(p1.is_alive())
-
- settings.engine.dispose()
- session = settings.Session()
- ti.refresh_from_db(session=session)
- self.assertEqual(State.RUNNING, ti.state)
- self.assertEqual(p1pid, ti.pid)
-
- # check changing hostname kills task
- ti.refresh_from_db(session=session, lock_for_update=True)
- ti.hostname = 'nonexistenthostname'
- session.merge(ti)
- session.commit()
-
- p1.join(5)
- self.assertFalse(p1.is_alive())
- finally:
- try:
- p1.terminate()
- except AttributeError:
- pass # process already terminated
- try:
- p2.terminate()
- except AttributeError:
- pass # process already terminated
- session.close()
-
def test_terminate_task(self):
"""If a task instance's db state get deleted, it should fail"""
TI = models.TaskInstance
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
deleted file mode 100644
index b1f810e..0000000
--- a/tests/dags/sleep_forever_dag.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Used for unit tests"""
-import airflow
-from airflow.operators.bash_operator import BashOperator
-from airflow.models import DAG
-
-dag = DAG(
- dag_id='sleep_forever_dag',
- schedule_interval=None,
-)
-
-task = BashOperator(
- task_id='sleeps_forever',
- dag=dag,
- bash_command="sleep 10000000000",
- start_date=airflow.utils.dates.days_ago(2),
- owner='airflow')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/test_double_trigger.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_double_trigger.py b/tests/dags/test_double_trigger.py
new file mode 100644
index 0000000..b58f5c9
--- /dev/null
+++ b/tests/dags/test_double_trigger.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_localtaskjob_double_trigger', default_args=args)
+task = DummyOperator(
+ task_id='test_localtaskjob_double_trigger_task',
+ dag=dag)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d208fd4..aee0e9c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,12 +23,13 @@ import os
import shutil
import unittest
import six
-import sys
+import socket
from tempfile import mkdtemp
from airflow import AirflowException, settings, models
from airflow.bin import cli
-from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.executors import SequentialExecutor
+from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
@@ -36,8 +37,12 @@ from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.dag_processing import SimpleDagBag
+
from mock import patch
-from tests.executor.test_executor import TestExecutor
+from sqlalchemy.orm.session import make_transient
+from tests.executors.test_executor import TestExecutor
+
+from tests.core import TEST_DAG_FOLDER
from airflow import configuration
configuration.load_test_config()
@@ -344,6 +349,81 @@ class BackfillJobTest(unittest.TestCase):
self.assertEqual(State.NONE, ti.state)
+class LocalTaskJobTest(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ @patch.object(LocalTaskJob, "_is_descendant_process")
+ def test_localtaskjob_heartbeat(self, is_descendant):
+ session = settings.Session()
+ dag = DAG(
+ 'test_localtaskjob_heartbeat',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = DummyOperator(task_id='op1')
+
+ dag.clear()
+ dr = dag.create_dagrun(run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session)
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.RUNNING
+ ti.hostname = "blablabla"
+ session.commit()
+
+ job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+ is_descendant.return_value = True
+ ti.state = State.RUNNING
+ ti.hostname = socket.getfqdn()
+ ti.pid = 1
+ session.merge(ti)
+ session.commit()
+
+ ret = job1.heartbeat_callback()
+ self.assertEqual(ret, None)
+
+ is_descendant.return_value = False
+ self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+ def test_localtaskjob_double_trigger(self):
+ dagbag = models.DagBag(
+ dag_folder=TEST_DAG_FOLDER,
+ include_examples=False,
+ )
+ dag = dagbag.dags.get('test_localtaskjob_double_trigger')
+ task = dag.get_task('test_localtaskjob_double_trigger_task')
+
+ session = settings.Session()
+
+ dag.clear()
+ dr = dag.create_dagrun(run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session)
+ ti = dr.get_task_instance(task_id=task.task_id, session=session)
+ ti.state = State.RUNNING
+ ti.hostname = socket.getfqdn()
+ ti.pid = 1
+ session.commit()
+
+ ti_run = TI(task=task, execution_date=DEFAULT_DATE)
+ job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+ self.assertRaises(AirflowException, job1.run)
+
+ ti = dr.get_task_instance(task_id=task.task_id, session=session)
+ self.assertEqual(ti.pid, 1)
+ self.assertEqual(ti.state, State.RUNNING)
+
+ session.close()
+
+
class SchedulerJobTest(unittest.TestCase):
# These defaults make the test faster to run
default_scheduler_args = {"file_process_interval": 0,