You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:21:39 UTC

[GitHub] stale[bot] closed pull request #2483: [AIRFLOW-1463] Clear state of queued task

stale[bot] closed pull request #2483: [AIRFLOW-1463] Clear state of queued task
URL: https://github.com/apache/incubator-airflow/pull/2483
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a8543d38a9..945dc1e97c 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -304,6 +304,31 @@ def set_is_paused(is_paused, args, dag=None):
     print(msg)
 
 
+def mark_task_as(dag_id, task_id, execution_date, state):
+    """Finds task instance with specified fields, and changes state.
+
+    If the task instance doesn't exist, it does nothing.
+
+    """
+    session = settings.Session()  # NOTE(review): never closed in this function
+    from sqlalchemy import and_  # function-scope import
+    tis = session.query(TaskInstance).filter(
+        and_(
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.task_id == task_id,
+            TaskInstance.execution_date == execution_date
+        )
+    ).all()
+    if tis:  # an empty result means no matching task instance: nothing to do
+        assert len(tis) == 1, (  # NOTE(review): asserts are skipped under -O
+            "There must be at most one task instance with given properties"
+        )
+        ti = tis[0]
+        ti.state = state
+        session.merge(ti)  # ti was loaded from this same session above
+        session.commit()  # persist the new state
+
+
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -327,7 +352,19 @@ def run(args, dag=None):
         settings.configure_orm()
 
     if not args.pickle and not dag:
-        dag = get_dag(args)
+        try:
+            dag = get_dag(args)
+        except Exception as e:
+            # Importing the DAG can fail: DAG files are user-written code,
+            # so we cannot require them to be reliable. We catch the error
+            # here and set the task instance state back to NONE so that the
+            # task gets rescheduled. DAG import errors have been observed in
+            # practice and are expected to be transient.
+            print('Failed to load DAG, reason: %r' % e)
+            print('Setting the task state back to NONE')
+            from airflow.utils.state import State
+            mark_task_as(args.dag_id, args.task_id, args.execution_date, State.NONE)
+            raise e
     elif not dag:
         session = settings.Session()
         logging.info('Loading pickle id {args.pickle}'.format(args=args))
diff --git a/tests/core.py b/tests/core.py
index 923e0c3e86..3105c63c79 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1088,6 +1088,7 @@ def _cleanup(session=None):
 
         session.query(models.Pool).delete()
         session.query(models.Variable).delete()
+        session.query(models.TaskInstance).delete()
         session.commit()
         session.close()
 
@@ -1326,6 +1327,24 @@ def test_cli_run(self):
             'run', 'example_bash_operator', 'runme_0', '-l',
             DEFAULT_DATE.isoformat()]))
 
+    def test_cli_run_import_failure(self):
+        task = DummyOperator(dag_id='no_such_dag', task_id='no_such_task')
+        self.session.add(  # seed a task instance in the QUEUED state
+            models.TaskInstance(
+                task,
+                execution_date=DEFAULT_DATE,
+                state=State.QUEUED
+            )
+        )
+        self.session.commit()
+        with self.assertRaises(Exception):  # cli.run is expected to raise here
+            cli.run(self.parser.parse_args([
+                'run', task.dag_id, task.task_id, '-l',
+                DEFAULT_DATE.isoformat()]))
+        ti = self.session.query(models.TaskInstance).filter_by(
+            dag_id=task.dag_id, task_id=task.task_id).first()
+        self.assertTrue(ti.state == State.NONE)  # state must be reset to NONE
+
     def test_task_state(self):
         cli.task_state(self.parser.parse_args([
             'task_state', 'example_bash_operator', 'runme_0',


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services