Posted to commits@airflow.apache.org by "Sam Danbury (JIRA)" <ji...@apache.org> on 2019/01/28 15:49:00 UTC

[jira] [Created] (AIRFLOW-3778) QUEUED task not being scheduled

Sam Danbury created AIRFLOW-3778:
------------------------------------

             Summary: QUEUED task not being scheduled
                 Key: AIRFLOW-3778
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3778
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: 1.9.0
            Reporter: Sam Danbury


Can you point me to the part of the code that schedules tasks that are already in a QUEUED state, please?

I can find the code that schedules tasks in a NONE or UP_FOR_RETRY state, but nothing for QUEUED.

I had a situation where the scheduler crashed while my DAG was being scheduled, so some tasks were left in the QUEUED state. When I restart the scheduler, those tasks still don't get scheduled and remain QUEUED indefinitely.

I have recreated the scenario by waiting until some tasks are in a QUEUED state and then killing the scheduler process. Restarting the scheduler doesn't automatically run the tasks.

I set up a simple DAG to test:

{code:python}
from airflow import utils
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

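# Three hours ago, rounded down to the top of the hour.
# Note: START_DATE is not referenced below; default_args sets start_date instead.
now = datetime.now()
now_to_the_hour = (now - timedelta(hours=3)).replace(minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour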
DAG_NAME = 'pool_dag'

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': utils.dates.days_ago(2)
}
dag = DAG(DAG_NAME, schedule_interval=None, default_args=default_args, catchup=False)

# Create 16 tasks; even-numbered ones use 'task_pool', odd-numbered ones set no pool.
for i in range(16):
    pool = 'task_pool' if i % 2 == 0 else None

    task = BashOperator(
        task_id='runme_{}'.format(i),
        params={
            'duration': i * 10
        },
        pool=pool,
        bash_command='sleep {{ params.duration }}',
        dag=dag)
{code}

I was trying to create a scenario with a LocalExecutor (parallelism=8) in which 16 tasks run with different sleep times, half of which use a pool and half of which use the default pool of workers.
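
For anyone reproducing this, the layout the DAG produces can be checked without Airflow installed. The following is only a plain-Python sketch that mirrors the loop in the DAG above; `task_layout` is a hypothetical helper name, not an Airflow function, and it does not schedule or run anything.

```python
# Plain-Python sketch (no Airflow import needed): mirrors the loop in the DAG
# above to show which tasks use 'task_pool' and how long each one sleeps.
# task_layout() is a hypothetical helper, not part of Airflow.

def task_layout(num_tasks=16):
    """Return (task_id, pool, sleep_seconds) for each task the DAG would create."""
    layout = []
    for i in range(num_tasks):
        # Same rule as the DAG: even-numbered tasks use the pool.
        pool = 'task_pool' if i % 2 == 0 else None
        layout.append(('runme_{}'.format(i), pool, i * 10))
    return layout


if __name__ == '__main__':
    for task_id, pool, seconds in task_layout():
        print('{:<9} pool={!s:<10} sleep={}s'.format(task_id, pool, seconds))
```

With 16 tasks this gives 8 tasks in 'task_pool' and 8 with no pool set, with sleep times from 0 seconds (runme_0) up to 150 seconds (runme_15).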


