Posted to dev@airflow.apache.org by David Klosowski <da...@thinknear.com> on 2016/08/03 01:49:35 UTC
DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
I just deployed a DAG, and the scheduler keeps scheduling runs for dates in the past, going back two months.
start_date: 8/5/2016
Scheduled runs were created for:
7/3/2016
6/5/2016
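For comparison, here is a simplified model of what a `timedelta` schedule would be expected to produce: execution dates on `start_date + k * schedule_interval`, with the run for date D due only once D + interval has passed. This is a sketch of the expected behavior under that assumption, not Airflow's scheduler code, and `expected_execution_dates` is a hypothetical helper. It uses the `datetime(2016, 8, 2)` start date from the posted code (the message lists 8/5/2016; both observed dates precede either one).

```python
from datetime import datetime, timedelta


def expected_execution_dates(start_date, interval, now):
    """Simplified model (an assumption, not Airflow's code): execution dates lie
    on start_date + k * interval, and the run for date D is only due once
    D + interval has passed."""
    dates = []
    d = start_date
    while d + interval <= now:
        dates.append(d)
        d += interval
    return dates


start = datetime(2016, 8, 2)   # start_date from the posted code
week = timedelta(days=7)       # schedule_interval from the posted code
observed = [datetime(2016, 7, 3), datetime(2016, 6, 5)]  # runs reported above

# Under this model, nothing earlier than start_date is ever scheduled.
print(expected_execution_dates(start, week, datetime(2016, 8, 20)))

# The observed runs are a whole number of intervals apart, yet precede start_date.
print((observed[0] - observed[1]).days, all(d < start for d in observed))
```

Under this model the two observed runs (28 days, i.e. four intervals, apart) cannot come from start_date and schedule_interval alone.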
Here is the gist of this DAG's architecture:
The DAG depends on another DAG's tasks through seven dynamically built ExternalTaskSensors, one per 'daily' job. A DummyOperator task then aggregates them and, once all seven complete, triggers the 'weekly' job task.
Some of the code showcasing this:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from dateutil.relativedelta import relativedelta, MO, TU, WE, TH, FR, SA, SU

# alert_email is defined elsewhere (not shown in this excerpt).
run_for_date = datetime(2016, 8, 2)
args = {'owner': 'airflow',
        'depends_on_past': False,
        'start_date': run_for_date,
        'email': [alert_email],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'trigger_rule': 'all_success'}
dag = DAG(dag_id='weekly_no_track', default_args=args,
          schedule_interval=timedelta(days=7),
          max_active_runs=1)
# The 'wait-for-dailies' DummyOperator is attached to dag here (not shown).
downstream_task = dag.get_task('wait-for-dailies')
for weekday in [MO, TU, WE, TH, FR, SA, SU]:
    task_id = 'wait-for-daily-{day}'.format(day=weekday)
    # weekday(-1) moves back to the most recent such weekday; if the date
    # already falls on that weekday (e.g. a Monday for MO(-1)), it is unchanged.
    delta = relativedelta(weekday=weekday(-1))
    sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
                                external_dag_id='daily_no_track',
                                external_task_id='daily-no-track',
                                execution_delta=delta,
                                timeout=86400)  # 86400 seconds = 24 hours
    sensor.set_downstream(downstream_task)
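The comment in the loop describes how `relativedelta(weekday=weekday(-1))` behaves. The sketch below reproduces that rule with the standard library only, to show which daily execution dates the seven sensors would wait on for one weekly run. It rests on two assumptions worth checking against the installed versions: that ExternalTaskSensor looks for the external task at `execution_date - execution_delta`, and that subtracting this relativedelta lands on the most recent matching weekday on or before the execution date. `most_recent_weekday` and `sensor_targets` are illustrative helpers, not Airflow or dateutil APIs.

```python
from datetime import date, timedelta

# Same order as the [MO, TU, WE, TH, FR, SA, SU] list in the DAG code.
WEEKDAY_NAMES = ["MO", "TU", "WE", "TH", "FR", "SA", "SU"]


def most_recent_weekday(d, weekday):
    """Return the latest date on or before d that falls on `weekday`
    (0 = Monday ... 6 = Sunday); d itself is returned if it already matches."""
    return d - timedelta(days=(d.weekday() - weekday) % 7)


def sensor_targets(execution_date):
    """Map each sensor's task_id to the daily date it would wait on, assuming
    the target is execution_date moved back to the matching weekday."""
    return {"wait-for-daily-" + name: most_recent_weekday(execution_date, i)
            for i, name in enumerate(WEEKDAY_NAMES)}


for task_id, target in sensor_targets(date(2016, 8, 2)).items():
    print(task_id, target.isoformat())
```

For an execution date of 2016-08-02 (a Tuesday), the seven targets under these assumptions are 2016-07-27 through 2016-08-02, i.e. the week ending on the execution date.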
I don't understand what is going on. Why is the scheduler doing this? I want the DAG to consider only dates from today onward, in UTC.
Cheers,
David
Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Posted by David Klosowski <da...@thinknear.com>.
Hi Siddharth,
I created the ticket:
https://issues.apache.org/jira/browse/AIRFLOW-392
Let me know what I can do to help.
Cheers,
David
On Tue, Aug 2, 2016 at 8:10 PM, siddharth anand <sa...@apache.org> wrote:
Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Posted by siddharth anand <sa...@apache.org>.
Interesting. If you haven't already, can you create a JIRA ticket and attach an example DAG that I can run to reproduce the problem (likely based on the code above)? You can then assign the bug to me to look into.
Also, please provide enough context on your use case and on why you are structuring your code this way. That will help identify alternatives. It's not clear to me exactly what you want to do.
-s
On Tue, Aug 2, 2016 at 8:02 PM, David Klosowski <da...@thinknear.com> wrote:
Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Posted by David Klosowski <da...@thinknear.com>.
The start_date not being updated isn't the issue here; I haven't changed it. New execution_dates keep getting created in the past, for dates before any of these DAGs or start_dates existed.
Cheers,
David
On Tue, Aug 2, 2016 at 7:10 PM, siddharth anand <sa...@apache.org> wrote:
Re: DAG scheduled for start_date of today and an interval of 7 days keeps getting scheduled for the past
Posted by siddharth anand <sa...@apache.org>.
The problem might be that the start_date does not get updated. I work around this by changing the name of my DAG. I lose the run history, but it works.
My DAGs are named "some_dag_v1". When I change a start date, I bump the version suffix to force a reload: "some_dag_v2".
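A minimal sketch of the renaming workaround, assuming the version number lives in a module-level constant next to the start date so the two are changed together. `DAG_BASE_NAME`, `DAG_VERSION`, `START_DATE` and `versioned_dag_id` are illustrative names, not part of Airflow, and the start date is only an example value.

```python
from datetime import datetime

# Hypothetical convention: keep the version next to the start date, so that
# editing START_DATE is a reminder to bump DAG_VERSION as well.
DAG_BASE_NAME = "some_dag"
DAG_VERSION = 2
START_DATE = datetime(2016, 8, 2)  # example value only


def versioned_dag_id(base, version):
    """Build an id such as 'some_dag_v2'. A new id looks like a brand-new DAG
    to the scheduler, which is why the old run history is no longer attached."""
    return "{}_v{}".format(base, version)


DAG_ID = versioned_dag_id(DAG_BASE_NAME, DAG_VERSION)
# In the DAG file this would then be used as, for example:
#   dag = DAG(dag_id=DAG_ID, start_date=START_DATE, schedule_interval=...)
print(DAG_ID)
```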
-s
On Tue, Aug 2, 2016 at 6:49 PM, David Klosowski <da...@thinknear.com> wrote: