You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/12/16 21:28:30 UTC

incubator-airflow git commit: [AIRFLOW-679] Stop concurrent task instances from running

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0be87d5c4 -> 51acc5087


[AIRFLOW-679] Stop concurrent task instances from running

Check that PID remains unchanged, and throw
exception otherwise.

Testing Done:
- Ran a task, set PID to be different, and
ensuring it failed

If there's a connection error while heartbeating,
it should retry. Also,
if it hasn't been able to heartbeat for a while,
it should kill the
child processes so that we don't have 2 of the
same task running.

Closes #1939 from saguziel/consistency


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/51acc508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/51acc508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/51acc508

Branch: refs/heads/master
Commit: 51acc50875ee64af135caaf0583f51ef3868cda3
Parents: 0be87d5
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Dec 16 13:28:02 2016 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Dec 16 13:28:07 2016 -0800

----------------------------------------------------------------------
 airflow/jobs.py                                 | 20 +++++++++--
 ...e7d17757c7a_add_pid_field_to_taskinstance.py | 37 ++++++++++++++++++++
 airflow/models.py                               |  1 +
 3 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index a2d94e3..81c77a8 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2005,6 +2005,13 @@ class LocalTaskJob(BaseJob):
             )
             self.process = subprocess.Popen(['bash', '-c', command])
             self.logger.info("Subprocess PID is {}".format(self.process.pid))
+            ti = self.task_instance
+            session = settings.Session()
+            ti.pid = self.process.pid
+            ti.hostname = socket.getfqdn()
+            session.merge(ti)
+            session.commit()
+            session.close()
 
             last_heartbeat_time = time.time()
             heartbeat_time_limit = conf.getint('scheduler',
@@ -2054,11 +2061,20 @@ class LocalTaskJob(BaseJob):
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
-        state = session.query(TI.state).filter(
+        new_ti = session.query(TI).filter(
             TI.dag_id==ti.dag_id, TI.task_id==ti.task_id,
             TI.execution_date==ti.execution_date).scalar()
-        if state == State.RUNNING:
+        if new_ti.state == State.RUNNING:
             self.was_running = True
+            fqdn = socket.getfqdn()
+            if not (fqdn == new_ti.hostname and self.process.pid == new_ti.pid):
+                logging.warning("Recorded hostname and pid of {new_ti.hostname} "
+                                "and {new_ti.pid} do not match this instance's "
+                                "which are {fqdn} and "
+                                "{self.process.pid}. Taking the poison pill. So "
+                                "long."
+                                .format(**locals()))
+                raise AirflowException("Another worker/process is running this job")
         elif self.was_running and hasattr(self, 'process'):
             logging.warning(
                 "State of this instance has been externally set to "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
new file mode 100644
index 0000000..7146864
--- /dev/null
+++ b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py
@@ -0,0 +1,37 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add pid field to TaskInstance
+
+Revision ID: 5e7d17757c7a
+Revises: 8504051e801b
+Create Date: 2016-12-07 15:51:37.119478
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '5e7d17757c7a'
+down_revision = '8504051e801b'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('task_instance', sa.Column('pid', sa.Integer))
+
+
+def downgrade():
+    op.drop_column('task_instance', 'pid')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/51acc508/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f46a352..5d7075d 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -733,6 +733,7 @@ class TaskInstance(Base):
     priority_weight = Column(Integer)
     operator = Column(String(1000))
     queued_dttm = Column(DateTime)
+    pid = Column(Integer)
 
     __table_args__ = (
         Index('ti_dag_state', dag_id, state),