Posted to commits@airflow.apache.org by "John Culver (JIRA)" <ji...@apache.org> on 2017/04/27 19:46:04 UTC

[jira] [Updated] (AIRFLOW-1157) Assigning a task to a pool that doesn't exist crashes the scheduler

     [ https://issues.apache.org/jira/browse/AIRFLOW-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Culver updated AIRFLOW-1157:
---------------------------------
    Description: 
If a DAG is run that contains a task assigned to a pool that does not exist, the scheduler crashes.

Manually triggering this DAG in an environment that has no pool named 'a_non_existent_pool' crashes the scheduler:

{code}
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# schedule_interval=None means the DAG only runs when triggered manually.
dag = DAG(dag_id='crash_scheduler',
          start_date=datetime(2017, 1, 1),
          schedule_interval=None)

# 'a_non_existent_pool' has never been created, so the scheduler's
# pool lookup for this task raises an unhandled KeyError.
t1 = DummyOperator(task_id='crash',
                   pool='a_non_existent_pool',
                   dag=dag)
{code}
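
Until the scheduler handles this gracefully, a workaround is to create the pool before the DAG is triggered. Here is a minimal sketch that does so through the ORM (the slot count and description are arbitrary placeholders, not values from this environment):

{code}
from airflow import settings
from airflow.models import Pool

# Create the missing pool so the scheduler's lookup succeeds.
# Slot count and description are arbitrary placeholders.
session = settings.Session()
if not session.query(Pool).filter(Pool.pool == 'a_non_existent_pool').first():
    session.add(Pool(pool='a_non_existent_pool', slots=4,
                     description='created to avoid the scheduler crash'))
    session.commit()
session.close()
{code}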


Here is the relevant log output from the scheduler:

{noformat}
[2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test-3.py finished
[2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test_s3_file_move.py finished
[2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
[2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
[2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: move_s3_file_test.move_files 2017-04-27 19:31:22.298893 [scheduled]>
[2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 0/16 running tasks
[2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) to queued
[2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local -sd /opt/airflow/dags/test_s3_file_move.py
[2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
[2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/crash_scheduler.py finished
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/configuration/constants.py finished
[2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
[2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
[2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 [scheduled]>
[2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
[2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in _execute_helper
    (State.SCHEDULED,))
  File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in _execute_task_instances
    open_slots = pools[pool].open_slots(session=session)
KeyError: u'a_non_existant_pool'
{noformat}
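
The crash comes from the unguarded dictionary lookup at jobs.py:1025 (open_slots = pools[pool].open_slots(session=session)). A guarded lookup would let the scheduler log the misconfiguration and skip the affected tasks instead of dying. This is a sketch of the idea only; the helper below is hypothetical, not the actual jobs.py code:

{code}
import logging

log = logging.getLogger(__name__)


def open_slots_for(pool_name, pools, session):
    """Hypothetical helper: look up a pool defensively so that a
    missing pool is logged and treated as having no open slots,
    rather than letting the KeyError kill the scheduler loop."""
    pool = pools.get(pool_name)
    if pool is None:
        log.warning("Tasks are assigned to pool %s, which does not "
                    "exist; skipping them this cycle.", pool_name)
        return 0
    return pool.open_slots(session=session)
{code}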

> Assigning a task to a pool that doesn't exist crashes the scheduler
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1157
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>            Reporter: John Culver
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)