Posted to dev@airflow.apache.org by Jayesh Senjaliya <jh...@gmail.com> on 2017/01/25 01:46:01 UTC

possible bug: scheduler dont schedule manual triggered dag on 1.8.b2

I am running Airflow 1.8.b2.

A week ago I had a DAG that was running perfectly. After moving it to 1.8.b2,
I find that the scheduler does not schedule its tasks even though the DAG run
was created.

I debugged the issue and traced it to this piece of code, where the scheduler
finds the tasks to schedule:

TI = models.TaskInstance
task_instances_to_examine = (
    session
    .query(TI)
    .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
    .filter(TI.state.in_(states))
    .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
                       TI.execution_date == DagRun.execution_date,
                       DagRun.state == State.RUNNING,
                       DagRun.run_id.like(DagRun.ID_PREFIX + '%')))
    .all()
)


I think the root cause is the join in the above query, which was added recently
with the PR "[AIRFLOW-219][AIRFLOW-398] Cgroups + impersonation". The join
requires run_id to be like DagRun.ID_PREFIX, which is "scheduled__". That will
not be true for a manually triggered DAG, which has a run_id like
"manual__2017-01-24T17:08:35.251019".
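
To make the effect of the prefix condition concrete, here is a minimal sketch using the standard-library sqlite3 module rather than Airflow itself. The two tables, their columns, the DAG and task names, and the helper find_task_instances are simplified stand-ins invented for illustration; only the join conditions mirror the query above.

```python
import sqlite3

# Simplified stand-ins for Airflow's dag_run and task_instance tables (assumed schema).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, run_id TEXT, state TEXT);
    CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT);
""")
conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?, ?)", [
    ("example_dag", "2017-01-24T00:00:00",
     "scheduled__2017-01-24T00:00:00", "running"),
    ("example_dag", "2017-01-24T17:08:35.251019",
     "manual__2017-01-24T17:08:35.251019", "running"),
])
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", [
    ("example_dag", "task_a", "2017-01-24T00:00:00", "scheduled"),
    ("example_dag", "task_a", "2017-01-24T17:08:35.251019", "scheduled"),
])

def find_task_instances(run_id_pattern):
    """Return the run_ids whose task instances survive the join (hypothetical helper)."""
    rows = conn.execute("""
        SELECT dr.run_id
        FROM task_instance ti
        JOIN dag_run dr
          ON ti.dag_id = dr.dag_id
         AND ti.execution_date = dr.execution_date
         AND dr.state = 'running'
         AND dr.run_id LIKE ?
        ORDER BY dr.run_id
    """, (run_id_pattern,)).fetchall()
    return [row[0] for row in rows]

# With the "scheduled__" prefix condition, the manual run drops out of the result.
with_prefix = find_task_instances("scheduled__%")
# With a pattern that matches every run_id, both runs are found.
any_run = find_task_instances("%")
print(with_prefix)
print(any_run)
```

In this toy setup the task instance of the manual run is in the same state as the one of the scheduled run, yet only the latter is returned once the prefix condition is part of the join.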

So I am not sure why we need that condition there at all. Does anyone know?

Please comment.

Thanks
Jayesh

Re: possible bug: scheduler dont schedule manual triggered dag on 1.8.b2

Posted by Jayesh Senjaliya <jh...@gmail.com>.
Cool, thanks for fixing it so quickly. I just tested it and it works fine.
I commented on GitHub.


On Wed, Jan 25, 2017 at 3:08 AM, Bolke de Bruin <bd...@gmail.com> wrote:

> Further analysis shows that the change that created this issue was
> unnecessary. TaskInstances are set to 'scheduled' if they meet the
> criteria to run, including the ones up for retry. No task_instance is sent
> to the executor in any state other than 'scheduled'. Thus, in an earlier
> patch, I removed the 'upforretry' parameter from _execute_state_instances.
> This change is there to verify that a DagRun is running, but that has
> already been covered, by contract, earlier.
>
> I created AIRFLOW-803 and
> https://github.com/apache/incubator-airflow/pull/2019. I removed the index
> creation that was done as part of this.
>
> Please review. I would like it to be part of Beta 3; I consider the bug
> itself a blocker for the release.
>
> Bolke
>
>
> > On 25 Jan 2017, at 09:32, Bolke de Bruin <bd...@gmail.com> wrote:
> >
> > This will probably make it work again (I haven't tested it yet):
> >
> > TI = models.TaskInstance
> > task_instances_to_examine = (
> >     session
> >     .query(TI)
> >     .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
> >     .filter(TI.state.in_(states))
> >     .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
> >                        TI.execution_date == DagRun.execution_date,
> >                        DagRun.state == State.RUNNING,
> >                        DagRun.run_id.notlike('backfill' + '%')))
> >     .all()
> > )
> >
> >
> > Bolke
> >
> >
> >> On 25 Jan 2017, at 09:00, Bolke de Bruin <bdbruin@gmail.com> wrote:
> >>
> >> Thanks for tracking this down, Jayesh. I suspect the reason for it is to
> >> exclude backfills (i.e. backfills are not scheduled yet), but I need to
> >> look through the code. Maybe Dan or Paul can chime in?
> >>
> >> Bolke

Re: possible bug: scheduler dont schedule manual triggered dag on 1.8.b2

Posted by Bolke de Bruin <bd...@gmail.com>.
Further analysis shows that the change that created this issue was unnecessary. TaskInstances are set to 'scheduled' if they meet the criteria to run, including the ones up for retry. No task_instance is sent to the executor in any state other than 'scheduled'. Thus, in an earlier patch, I removed the 'upforretry' parameter from _execute_state_instances. This change is there to verify that a DagRun is running, but that has already been covered, by contract, earlier.
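
The "by contract" argument can be illustrated with a toy model. This is only a sketch under loud assumptions: the classes and the functions mark_runnable and pick_for_executor are hypothetical and are not Airflow's code. They only model the idea that an earlier step promotes task instances to 'scheduled' when their DagRun is running, so the later selection can filter on the task instance state alone.

```python
class DagRun:
    """Toy stand-in for a DAG run (not Airflow's model)."""
    def __init__(self, run_id, state):
        self.run_id = run_id
        self.state = state

class TaskInstance:
    """Toy stand-in for a task instance (not Airflow's model)."""
    def __init__(self, task_id, dag_run, state=None):
        self.task_id = task_id
        self.dag_run = dag_run
        self.state = state

def mark_runnable(task_instances):
    # Earlier step (hypothetical): only task instances that belong to a running
    # DagRun become 'scheduled', including the ones that were up for retry.
    for ti in task_instances:
        if ti.dag_run.state == "running" and ti.state in (None, "up_for_retry"):
            ti.state = "scheduled"

def pick_for_executor(task_instances):
    # Later step (hypothetical): filter on the task instance state only.
    # No second check of the DagRun or of its run_id prefix is needed here.
    return [ti.task_id for ti in task_instances if ti.state == "scheduled"]

manual_run = DagRun("manual__2017-01-24T17:08:35.251019", "running")
finished_run = DagRun("scheduled__2017-01-23T00:00:00", "success")
tis = [
    TaskInstance("new_task", manual_run),
    TaskInstance("retry_task", manual_run, state="up_for_retry"),
    TaskInstance("stale_task", finished_run),
]

mark_runnable(tis)
picked = pick_for_executor(tis)
print(picked)
```

In this model the task instance of the finished run is never promoted, so it cannot reach the executor even though the selection step does not look at the DagRun at all.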

I created AIRFLOW-803 and https://github.com/apache/incubator-airflow/pull/2019. I removed the index creation that was done as part of this.

Please review. I would like it to be part of Beta 3; I consider the bug itself a blocker for the release.

Bolke



Re: possible bug: scheduler dont schedule manual triggered dag on 1.8.b2

Posted by Bolke de Bruin <bd...@gmail.com>.
This will probably make it work again (I haven't tested it yet):

TI = models.TaskInstance
task_instances_to_examine = (
    session
    .query(TI)
    .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
    .filter(TI.state.in_(states))
    .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
                       TI.execution_date == DagRun.execution_date,
                       DagRun.state == State.RUNNING,
                       DagRun.run_id.notlike('backfill' + '%')))
    .all()
)
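
For comparison, here is a small sketch of how the two run_id conditions behave on example run IDs, using the standard-library sqlite3 module instead of SQLAlchemy. The single-column table, the helper run_ids_where, and the backfill run_id format are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (run_id TEXT)")
conn.executemany("INSERT INTO dag_run VALUES (?)", [
    ("scheduled__2017-01-24T00:00:00",),
    ("manual__2017-01-24T17:08:35.251019",),
    ("backfill_2017-01-23T00:00:00",),  # assumed format for a backfill run_id
])

def run_ids_where(operator, pattern):
    """Return run_ids matching `run_id <operator> pattern` (hypothetical helper)."""
    # The operator is restricted to two known values; the pattern is a bound parameter.
    assert operator in ("LIKE", "NOT LIKE")
    sql = "SELECT run_id FROM dag_run WHERE run_id {} ? ORDER BY run_id".format(operator)
    return [row[0] for row in conn.execute(sql, (pattern,))]

# Condition from the original query: only scheduler-created runs match.
original = run_ids_where("LIKE", "scheduled__%")
# Condition from the proposal above: everything except backfill runs matches.
proposed = run_ids_where("NOT LIKE", "backfill%")
print(original)
print(proposed)
```

With the NOT LIKE condition both the scheduled and the manual run are kept, and only the backfill run is excluded.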


Bolke



Re: possible bug: scheduler dont schedule manual triggered dag on 1.8.b2

Posted by Bolke de Bruin <bd...@gmail.com>.
Thanks for tracking this down, Jayesh. I suspect the reason for it is to exclude backfills (i.e. backfills are not scheduled yet), but I need to look through the code. Maybe Dan or Paul can chime in?

Bolke
