You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Rupesh Bansal (JIRA)" <ji...@apache.org> on 2017/12/07 09:25:00 UTC

[jira] [Commented] (AIRFLOW-1258) TaskInstances within SubDagOperator are marked as failed after an hour

    [ https://issues.apache.org/jira/browse/AIRFLOW-1258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281550#comment-16281550 ] 

Rupesh Bansal commented on AIRFLOW-1258:
----------------------------------------

+1 Facing the same issue. Can someone help here?

> TaskInstances within SubDagOperator are marked as failed after an hour
> ----------------------------------------------------------------------
>
>                 Key: AIRFLOW-1258
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1258
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.1
>            Reporter: John Doe
>             Fix For: 1.9.0
>
>
> We have multiple SubDagOperators which we use to isolate individual units in our broader dags (we typically have tens of SubDagOperators in a given DAG). For any TaskInstance inside the SubDag which runs over an hour, the dag fails right after the 1 hour mark.
> This is completely unrelated to our codebase and can be recreated with a sleep BashOperator:
> {code}
> from datetime import datetime
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.subdag_operator import SubDagOperator
> DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}
> def define_sub(dag, step_name, sleeptime):
>     op = BashOperator(
>         task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
>     )
>     return dag
> def gen_sub_dag(parent_name, step_name, sleeptime):
>     sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
>     define_sub(sub, step_name, sleeptime)
>     return sub
> long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
> long_sub_dag = SubDagOperator(
>     subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
> )
> {code}
> Under Airflow 1.7.1.3, we would see the following error in the SubDagOperator:
> {code} [2017-05-25 17:08:56,082] {jobs.py:965} ERROR - The airflow run command failed at reporting an error. This should not occur in normal circumstances. Task state is 'running',reported state is 'success'. TI is <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-24 16:00:00 [running]> {code}
> which we could then manually mark as 'success' in the airflow database. However, starting in 1.8.1 (we skipped 1.8.0 as AIRFLOW-1004 was a hard blocker) the SubDag now instead fails outright.
> Nothing in the logs indicate any reason for the failure - we've reduced the level to DEBUG and still see nothing.
> airflow-scheduler.log:
> {code}
> 2017-05-31 19:44:10,260 INFO - Heartbeating the process manager
> 2017-05-31 19:44:10,263 INFO - Started a process (PID: 6462) to generate tasks for /efs/airflow/dags/long_running.py - logging into /opt/airflow/logs/scheduler/2017-05-31/long_running.py.log
> 2017-05-31 19:44:10,268 INFO - Heartbeating the executor
> 2017-05-31 19:44:10,271 INFO - Executor reports long_runner.long_runner_sub execution_date=2017-05-31 18:42:55.400517 as failed
> 2017-05-31 19:44:10,324 INFO - Heartbeating the scheduler
> {code}
>  
> In the SubDagOperator log, we see a second task queued immediately before the failure - despite the original task running unabated:
> {code}
> [2017-05-31 19:44:04,441] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run long_runner long_runner_sub 2017-05-31T18:42:55.400517 --job_id 108 --raw -sd DAGS_FOLDER/long_running.py']
> [2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,815] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,816] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Task Instance Not Already Running' PASSED: False, Task is already running, it started on 2017-05-31 18:43:03.494829.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-05-31 18:43:03.494829.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2017-05-31 19:44:05,818] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,466] {jobs.py:1722} DEBUG - Executor state: failed task <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]>
> [2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,467] {jobs.py:1729} ERROR - Executor reports task instance <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]> finished (failed) although the task says its running. Was the task killed externally?
> {code}
> It's unclear if the second task is a response to the first incorrectly being marked as a failure, or if the second task being queued causes the failure state when it takes the poison pill



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)