You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/07/10 22:26:57 UTC

incubator-airflow git commit: [AIRFLOW-1366] Add max_tries to task instance

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b532d8d77 -> 4f20f6077


[AIRFLOW-1366] Add max_tries to task instance

Right now Airflow deletes the task instance when
a user clears it. We have no way of keeping track
of how many times a task instance has been run,
whether by a user or by the task itself. So instead
of deleting the task instance record, we should keep
it and make try_number monotonically increasing for
every task instance attempt. max_tries is
introduced as an upper bound on the retries made by
the task itself.

This new column will be used to update the logic
behind clear_task_instances.

The db migration was tested locally.

Closes #2409 from AllisonWang/allison--max-tries


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f20f607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f20f607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f20f607

Branch: refs/heads/master
Commit: 4f20f607764bb3477419321b5dfd0c53ba1db3c0
Parents: b532d8d
Author: AllisonWang <al...@gmail.com>
Authored: Mon Jul 10 15:26:08 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon Jul 10 15:26:12 2017 -0700

----------------------------------------------------------------------
 ...dc7_add_max_tries_column_to_task_instance.py | 106 +++++++++++++++++++
 airflow/models.py                               |   3 +
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
new file mode 100644
index 0000000..2d5ffc2
--- /dev/null
+++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -0,0 +1,106 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add max tries column to task instance
+
+Revision ID: cc1e65623dc7
+Revises: 127d2bf2dfa7
+Create Date: 2017-06-19 16:53:12.851141
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'cc1e65623dc7'
+down_revision = '127d2bf2dfa7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from airflow import settings
+from airflow.models import DagBag, TaskInstance
+
+BATCH_SIZE = 5000
+
+def upgrade():
+    """Add ``max_tries`` to ``task_instance`` and backfill existing rows."""
+    op.add_column('task_instance', sa.Column('max_tries', sa.Integer,
+        server_default="-1"))
+    # Only migrate data if the task_instance table exists. Some databases do
+    # not create tables until the migration finishes, and querying a table
+    # that does not exist yet would raise an error.
+    engine = settings.engine
+    if engine.dialect.has_table(engine, 'task_instance'):
+        # Bind an ORM session to the connection Alembic is operating on.
+        connection = op.get_bind()
+        sessionmaker = sa.orm.sessionmaker()
+        session = sessionmaker(bind=connection)
+        dagbag = DagBag(settings.DAGS_FOLDER)
+        query = session.query(sa.func.count(TaskInstance.max_tries)).filter(
+            TaskInstance.max_tries == -1
+        )
+        # Work in batches of BATCH_SIZE so the whole table is never loaded
+        # into memory at once; -1 (the server default) marks pending rows.
+        while query.scalar():
+            tis = session.query(TaskInstance).filter(
+                TaskInstance.max_tries == -1
+            ).limit(BATCH_SIZE).all()
+            for ti in tis:
+                dag = dagbag.get_dag(ti.dag_id)
+                if not dag or not dag.has_task(ti.task_id):
+                    # The task_instance table might be out of date: the dag
+                    # or task may have been modified or deleted from the
+                    # dagbag while its row still exists here. A task that
+                    # can't be parsed is not retried, so max_tries is set to
+                    # the current try_number.
+                    ti.max_tries = ti.try_number
+                else:
+                    task = dag.get_task(ti.task_id)
+                    ti.max_tries = task.retries
+                session.merge(ti)
+            session.commit()
+        # Commit once more in case anything is still pending in the session.
+        session.commit()
+
+
+def downgrade():
+    """Recompute ``try_number`` for migrated rows, then drop ``max_tries``."""
+    engine = settings.engine
+    if engine.dialect.has_table(engine, 'task_instance'):
+        connection = op.get_bind()
+        sessionmaker = sa.orm.sessionmaker()
+        session = sessionmaker(bind=connection)
+        dagbag = DagBag(settings.DAGS_FOLDER)
+        query = session.query(sa.func.count(TaskInstance.max_tries)).filter(
+            TaskInstance.max_tries != -1
+        )
+        while query.scalar():
+            tis = session.query(TaskInstance).filter(
+                TaskInstance.max_tries != -1
+            ).limit(BATCH_SIZE).all()
+            for ti in tis:
+                dag = dagbag.get_dag(ti.dag_id)
+                if not dag or not dag.has_task(ti.task_id):
+                    ti.try_number = 0
+                else:
+                    task = dag.get_task(ti.task_id)
+                    # max_tries - try_number is the number of self-retries the
+                    # task instance has left, so the recomputed try_number is
+                    # task.retries minus that remainder, floored at 0.
+                    ti.try_number = max(0, task.retries - (ti.max_tries -
+                        ti.try_number))
+                ti.max_tries = -1
+                session.merge(ti)
+            session.commit()
+        session.commit()
+    op.drop_column('task_instance', 'max_tries')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8566b7f..32ad144 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -754,6 +754,7 @@ class TaskInstance(Base):
     duration = Column(Float)
     state = Column(String(20))
     try_number = Column(Integer, default=0)
+    max_tries = Column(Integer)
     hostname = Column(String(1000))
     unixname = Column(String(1000))
     job_id = Column(Integer)
@@ -780,6 +781,7 @@ class TaskInstance(Base):
         self.pool = task.pool
         self.priority_weight = task.priority_weight_total
         self.try_number = 0
+        self.max_tries = self.task.retries
         self.unixname = getpass.getuser()
         self.run_as_user = task.run_as_user
         if state:
@@ -1021,6 +1023,7 @@ class TaskInstance(Base):
             self.start_date = ti.start_date
             self.end_date = ti.end_date
             self.try_number = ti.try_number
+            self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.pid = ti.pid
         else: