You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/11 18:55:19 UTC

incubator-airflow git commit: [AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d7d9f883e -> bb39078a3


[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Right now, triggering a second instance of a task causes both the new
instance and the original one to run, because the hostname and pid
fields are updated regardless of whether the task is already running.
Also, the pid field is not refreshed from the db properly.
Also, we should check against the parent's pid.

Will be followed up by working tests.

Closes #2102 from saguziel/aguziel-fix-trigger-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb39078a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb39078a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb39078a

Branch: refs/heads/master
Commit: bb39078a35cf2bceea58d7831d7a2028c8ef849f
Parents: d7d9f88
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Mar 11 10:54:39 2017 -0800
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Mar 11 10:54:45 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py                 | 41 ++++++++++++++-----------
 airflow/models.py               |  2 ++
 tests/core.py                   | 59 ++++++++++++++++++++++++++++++++++++
 tests/dags/sleep_forever_dag.py | 29 ++++++++++++++++++
 4 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb39078a/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c7db99f..006a180 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2072,15 +2072,6 @@ class LocalTaskJob(BaseJob):
         try:
             self.task_runner.start()
 
-            ti = self.task_instance
-            session = settings.Session()
-            if self.task_runner.process:
-                ti.pid = self.task_runner.process.pid
-            ti.hostname = socket.getfqdn()
-            session.merge(ti)
-            session.commit()
-            session.close()
-
             last_heartbeat_time = time.time()
             heartbeat_time_limit = conf.getint('scheduler',
                                                'scheduler_zombie_task_threshold')
@@ -2120,6 +2111,18 @@ class LocalTaskJob(BaseJob):
         self.task_runner.terminate()
         self.task_runner.on_finish()
 
+    def _is_descendant_process(self, pid):
+        """Checks if pid is a descendant of the current process.
+
+        :param pid: process id to check
+        :type pid: int
+        :rtype: bool
+        """
+        try:
+            return psutil.Process(pid) in psutil.Process().children(recursive=True)
+        except psutil.NoSuchProcess:
+            return False
+
     @provide_session
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
@@ -2133,15 +2136,17 @@ class LocalTaskJob(BaseJob):
         if ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
-            if not (fqdn == ti.hostname and
-                    self.task_runner.process.pid == ti.pid):
-                logging.warning("Recorded hostname and pid of {ti.hostname} "
-                                "and {ti.pid} do not match this instance's "
-                                "which are {fqdn} and "
-                                "{self.task_runner.process.pid}. "
-                                "Taking the poison pill. So long."
-                                .format(**locals()))
-                raise AirflowException("Another worker/process is running this job")
+            if fqdn != ti.hostname:
+                logging.warning("The recorded hostname {ti.hostname} "
+                                "does not match this instance's hostname "
+                                "{fqdn}".format(**locals()))
+                raise AirflowException("Hostname of job runner does not match")
+            elif not self._is_descendant_process(ti.pid):
+                current_pid = os.getpid()
+                logging.warning("Recorded pid {ti.pid} is not a "
+                                "descendant of the current pid "
+                                "{current_pid}".format(**locals()))
+                raise AirflowException("PID of job runner does not match")
         elif (self.was_running
               and self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb39078a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index ed483f5..1244d60 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -997,6 +997,7 @@ class TaskInstance(Base):
             self.end_date = ti.end_date
             self.try_number = ti.try_number
             self.hostname = ti.hostname
+            self.pid = ti.pid
         else:
             self.state = None
 
@@ -1320,6 +1321,7 @@ class TaskInstance(Base):
         if not test_mode:
             session.add(Log(State.RUNNING, self))
         self.state = State.RUNNING
+        self.pid = os.getpid()
         self.end_date = None
         if not test_mode:
             session.merge(self)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb39078a/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 47a7d2b..5a93222 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -26,6 +26,7 @@ from datetime import datetime, time, timedelta
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
 import signal
+from time import time as timetime
 from time import sleep
 import warnings
 
@@ -895,6 +896,64 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
+    def test_run_task_twice(self):
+        """If two copies of a TI run, the new one should die, and old should live"""
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        TI = models.TaskInstance
+        dag = dagbag.dags.get('sleep_forever_dag')
+        task = dag.task_dict.get('sleeps_forever')
+    
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job2 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+
+        p1 = multiprocessing.Process(target=job1.run)
+        p2 = multiprocessing.Process(target=job2.run)
+        try:
+            p1.start()
+            start_time = timetime()
+            sleep(5.0) # must wait for session to be created on p1
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            p1pid = ti.pid
+            settings.engine.dispose()
+            p2.start()
+            p2.join(5) # wait 5 seconds until termination
+            self.assertFalse(p2.is_alive())
+            self.assertTrue(p1.is_alive())
+
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            self.assertEqual(p1pid, ti.pid)
+
+            # check changing hostname kills task
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            ti.hostname = 'nonexistenthostname'
+            session.merge(ti)
+            session.commit()
+
+            p1.join(5)
+            self.assertFalse(p1.is_alive())
+        finally:
+            try:
+                p1.terminate()
+            except AttributeError:
+                pass # process already terminated
+            try:
+                p2.terminate()
+            except AttributeError:
+                pass # process already terminated
+            session.close()
+
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb39078a/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
new file mode 100644
index 0000000..b1f810e
--- /dev/null
+++ b/tests/dags/sleep_forever_dag.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+import airflow
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+
+dag = DAG(
+    dag_id='sleep_forever_dag',
+    schedule_interval=None,
+)
+
+task = BashOperator(
+    task_id='sleeps_forever',
+    dag=dag,
+    bash_command="sleep 10000000000",
+    start_date=airflow.utils.dates.days_ago(2),
+    owner='airflow')