Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/18 04:22:01 UTC

[jira] [Commented] (AIRFLOW-1463) Scheduler does not reschedule tasks in QUEUED state

    [ https://issues.apache.org/jira/browse/AIRFLOW-1463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723666#comment-16723666 ] 

ASF GitHub Bot commented on AIRFLOW-1463:
-----------------------------------------

stale[bot] closed pull request #2483: [AIRFLOW-1463] Clear state of queued task
URL: https://github.com/apache/incubator-airflow/pull/2483
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below for the sake of provenance, as GitHub hides the original diff once
the pull request is closed:

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a8543d38a9..945dc1e97c 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -304,6 +304,31 @@ def set_is_paused(is_paused, args, dag=None):
     print(msg)
 
 
+def mark_task_as(dag_id, task_id, execution_date, state):
+    """Finds task instance with specified fields, and changes state.
+
+    If the task instance doesn't exist, it does nothing.
+
+    """
+    session = settings.Session()
+    from sqlalchemy import and_
+    tis = session.query(TaskInstance).filter(
+        and_(
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.task_id == task_id,
+            TaskInstance.execution_date == execution_date
+        )
+    ).all()
+    if tis:
+        assert len(tis) == 1, (
+            "There must be at most one task instance with given properties"
+        )
+        ti = tis[0]
+        ti.state = state
+        session.merge(ti)
+        session.commit()
+
+
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -327,7 +352,19 @@ def run(args, dag=None):
         settings.configure_orm()
 
     if not args.pickle and not dag:
-        dag = get_dag(args)
+        try:
+            dag = get_dag(args)
+        except Exception as e:
+            # DAG import can fail: DAG files are application code that we
+            # cannot require to be reliable, and such import errors are
+            # observed to be transient. Catch the error here and set the
+            # task instance state back to NONE so that the scheduler
+            # reschedules the task once the DAG imports cleanly again.
+            print('Failed to load DAG, reason: %r' % e)
+            print('Setting the task state back to NONE')
+            from airflow.utils.state import State
+            mark_task_as(args.dag_id, args.task_id, args.execution_date, State.NONE)
+            raise e
     elif not dag:
         session = settings.Session()
         logging.info('Loading pickle id {args.pickle}'.format(args=args))
diff --git a/tests/core.py b/tests/core.py
index 923e0c3e86..3105c63c79 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1088,6 +1088,7 @@ def _cleanup(session=None):
 
         session.query(models.Pool).delete()
         session.query(models.Variable).delete()
+        session.query(models.TaskInstance).delete()
         session.commit()
         session.close()
 
@@ -1326,6 +1327,24 @@ def test_cli_run(self):
             'run', 'example_bash_operator', 'runme_0', '-l',
             DEFAULT_DATE.isoformat()]))
 
+    def test_cli_run_import_failure(self):
+        task = DummyOperator(dag_id='no_such_dag', task_id='no_such_task')
+        self.session.add(
+            models.TaskInstance(
+                task,
+                execution_date=DEFAULT_DATE,
+                state=State.QUEUED
+            )
+        )
+        self.session.commit()
+        with self.assertRaises(Exception):
+            cli.run(self.parser.parse_args([
+                'run', task.dag_id, task.task_id, '-l',
+                DEFAULT_DATE.isoformat()]))
+        ti = self.session.query(models.TaskInstance).filter_by(
+            dag_id=task.dag_id, task_id=task.task_id).first()
+        self.assertTrue(ti.state == State.NONE)
+
     def test_task_state(self):
         cli.task_state(self.parser.parse_args([
             'task_state', 'example_bash_operator', 'runme_0',

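As a point of comparison with the mark_task_as helper above, here is a
minimal standalone sketch (not part of the PR) of resetting a task
instance stuck in QUEUED back to NONE, assuming the Airflow 1.8-era API
surface (airflow.settings, airflow.models.TaskInstance,
airflow.utils.state.State); the dag_id, task_id, and execution_date
values passed at the bottom are placeholders:

    # Minimal sketch, assuming Airflow 1.8-era APIs. Resets a task
    # instance stuck in QUEUED back to NONE so the scheduler will
    # reschedule it. The identifiers passed in below are placeholders.
    from datetime import datetime

    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def reset_queued_task(dag_id, task_id, execution_date):
        session = settings.Session()
        try:
            ti = session.query(TaskInstance).filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                TaskInstance.execution_date == execution_date,
                TaskInstance.state == State.QUEUED,
            ).first()
            if ti is not None:
                ti.state = State.NONE
                session.merge(ti)
                session.commit()
        finally:
            session.close()

    if __name__ == '__main__':
        reset_queued_task('example_bash_operator', 'runme_0',
                          datetime(2018, 1, 1))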

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Scheduler does not reschedule tasks in QUEUED state
> ---------------------------------------------------
>
>                 Key: AIRFLOW-1463
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1463
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: cli
>         Environment: Ubuntu 14.04
> Airflow 1.8.0
> SQS backed task queue, AWS RDS backed meta storage
> The DAG folder is synced by a script on code push: an archive is downloaded from S3, unpacked, and moved, and an install script is run. The airflow executable is replaced with a symlink pointing to the latest version of the code; no Airflow processes are restarted.
>            Reporter: Stanislav Pak
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Our pipeline-related code is deployed almost simultaneously on all Airflow boxes: the scheduler+webserver box and the worker boxes. A common Python package is deployed to those boxes on every other code push (3-5 deployments per hour). Due to installation specifics, a DAG that imports a module from that package might fail to import (a sketch of such a DAG file follows below the quoted description). If the DAG import fails while a worker is running a task, the task is still removed from the queue, but its state is not changed, so the task stays in the QUEUED state forever.
> Besides the described case, there is a scenario where this happens because of DAG update lag in the scheduler: a task can be scheduled with the old DAG while a worker runs the task with the new DAG, which then fails to import.
> There might be other scenarios in which this happens.
> Proposal:
> Catch errors when importing the DAG on task run, and clear the task instance state if the import fails. This should fix transient issues of this kind.
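
To make the failure mode above concrete, here is a hypothetical sketch of
a DAG file of the kind the report describes: it imports a shared package
that is redeployed several times per hour, so the import itself can fail
transiently. The name common_package is a placeholder, not from the
report:

    # Hypothetical DAG file illustrating the transient import failure.
    # 'common_package' stands in for the shared package that is
    # redeployed several times per hour; if this module is imported
    # while that package is being reinstalled, the whole DAG file fails
    # to import, and a task already QUEUED from an earlier, working
    # version of the file is left stuck in QUEUED.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    import common_package  # may raise ImportError mid-deployment

    dag = DAG('example_pipeline', start_date=datetime(2017, 1, 1))

    task = DummyOperator(task_id='process', dag=dag)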



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)