Posted to dev@airflow.apache.org by David Klosowski <da...@thinknear.com> on 2016/08/03 01:49:35 UTC

DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past

I just deployed a DAG, and the scheduler keeps creating runs for it with
dates in the past two months.

start_date: 8/5/2016

scheduled runs started:
7/3/2016
6/5/2016

Here is the gist of this DAG's architecture:

The DAG depends on another DAG's tasks. It dynamically builds 7
ExternalTaskSensors, each representing a 'daily' job, and has a DummyOperator
task that aggregates them and triggers the 'weekly' job task upon completion.

Some of the code showcasing this:

# Imports as they would look on Airflow 1.x (assumed; not part of the
# original snippet).
from datetime import datetime, timedelta

from dateutil.relativedelta import relativedelta, MO, TU, WE, TH, FR, SA, SU

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor

run_for_date = datetime(2016, 8, 2)

# alert_email is defined elsewhere (not shown).
args = {'owner': 'airflow',
        'depends_on_past': False,
        'start_date': run_for_date,
        'email': [alert_email],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'trigger_rule': 'all_success'}

dag = DAG(dag_id='weekly_no_track', default_args=args,
          schedule_interval=timedelta(days=7),
          max_active_runs=1)


# The 'wait-for-dailies' DummyOperator task is defined elsewhere (not shown).
downstream_task = dag.get_task('wait-for-dailies')
for weekday in [MO, TU, WE, TH, FR, SA, SU]:
    task_id = 'wait-for-daily-{day}'.format(day=weekday)

    # weekday(-1) subtracts 1 relative week from the given weekday; however,
    # if the calculated date is already Monday, for example, -1 won't change
    # the day.
    delta = relativedelta(weekday=weekday(-1))

    sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
                                external_dag_id='daily_no_track',
                                external_task_id='daily-no-track',
                                execution_delta=delta,
                                timeout=86400)  # 86400 = 24 hours
    sensor.set_downstream(downstream_task)
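
For readers unfamiliar with the weekday(-1) behavior described in the comment above, here is a minimal standard-library sketch of the same rule: pick the most recent given weekday on or before a date. The helper name last_weekday_on_or_before and the WEEKDAYS list are hypothetical and only emulate the described semantics; they are not dateutil's or Airflow's implementation.

```python
from datetime import date, timedelta

# Index matches date.weekday(): Monday=0 ... Sunday=6.
WEEKDAYS = ['MO', 'TU', 'WE', 'TH', 'FR', 'SA', 'SU']


def last_weekday_on_or_before(day, weekday_name):
    """Return the most recent date on or before `day` that falls on `weekday_name`.

    Hypothetical helper that emulates the rule in the comment: if `day`
    already is that weekday, it is returned unchanged.
    """
    target = WEEKDAYS.index(weekday_name)
    days_back = (day.weekday() - target) % 7
    return day - timedelta(days=days_back)


# 2016-08-02 (the start_date in the snippet) was a Tuesday.
reference = date(2016, 8, 2)
for name in WEEKDAYS:
    print(name, last_weekday_on_or_before(reference, name))
```

Under this rule, each of the seven sensors would look at a date within the seven days ending on the weekly run's execution date.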


I don't understand what is going on.  Why is the scheduler doing this?  I
want the DAG to only consider dates from today onward, in UTC.
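
As a point of comparison, the sketch below lists the execution dates one would normally expect from a start_date of 2016-08-02 and a 7-day interval. It assumes the usual Airflow convention that a run for a given execution date is triggered only once its interval has ended. The function expected_execution_dates is a hypothetical illustration, not scheduler code.

```python
from datetime import datetime, timedelta


def expected_execution_dates(start_date, interval, now):
    """List execution dates whose full interval has elapsed by `now`.

    Hypothetical helper: assumes a run for date D is triggered at D + interval.
    """
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


start = datetime(2016, 8, 2)
weekly = timedelta(days=7)

# On the day after deployment no interval has closed yet, so nothing is due.
print(expected_execution_dates(start, weekly, datetime(2016, 8, 3)))

# Three weeks later, three runs are due, and none is earlier than start_date.
due = expected_execution_dates(start, weekly, datetime(2016, 8, 23))
print(due)

# The runs reported above both fall before start_date.
reported = [datetime(2016, 7, 3), datetime(2016, 6, 5)]
print([d for d in reported if d < start])
```

By this reckoning, neither 7/3/2016 nor 6/5/2016 should appear as an execution date.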

Cheers,
David

Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past

Posted by David Klosowski <da...@thinknear.com>.
Hi Siddharth,

I created the ticket:

https://issues.apache.org/jira/browse/AIRFLOW-392

Let me know what I can do to help.

Cheers,
David

On Tue, Aug 2, 2016 at 8:10 PM, siddharth anand <sa...@apache.org> wrote:

> Interesting. If you haven't already, can you create a Jira and append an
> example dag that I can run to reproduce (likely capturing the code you have
> above). You can then assign the bug to me to look into.
>
> Also, please provide enough context on your use-case and why you are
> structuring your code this way. It will help identify any alternatives.
> It's not clear to me what you want to do exactly.
> -s

Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past

Posted by siddharth anand <sa...@apache.org>.
Interesting. If you haven't already, can you create a Jira and append an
example dag that I can run to reproduce (likely capturing the code you have
above). You can then assign the bug to me to look into.

Also, please provide enough context on your use-case and why you are
structuring your code this way. It will help identify any alternatives.
It's not clear to me what you want to do exactly.
-s

On Tue, Aug 2, 2016 at 8:02 PM, David Klosowski <da...@thinknear.com>
wrote:

> start_date being updated isn't the issue here.  I haven't changed it.   New
> execution_dates keep getting created for the past before any dags or
> start_dates existed.
>
> Cheers,
> David

Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past

Posted by David Klosowski <da...@thinknear.com>.
Updating the start_date isn't the issue here; I haven't changed it. New
execution_dates keep getting created for past dates, from before any DAGs or
start_dates existed.

Cheers,
David

On Tue, Aug 2, 2016 at 7:10 PM, siddharth anand <sa...@apache.org> wrote:

> The problem might be that the start_date does not get updated. I work
> around this by changing the name of my dag. I do lose history as well, but
> it works.
>
> My dags are named "some_dag_v1". When I change a start date, I update the
> version suffix to force a reload : "some_dag_v2"
>
> -s

Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past

Posted by siddharth anand <sa...@apache.org>.
The problem might be that the start_date does not get updated. I work
around this by changing the name of my DAG. I do lose history as well, but
it works.

My DAGs are named "some_dag_v1". When I change a start date, I update the
version suffix to force a reload: "some_dag_v2".
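
A minimal sketch of that naming convention, assuming the version is kept as a constant next to the DAG definition. The helper versioned_dag_id and the DAG_VERSION constant are hypothetical names used for illustration only.

```python
def versioned_dag_id(base_name, version):
    """Build a dag_id such as 'some_dag_v2' from a base name and a version number."""
    return '{base}_v{version}'.format(base=base_name, version=version)


# Bump this whenever start_date changes, so the scheduler sees a new DAG.
DAG_VERSION = 2

dag_id = versioned_dag_id('some_dag', DAG_VERSION)
print(dag_id)
```

The resulting string would then be passed as dag_id when constructing the DAG. As noted above, the run history stays with the old name.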

-s
