You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:36 UTC

[38/45] incubator-airflow git commit: [AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Right now, triggering a second task instance causes
both itself and the original task to run, because the
hostname and pid fields are updated regardless of
whether the task is already running.
Also, the pid field is not refreshed from the db properly.
Also, the recorded pid should be checked against the
parent job's process: it must be a descendant of it.

Will be followed up by working tests.

Closes #2102 from saguziel/aguziel-fix-trigger-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1243ab16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1243ab16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1243ab16

Branch: refs/heads/v1-8-stable
Commit: 1243ab16849ab9716b26aeba6a11ea3e9e9a81ca
Parents: a8f2c27
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Mar 11 10:54:39 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:45 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py                 | 41 ++++++++++++++-----------
 airflow/models.py               |  2 ++
 tests/core.py                   | 59 ++++++++++++++++++++++++++++++++++++
 tests/dags/sleep_forever_dag.py | 29 ++++++++++++++++++
 4 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c61b229..222d9ba 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2072,15 +2072,6 @@ class LocalTaskJob(BaseJob):
         try:
             self.task_runner.start()
 
-            ti = self.task_instance
-            session = settings.Session()
-            if self.task_runner.process:
-                ti.pid = self.task_runner.process.pid
-            ti.hostname = socket.getfqdn()
-            session.merge(ti)
-            session.commit()
-            session.close()
-
             last_heartbeat_time = time.time()
             heartbeat_time_limit = conf.getint('scheduler',
                                                'scheduler_zombie_task_threshold')
@@ -2120,6 +2111,18 @@ class LocalTaskJob(BaseJob):
         self.task_runner.terminate()
         self.task_runner.on_finish()
 
+    def _is_descendant_process(self, pid):
+        """Checks if pid is a descendant of the current process.
+
+        :param pid: process id to check
+        :type pid: int
+        :rtype: bool
+        """
+        try:
+            return psutil.Process(pid) in psutil.Process().children(recursive=True)
+        except psutil.NoSuchProcess:
+            return False
+
     @provide_session
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
@@ -2133,15 +2136,17 @@ class LocalTaskJob(BaseJob):
         if ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
-            if not (fqdn == ti.hostname and
-                    self.task_runner.process.pid == ti.pid):
-                logging.warning("Recorded hostname and pid of {ti.hostname} "
-                                "and {ti.pid} do not match this instance's "
-                                "which are {fqdn} and "
-                                "{self.task_runner.process.pid}. "
-                                "Taking the poison pill. So long."
-                                .format(**locals()))
-                raise AirflowException("Another worker/process is running this job")
+            if fqdn != ti.hostname:
+                logging.warning("The recorded hostname {ti.hostname} "
+                                "does not match this instance's hostname "
+                                "{fqdn}".format(**locals()))
+                raise AirflowException("Hostname of job runner does not match")
+            elif not self._is_descendant_process(ti.pid):
+                current_pid = os.getpid()
+                logging.warning("Recorded pid {ti.pid} is not a "
+                                "descendant of the current pid "
+                                "{current_pid}".format(**locals()))
+                raise AirflowException("PID of job runner does not match")
         elif (self.was_running
               and self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 32c52ac..7c6590f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -997,6 +997,7 @@ class TaskInstance(Base):
             self.end_date = ti.end_date
             self.try_number = ti.try_number
             self.hostname = ti.hostname
+            self.pid = ti.pid
         else:
             self.state = None
 
@@ -1320,6 +1321,7 @@ class TaskInstance(Base):
         if not test_mode:
             session.add(Log(State.RUNNING, self))
         self.state = State.RUNNING
+        self.pid = os.getpid()
         self.end_date = None
         if not test_mode:
             session.merge(self)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ee7a738..636ad43 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -26,6 +26,7 @@ from datetime import datetime, time, timedelta
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
 import signal
+from time import time as timetime
 from time import sleep
 import warnings
 
@@ -895,6 +896,64 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
+    def test_run_task_twice(self):
+        """If two copies of a TI run, the new one should die, and old should live"""
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        TI = models.TaskInstance
+        dag = dagbag.dags.get('sleep_forever_dag')
+        task = dag.task_dict.get('sleeps_forever')
+    
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job2 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+
+        p1 = multiprocessing.Process(target=job1.run)
+        p2 = multiprocessing.Process(target=job2.run)
+        try:
+            p1.start()
+            start_time = timetime()
+            sleep(5.0) # must wait for session to be created on p1
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            p1pid = ti.pid
+            settings.engine.dispose()
+            p2.start()
+            p2.join(5) # wait 5 seconds until termination
+            self.assertFalse(p2.is_alive())
+            self.assertTrue(p1.is_alive())
+
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            self.assertEqual(p1pid, ti.pid)
+
+            # check changing hostname kills task
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            ti.hostname = 'nonexistenthostname'
+            session.merge(ti)
+            session.commit()
+
+            p1.join(5)
+            self.assertFalse(p1.is_alive())
+        finally:
+            try:
+                p1.terminate()
+            except AttributeError:
+                pass # process already terminated
+            try:
+                p2.terminate()
+            except AttributeError:
+                pass # process already terminated
+            session.close()
+
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
new file mode 100644
index 0000000..b1f810e
--- /dev/null
+++ b/tests/dags/sleep_forever_dag.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+import airflow
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+
+dag = DAG(
+    dag_id='sleep_forever_dag',
+    schedule_interval=None,
+)
+
+task = BashOperator(
+    task_id='sleeps_forever',
+    dag=dag,
+    bash_command="sleep 10000000000",
+    start_date=airflow.utils.dates.days_ago(2),
+    owner='airflow')