You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:36 UTC
[38/45] incubator-airflow git commit: [AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection
[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection
Right now, a second task instance being triggered will cause both itself and the original task to run, because the hostname and pid fields are updated regardless of whether the task is already running.
Also, the pid field is not refreshed from the db properly.
Also, we should check against the parent's pid.
This will be followed up by working tests.
Closes #2102 from saguziel/aguziel-fix-trigger-2
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1243ab16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1243ab16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1243ab16
Branch: refs/heads/v1-8-stable
Commit: 1243ab16849ab9716b26aeba6a11ea3e9e9a81ca
Parents: a8f2c27
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Mar 11 10:54:39 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:45 2017 -0700
----------------------------------------------------------------------
airflow/jobs.py | 41 ++++++++++++++-----------
airflow/models.py | 2 ++
tests/core.py | 59 ++++++++++++++++++++++++++++++++++++
tests/dags/sleep_forever_dag.py | 29 ++++++++++++++++++
4 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c61b229..222d9ba 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2072,15 +2072,6 @@ class LocalTaskJob(BaseJob):
try:
self.task_runner.start()
- ti = self.task_instance
- session = settings.Session()
- if self.task_runner.process:
- ti.pid = self.task_runner.process.pid
- ti.hostname = socket.getfqdn()
- session.merge(ti)
- session.commit()
- session.close()
-
last_heartbeat_time = time.time()
heartbeat_time_limit = conf.getint('scheduler',
'scheduler_zombie_task_threshold')
@@ -2120,6 +2111,18 @@ class LocalTaskJob(BaseJob):
self.task_runner.terminate()
self.task_runner.on_finish()
+ def _is_descendant_process(self, pid):
+ """Checks if pid is a descendant of the current process.
+
+ :param pid: process id to check
+ :type pid: int
+ :rtype: bool
+ """
+ try:
+ return psutil.Process(pid) in psutil.Process().children(recursive=True)
+ except psutil.NoSuchProcess:
+ return False
+
@provide_session
def heartbeat_callback(self, session=None):
"""Self destruct task if state has been moved away from running externally"""
@@ -2133,15 +2136,17 @@ class LocalTaskJob(BaseJob):
if ti.state == State.RUNNING:
self.was_running = True
fqdn = socket.getfqdn()
- if not (fqdn == ti.hostname and
- self.task_runner.process.pid == ti.pid):
- logging.warning("Recorded hostname and pid of {ti.hostname} "
- "and {ti.pid} do not match this instance's "
- "which are {fqdn} and "
- "{self.task_runner.process.pid}. "
- "Taking the poison pill. So long."
- .format(**locals()))
- raise AirflowException("Another worker/process is running this job")
+ if fqdn != ti.hostname:
+ logging.warning("The recorded hostname {ti.hostname} "
+ "does not match this instance's hostname "
+ "{fqdn}".format(**locals()))
+ raise AirflowException("Hostname of job runner does not match")
+ elif not self._is_descendant_process(ti.pid):
+ current_pid = os.getpid()
+ logging.warning("Recorded pid {ti.pid} is not a "
+ "descendant of the current pid "
+ "{current_pid}".format(**locals()))
+ raise AirflowException("PID of job runner does not match")
elif (self.was_running
and self.task_runner.return_code() is None
and hasattr(self.task_runner, 'process')):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 32c52ac..7c6590f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -997,6 +997,7 @@ class TaskInstance(Base):
self.end_date = ti.end_date
self.try_number = ti.try_number
self.hostname = ti.hostname
+ self.pid = ti.pid
else:
self.state = None
@@ -1320,6 +1321,7 @@ class TaskInstance(Base):
if not test_mode:
session.add(Log(State.RUNNING, self))
self.state = State.RUNNING
+ self.pid = os.getpid()
self.end_date = None
if not test_mode:
session.merge(self)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ee7a738..636ad43 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -26,6 +26,7 @@ from datetime import datetime, time, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import signal
+from time import time as timetime
from time import sleep
import warnings
@@ -895,6 +896,64 @@ class CoreTest(unittest.TestCase):
trigger_rule="non_existant",
dag=self.dag)
+ def test_run_task_twice(self):
+ """If two copies of a TI run, the new one should die, and old should live"""
+ dagbag = models.DagBag(
+ dag_folder=TEST_DAG_FOLDER,
+ include_examples=False,
+ )
+ TI = models.TaskInstance
+ dag = dagbag.dags.get('sleep_forever_dag')
+ task = dag.task_dict.get('sleeps_forever')
+
+ ti = TI(task=task, execution_date=DEFAULT_DATE)
+ job1 = jobs.LocalTaskJob(
+ task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ job2 = jobs.LocalTaskJob(
+ task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+
+ p1 = multiprocessing.Process(target=job1.run)
+ p2 = multiprocessing.Process(target=job2.run)
+ try:
+ p1.start()
+ start_time = timetime()
+ sleep(5.0) # must wait for session to be created on p1
+ settings.engine.dispose()
+ session = settings.Session()
+ ti.refresh_from_db(session=session)
+ self.assertEqual(State.RUNNING, ti.state)
+ p1pid = ti.pid
+ settings.engine.dispose()
+ p2.start()
+ p2.join(5) # wait 5 seconds until termination
+ self.assertFalse(p2.is_alive())
+ self.assertTrue(p1.is_alive())
+
+ settings.engine.dispose()
+ session = settings.Session()
+ ti.refresh_from_db(session=session)
+ self.assertEqual(State.RUNNING, ti.state)
+ self.assertEqual(p1pid, ti.pid)
+
+ # check changing hostname kills task
+ ti.refresh_from_db(session=session, lock_for_update=True)
+ ti.hostname = 'nonexistenthostname'
+ session.merge(ti)
+ session.commit()
+
+ p1.join(5)
+ self.assertFalse(p1.is_alive())
+ finally:
+ try:
+ p1.terminate()
+ except AttributeError:
+ pass # process already terminated
+ try:
+ p2.terminate()
+ except AttributeError:
+ pass # process already terminated
+ session.close()
+
def test_terminate_task(self):
"""If a task instance's db state get deleted, it should fail"""
TI = models.TaskInstance
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
new file mode 100644
index 0000000..b1f810e
--- /dev/null
+++ b/tests/dags/sleep_forever_dag.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+import airflow
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+
+dag = DAG(
+ dag_id='sleep_forever_dag',
+ schedule_interval=None,
+)
+
+task = BashOperator(
+ task_id='sleeps_forever',
+ dag=dag,
+ bash_command="sleep 10000000000",
+ start_date=airflow.utils.dates.days_ago(2),
+ owner='airflow')