Posted to commits@airflow.apache.org by "John Culver (JIRA)" <ji...@apache.org> on 2017/04/27 19:35:04 UTC

[jira] [Created] (AIRFLOW-1157) Assigning a task to a pool that doesn't exist crashes the scheduler

John Culver created AIRFLOW-1157:
------------------------------------

             Summary: Assigning a task to a pool that doesn't exist crashes the scheduler
                 Key: AIRFLOW-1157
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: Airflow 1.8
            Reporter: John Culver
            Priority: Critical


If a DAG is run that contains a task assigned to a pool that doesn't exist, the scheduler will crash.

Manually triggering a run of the following DAG in an environment without a pool named 'a_non_existent_pool' will crash the scheduler:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


# A manually triggered DAG (schedule_interval=None) with a single task.
dag = DAG(dag_id='crash_scheduler',
          start_date=datetime(2017, 1, 1),
          schedule_interval=None)


# 'a_non_existent_pool' has not been created in the Airflow metadata
# database; assigning the task to it is what triggers the crash.
t1 = DummyOperator(task_id='crash',
                   pool='a_non_existent_pool',
                   dag=dag)




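As a workaround, creating the pool in the metadata database before the DAG is triggered avoids the crash. A minimal sketch of doing this through the ORM models (the slot count and description below are illustrative, not taken from the report; the 'airflow pool' CLI subcommand can be used instead):

from airflow import settings
from airflow.models import Pool

# Create 'a_non_existent_pool' if it is missing; slots and description
# are placeholder values chosen for this sketch.
session = settings.Session()
if not session.query(Pool).filter(Pool.pool == 'a_non_existent_pool').first():
    session.add(Pool(pool='a_non_existent_pool',
                     slots=4,
                     description='created so crash_scheduler can be triggered'))
    session.commit()
session.close()
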
Here is the relevant log output from the scheduler:

[2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test-3.py finished
[2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test_s3_file_move.py finished
[2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
[2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
[2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: move_s3_file_test.move_files 2017-04-27 19:31:22.298893 [scheduled]>
[2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 0/16 running tasks
[2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) to queued
[2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local -sd /opt/airflow/dags/test_s3_file_move.py
[2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
[2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
[2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process manager
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/crash_scheduler.py finished
[2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/configuration/constants.py finished
[2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
[2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
[2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution:
        <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 [scheduled]>
[2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
[2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
[2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in _execute_helper
    (State.SCHEDULED,))
  File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in _execute_task_instances
    open_slots = pools[pool].open_slots(session=session)
KeyError: u'a_non_existant_pool'
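
The KeyError comes from the unguarded dictionary lookup at jobs.py line 1025: the pool name on the task instance is used as a key into the dict of pools loaded from the database, and a name with no matching Pool row is not handled. Until the scheduler guards that lookup, a pre-flight check along these lines can flag tasks that reference missing pools before the scheduler picks them up. This is a sketch, not part of the report; it assumes the Airflow 1.8 DagBag and Pool APIs and the configured DAGs folder:

from airflow import settings
from airflow.models import DagBag, Pool

# Collect the pool names that actually exist in the metadata database.
session = settings.Session()
known_pools = {p.pool for p in session.query(Pool).all()}
session.close()

# Parse the configured DAGs folder and report tasks pointing at missing pools.
dagbag = DagBag()
for dag_id, dag in dagbag.dags.items():
    for task in dag.tasks:
        if task.pool and task.pool not in known_pools:
            print("%s.%s uses undefined pool %r" % (dag_id, task.task_id, task.pool))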
