Posted to dev@airflow.apache.org by "jspsai@gmail.com" <js...@gmail.com> on 2017/10/02 18:51:44 UTC

TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Hi experts,

I am running the TriggerDagRunOperator example that comes with Airflow.

example_trigger_controller_dag
example_trigger_target_dag

I've created 2 new DAGs which are based on the above controller and trigger DAGs.

When I run the example_trigger_controller_dag, I see that the example_trigger_target_dag is scheduled to run a few hours later. How can I schedule it to run immediately?
Here are the logs, where the sub task is scheduled to run at 17:53:31 and the base task ran at 13:53:31.

[2017-10-02 13:53:31,624] {base_task_runner.py:95} INFO - Subtask: [2017-10-02 13:53:31,623] {dagrun_operator.py:74} INFO - Creating DagRun <DagRun Trigger_Target_Dag @ 2017-10-02 17:53:31: trig__2017-10-02T13:53:31.248671, externally triggered: True>

I might be doing something really stupid here.
Please help.

Thanks
-Sai.
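
Editor's sketch: the two timestamps in the log line above differ by exactly four hours. One possible reading, which the thread does not confirm, is that the log prefix uses the worker's local clock while the DagRun date is stored in UTC. The UTC-4 offset below is an assumption chosen only because it matches the gap; the timestamps themselves are copied from the log.

```python
from datetime import datetime, timedelta, timezone

# Both timestamps are copied from the log line in the post.
logged_at = datetime(2017, 10, 2, 13, 53, 31)     # when the controller task ran
dag_run_date = datetime(2017, 10, 2, 17, 53, 31)  # date attached to the new DagRun

# The gap between the two is a whole number of hours.
offset = dag_run_date - logged_at
print(offset)

# ASSUMPTION: the worker's local clock is at UTC-4. Nothing in the thread states this.
assumed_local = timezone(timedelta(hours=-4))
as_utc = logged_at.replace(tzinfo=assumed_local).astimezone(timezone.utc)

# Under that assumption the local log time and the DagRun date are the same instant.
same_instant = as_utc.replace(tzinfo=None) == dag_run_date
print(same_instant)
```

If the assumption holds, the target DAG is not delayed by four hours; the two values are the same moment written in different time zones.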

Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by "jspsai@gmail.com" <js...@gmail.com>.

On 2017-10-03 11:20, Justin Palmer <ju...@github.com> wrote: 
> Hi Sai,
> 
> That operator would be a plugin
> <https://airflow.incubator.apache.org/plugins.html#plugins>, so you'll need
> to import it as such using plugins.operators.
> 
> from plugins.operators.execute_dag_run_operator import ExecuteDagRunOperator
> 
> 
> -Justin
> 
Thanks Justin! I'll give it a try.

Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by "jspsai@gmail.com" <js...@gmail.com>.

On 2017-10-03 11:20, Justin Palmer <ju...@github.com> wrote: 
> Hi Sai,
> 
> That operator would be a plugin
> <https://airflow.incubator.apache.org/plugins.html#plugins>, so you'll need
> to import it as such using plugins.operators.
> 
> from plugins.operators.execute_dag_run_operator import ExecuteDagRunOperator
> 
> 
> -Justin
> 
Hello Justin,

For some reason I am still getting the import error, even after copying the file to the plugins directory and importing it in another file. Do I need to do anything else?
I come from Java and am new to Python, so I currently have some difficulty understanding Python's conventions.

Traceback (most recent call last):

  File "test_trigger.py", line 3, in <module>
    from plugins.operators.execute_dag_run_operator import ExecuteDagRunOperator
ImportError: No module named plugins.operators.execute_dag_run_operator

I appreciate any help.

Thanks
-Sai
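
Editor's sketch: an error like "No module named plugins.operators.execute_dag_run_operator" is what Python raises when it cannot find a `plugins` package on its import path. The snippet below illustrates the general import rules only, not Airflow's plugin loader, and the thread does not confirm that this was the cause here. The temporary directory stands in for the Airflow home directory, and the one-line class is a placeholder for the operator from the gist.

```python
import os
import sys
import tempfile

# Hypothetical stand-in for the Airflow home directory.
airflow_home = tempfile.mkdtemp()
plugins_dir = os.path.join(airflow_home, "plugins")
operators_dir = os.path.join(plugins_dir, "operators")
os.makedirs(operators_dir)

# Python 2.7 treats a directory as a package only if it contains __init__.py,
# so both 'plugins' and 'plugins/operators' get one.
for directory in (plugins_dir, operators_dir):
    open(os.path.join(directory, "__init__.py"), "w").close()

# Placeholder module; the real file would hold the operator from the gist.
module_path = os.path.join(operators_dir, "execute_dag_run_operator.py")
with open(module_path, "w") as handle:
    handle.write("class ExecuteDagRunOperator(object):\n    pass\n")

# The directory that CONTAINS 'plugins' must be on the import path,
# not the 'plugins' directory itself.
sys.path.insert(0, airflow_home)

from plugins.operators.execute_dag_run_operator import ExecuteDagRunOperator

print(ExecuteDagRunOperator.__module__)
```

Compared with Java, `sys.path` plays roughly the role of the classpath, and each directory in the dotted name corresponds to a package directory.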

Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by Justin Palmer <ju...@github.com>.
Hi Sai,

That operator would be a plugin
<https://airflow.incubator.apache.org/plugins.html#plugins>, so you'll need
to import it as such using plugins.operators.

from plugins.operators.execute_dag_run_operator import ExecuteDagRunOperator


-Justin


Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by "jspsai@gmail.com" <js...@gmail.com>.

On 2017-10-02 18:38, Justin Palmer <ju...@github.com> wrote: 
> Hi Sai,
> 
> At GitHub we had a similar issue with TriggerDagRunOperator.  In addition
> to scheduling tasks in the future, we wanted a stable execution date that
> was based on the triggering DAG.  We created a plugin that is basically a
> copy of TriggerDagRunOperator, except it passes in the `execution_date` to
> the triggered DAG.   I've linked the plugin below.  Feel free to use it
> under the MIT license.
> 
> https://gist.github.com/Caged/f356430518247d1bbc2439a153e3c79e
> 
> -Justin
> 
Justin,

When I try to compile, I get an import error for the plugin.
This is how I am importing it:
from airflow.operators.execute_dag_run_operator import ExecuteDagRunOperator

Below is the error when compiled:
ImportError: No module named execute_dag_run_operator
[2017-10-03 11:09:36,242] [22209] {models.py:266} ERROR - Failed to import: /home/ec2-user/airflow/dags/Test_Controller_Job.py
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/home/ec2-user/airflow/dags/Test_Controller_Job.py", line 43, in <module>
    from airflow.operators.execute_dag_run_operator import ExecuteDagRunOperator

I am on Airflow 1.8.

Do you have an example implementation for this?

Thanks
Sai

Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by "jspsai@gmail.com" <js...@gmail.com>.

On 2017-10-02 18:38, Justin Palmer <ju...@github.com> wrote: 
> Hi Sai,
> 
> At GitHub we had a similar issue with TriggerDagRunOperator.  In addition
> to scheduling tasks in the future, we wanted a stable execution date that
> was based on the triggering DAG.  We created a plugin that is basically a
> copy of TriggerDagRunOperator, except it passes in the `execution_date` to
> the triggered DAG.   I've linked the plugin below.  Feel free to use it
> under the MIT license.
> 
> https://gist.github.com/Caged/f356430518247d1bbc2439a153e3c79e
> 
> -Justin
> 
Thanks Justin! I'll give it a try.

Re: TriggerDagRunOperator sub tasks are scheduled to run after few hours later

Posted by Justin Palmer <ju...@github.com>.
Hi Sai,

At GitHub we had a similar issue with TriggerDagRunOperator.  In addition
to scheduling tasks in the future, we wanted a stable execution date that
was based on the triggering DAG.  We created a plugin that is basically a
copy of TriggerDagRunOperator, except it passes in the `execution_date` to
the triggered DAG.   I've linked the plugin below.  Feel free to use it
under the MIT license.

https://gist.github.com/Caged/f356430518247d1bbc2439a153e3c79e

-Justin
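
Editor's sketch: the idea described above, a triggered run whose date comes from the triggering DAG instead of from the clock, can be shown without Airflow. `build_run_order` is a hypothetical helper written for this note; it is not the code from the gist and not an Airflow API. Only the `trig__` run id prefix is taken from the log in the original post.

```python
from datetime import datetime


def build_run_order(parent_execution_date):
    """Hypothetical helper: describe a triggered run using the parent's date.

    Because nothing here reads the current time, calling it again for the
    same parent run (for example on a retry) yields the same result.
    """
    return {
        "run_id": "trig__" + parent_execution_date.isoformat(),
        "execution_date": parent_execution_date,
    }


# Example parent execution date, chosen for illustration.
parent_date = datetime(2017, 10, 2, 13, 0, 0)

first = build_run_order(parent_date)
retry = build_run_order(parent_date)

print(first["run_id"])
print(first == retry)
```

The design point is that the triggered run's date becomes a function of the parent run alone, so it stays stable no matter when the trigger task actually executes.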
