Posted to dev@airflow.apache.org by Frank Maritato <fm...@opentable.com.INVALID> on 2018/09/14 16:23:33 UTC

Re: [External] Re: Dynamic tasks in a dag?

Do you mean give the full path to the files? The relative path I'm using definitely works. When I type airflow list_dags, I can see the output from the print statements that the glob is finding my sql files and creating the snowflake operators.

airflow list_tasks workflow also lists all the operators I'm creating. I'm just not seeing them in the ui.

On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:

    Hi frank,
    Can you try giving global paths?
    
    On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fm...@opentable.com.invalid>
    wrote:
    
    > Hi,
    >
    > I'm using apache airflow 1.10.0 and I'm trying to dynamically generate
    > some tasks in my dag based on files that are in the dags directory. The
    > problem is, I don't see these tasks in the ui, I just see the 'start' dummy
    > operator. If I type 'airflow list_tasks workflow', they are listed.
    > Thoughts?
    >
    > Here is how I'm generating the tasks:
    >
    >
    > def create_snowflake_operator(file, dag, snowflake_connection):
    >     file_repl = file.replace('/', '_')
    >     file_repl = file_repl.replace('.sql', '')
    >     print("TASK_ID {}".format(file_repl))
    >     return SnowflakeOperator(
    >         dag=dag,
    >         task_id='create_{}'.format(file_repl),
    >         snowflake_conn_id=snowflake_connection,
    >         sql=file
    >     )
    >
    > DAG_NAME = 'create_objects'
    > dag = DAG(
    >     DAG_NAME,
    >     default_args=args,
    >     dagrun_timeout=timedelta(hours=2),
    >     schedule_interval=None,
    > )
    >
    > start = DummyOperator(
    >     dag=dag,
    >     task_id="start",
    > )
    >
    > print("creating snowflake operators")
    >
    > for file in glob('dags/snowsql/create/udf/*.sql'):
    >     print("FILE {}".format(file))
    >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     task.set_upstream(start)
    >
    > for file in glob('dags/snowsql/create/table/*.sql'):
    >     print("FILE {}".format(file))
    >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     task.set_upstream(start)
    >
    > for file in glob('dags/snowsql/create/view/*.sql'):
    >     print("FILE {}".format(file))
    >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     task.set_upstream(start)
    >
    > print("done {}".format(start.downstream_task_ids))
    >
    > Thanks in advance
    > --
    > Frank Maritato
    >
    


Re: [External] Dynamic tasks in a dag?

Posted by Ash Berlin-Taylor <as...@apache.org>.
This isn't needed, as the tasks are added to the dag when specified, so the DAG object keeps track of the tasks.

-ash 
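
A minimal sketch of the point above (illustrative DAG and task names, not from this thread): every operator constructed with dag=dag is registered on that DAG at construction time, even if the Python variable holding it is reused, and the DAG's task_ids reflects that.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('registration_example', start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    for i in range(3):
        # The variable 'task' is overwritten on every pass, but each operator
        # was already attached to the DAG via dag=dag when it was created.
        task = DummyOperator(task_id='task_{}'.format(i), dag=dag)

    print(dag.task_ids)  # contains 'task_0', 'task_1', 'task_2'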

> On 14 Sep 2018, at 18:30, Alex Tronchin-James 949-412-7220 <al...@gmail.com> wrote:
> 
> Don't you need to preserve the task objects? Your implementation overwrites
> each by the successor, so only the last task would be kept, despite your
> print statements. Try building a list or dict of tasks like:
> 
> tasks = []  # only at the top
> for file in glob('dags/snowsql/create/udf/*.sql'):
>     print("FILE {}".format(file))
>     tasks.append(
>         create_snowflake_operator(file, dag, 'snowflake_default')
>     )
>     tasks[-1].set_upstream(start)
> 
> On Fri, Sep 14, 2018 at 17:20 Frank Maritato
> <fm...@opentable.com.invalid> wrote:
> 
>> Ok, my mistake. I thought that command was querying the server for its
>> information and not just looking in a directory relative to where it is
>> being run. I have it working now. Thanks Chris and Sai!
>> 
>> 
>> On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
>> 
>>   The relative paths might work from wherever you are invoking 'airflow
>>   list_tasks', but that doesn't mean they work from wherever the
>>   webserver is parsing the dags from.
>> 
>>   Does running 'airflow list_tasks' from some other working directory
>>   work?
>> 
>>   On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
>>   <fm...@opentable.com.invalid> wrote:
>> 
>>> Do you mean give the full path to the files? The relative path I'm
>> using
>>> definitely works. When I type airflow list_dags, I can see the
>> output from
>>> the print statements that the glob is finding my sql files and
>> creating the
>>> snowflake operators.
>>> 
>>> airflow list_tasks workflow also lists all the operators I'm
>> creating. I'm
>>> just not seeing them in the ui.
>>> 
>>> On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>>> 
>>>   Hi frank,
>>>   Can you try giving global paths?
>>> 
>>>   On Fri 14 Sep, 2018, 21:35 Frank Maritato, <
>> fmaritato@opentable.com
>>> .invalid>
>>>   wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I'm using apache airflow 1.10.0 and I'm trying to dynamically
>>> generate
>>>> some tasks in my dag based on files that are in the dags
>> directory.
>>> The
>>>> problem is, I don't see these tasks in the ui, I just see the
>>> 'start' dummy
>>>> operator. If I type 'airflow list_tasks workflow', they are
>> listed.
>>>> Thoughts?
>>>> 
>>>> Here is how I'm generating the tasks:
>>>> 
>>>> 
>>>> def create_snowflake_operator(file, dag, snowflake_connection):
>>>>   file_repl = file.replace('/', '_')
>>>>   file_repl = file_repl.replace('.sql', '')
>>>>   print("TASK_ID {}".format(file_repl))
>>>>   return SnowflakeOperator(
>>>>       dag=dag,
>>>>       task_id='create_{}'.format(file_repl),
>>>>       snowflake_conn_id=snowflake_connection,
>>>>       sql=file
>>>>   )
>>>> 
>>>> DAG_NAME = 'create_objects'
>>>> dag = DAG(
>>>>   DAG_NAME,
>>>>   default_args=args,
>>>>   dagrun_timeout=timedelta(hours=2),
>>>>   schedule_interval=None,
>>>> )
>>>> 
>>>> start = DummyOperator(
>>>>   dag=dag,
>>>>   task_id="start",
>>>> )
>>>> 
>>>> print("creating snowflake operators")
>>>> 
>>>> for file in glob('dags/snowsql/create/udf/*.sql'):
>>>>   print("FILE {}".format(file))
>>>>   task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>>   task.set_upstream(start)
>>>> 
>>>> for file in glob('dags/snowsql/create/table/*.sql'):
>>>>   print("FILE {}".format(file))
>>>>   task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>>   task.set_upstream(start)
>>>> 
>>>> for file in glob('dags/snowsql/create/view/*.sql'):
>>>>   print("FILE {}".format(file))
>>>>   task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>>   task.set_upstream(start)
>>>> 
>>>> print("done {}".format(start.downstream_task_ids))
>>>> 
>>>> Thanks in advance
>>>> --
>>>> Frank Maritato
>>>> 
>>> 
>>> 
>>> 
>> 
>> 
>> 


Re: [External] Re: Dynamic tasks in a dag?

Posted by Alex Tronchin-James 949-412-7220 <al...@gmail.com>.
Don't you need to preserve the task objects? Your implementation overwrites
each one with its successor, so only the last task would be kept, despite your
print statements. Try building a list or dict of tasks like:

tasks = []  # only at the top
for file in glob('dags/snowsql/create/udf/*.sql'):
    print("FILE {}".format(file))
    tasks.append(
        create_snowflake_operator(file, dag, 'snowflake_default')
    )
    tasks[-1].set_upstream(start)
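
If keeping references around is useful later (for example to add further dependencies between the generated tasks), a dict keyed by task_id is one variant of the list-or-dict suggestion above; a rough sketch using the helper defined earlier in the thread:

    tasks = {}
    for file in glob('dags/snowsql/create/udf/*.sql'):
        task = create_snowflake_operator(file, dag, 'snowflake_default')
        task.set_upstream(start)
        tasks[task.task_id] = task  # look tasks up later by their generated id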

On Fri, Sep 14, 2018 at 17:20 Frank Maritato
<fm...@opentable.com.invalid> wrote:

> Ok, my mistake. I thought that command was querying the server for its
> information and not just looking in a directory relative to where it is
> being run. I have it working now. Thanks Chris and Sai!
>
>
> On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
>
>     The relative paths might work from wherever you are invoking 'airflow
>     list_tasks', but that doesn't mean they work from wherever the
>     webserver is parsing the dags from.
>
>     Does running 'airflow list_tasks' from some other working directory
>     work?
>
>     On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
>     <fm...@opentable.com.invalid> wrote:
>
>     > Do you mean give the full path to the files? The relative path I'm
> using
>     > definitely works. When I type airflow list_dags, I can see the
> output from
>     > the print statements that the glob is finding my sql files and
> creating the
>     > snowflake operators.
>     >
>     > airflow list_tasks workflow also lists all the operators I'm
> creating. I'm
>     > just not seeing them in the ui.
>     >
>     > On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>     >
>     >     Hi frank,
>     >     Can you try giving global paths?
>     >
>     >     On Fri 14 Sep, 2018, 21:35 Frank Maritato, <
> fmaritato@opentable.com
>     > .invalid>
>     >     wrote:
>     >
>     >     > Hi,
>     >     >
>     >     > I'm using apache airflow 1.10.0 and I'm trying to dynamically
>     > generate
>     >     > some tasks in my dag based on files that are in the dags
> directory.
>     > The
>     >     > problem is, I don't see these tasks in the ui, I just see the
>     > 'start' dummy
>     >     > operator. If I type 'airflow list_tasks workflow', they are
> listed.
>     >     > Thoughts?
>     >     >
>     >     > Here is how I'm generating the tasks:
>     >     >
>     >     >
>     >     > def create_snowflake_operator(file, dag, snowflake_connection):
>     >     >     file_repl = file.replace('/', '_')
>     >     >     file_repl = file_repl.replace('.sql', '')
>     >     >     print("TASK_ID {}".format(file_repl))
>     >     >     return SnowflakeOperator(
>     >     >         dag=dag,
>     >     >         task_id='create_{}'.format(file_repl),
>     >     >         snowflake_conn_id=snowflake_connection,
>     >     >         sql=file
>     >     >     )
>     >     >
>     >     > DAG_NAME = 'create_objects'
>     >     > dag = DAG(
>     >     >     DAG_NAME,
>     >     >     default_args=args,
>     >     >     dagrun_timeout=timedelta(hours=2),
>     >     >     schedule_interval=None,
>     >     > )
>     >     >
>     >     > start = DummyOperator(
>     >     >     dag=dag,
>     >     >     task_id="start",
>     >     > )
>     >     >
>     >     > print("creating snowflake operators")
>     >     >
>     >     > for file in glob('dags/snowsql/create/udf/*.sql'):
>     >     >     print("FILE {}".format(file))
>     >     >     task = create_snowflake_operator(file, dag,
> 'snowflake_default')
>     >     >     task.set_upstream(start)
>     >     >
>     >     > for file in glob('dags/snowsql/create/table/*.sql'):
>     >     >     print("FILE {}".format(file))
>     >     >     task = create_snowflake_operator(file, dag,
> 'snowflake_default')
>     >     >     task.set_upstream(start)
>     >     >
>     >     > for file in glob('dags/snowsql/create/view/*.sql'):
>     >     >     print("FILE {}".format(file))
>     >     >     task = create_snowflake_operator(file, dag,
> 'snowflake_default')
>     >     >     task.set_upstream(start)
>     >     >
>     >     > print("done {}".format(start.downstream_task_ids))
>     >     >
>     >     > Thanks in advance
>     >     > --
>     >     > Frank Maritato
>     >     >
>     >
>     >
>     >
>
>
>

Re: [External] Re: Dynamic tasks in a dag?

Posted by Frank Maritato <fm...@opentable.com.INVALID>.
Ok, my mistake. I thought that command was querying the server for its information and not just looking in a directory relative to where it is being run. I have it working now. Thanks Chris and Sai!
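
For reference, one common way to make the globs independent of the current working directory is to anchor them to the DAG file's own location. A minimal sketch along those lines (assuming the snowsql folder sits next to this DAG file, and that paths handed to sql= should stay relative to the DAG folder so the .sql files are resolved as templates):

    import os
    from glob import glob

    # Directory containing this DAG file; it does not depend on where the
    # scheduler, webserver or CLI process was started from.
    DAG_DIR = os.path.dirname(os.path.abspath(__file__))

    for path in glob(os.path.join(DAG_DIR, 'snowsql', 'create', 'udf', '*.sql')):
        # Pass a path relative to the DAG folder to the operator (assumption:
        # the SQL files live under the DAG folder or a template_searchpath).
        rel_path = os.path.relpath(path, DAG_DIR)
        task = create_snowflake_operator(rel_path, dag, 'snowflake_default')
        task.set_upstream(start)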


On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:

    The relative paths might work from wherever you are invoking 'airflow
    list_tasks', but that doesn't mean they work from wherever the webserver is
    parsing the dags from.
    
    Does running 'airflow list_tasks' from some other working directory work?
    
    On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
    <fm...@opentable.com.invalid> wrote:
    
    > Do you mean give the full path to the files? The relative path I'm using
    > definitely works. When I type airflow list_dags, I can see the output from
    > the print statements that the glob is finding my sql files and creating the
    > snowflake operators.
    >
    > airflow list_tasks workflow also lists all the operators I'm creating. I'm
    > just not seeing them in the ui.
    >
    > On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
    >
    >     Hi frank,
    >     Can you try giving global paths?
    >
    >     On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fmaritato@opentable.com
    > .invalid>
    >     wrote:
    >
    >     > Hi,
    >     >
    >     > I'm using apache airflow 1.10.0 and I'm trying to dynamically
    > generate
    >     > some tasks in my dag based on files that are in the dags directory.
    > The
    >     > problem is, I don't see these tasks in the ui, I just see the
    > 'start' dummy
    >     > operator. If I type 'airflow list_tasks workflow', they are listed.
    >     > Thoughts?
    >     >
    >     > Here is how I'm generating the tasks:
    >     >
    >     >
    >     > def create_snowflake_operator(file, dag, snowflake_connection):
    >     >     file_repl = file.replace('/', '_')
    >     >     file_repl = file_repl.replace('.sql', '')
    >     >     print("TASK_ID {}".format(file_repl))
    >     >     return SnowflakeOperator(
    >     >         dag=dag,
    >     >         task_id='create_{}'.format(file_repl),
    >     >         snowflake_conn_id=snowflake_connection,
    >     >         sql=file
    >     >     )
    >     >
    >     > DAG_NAME = 'create_objects'
    >     > dag = DAG(
    >     >     DAG_NAME,
    >     >     default_args=args,
    >     >     dagrun_timeout=timedelta(hours=2),
    >     >     schedule_interval=None,
    >     > )
    >     >
    >     > start = DummyOperator(
    >     >     dag=dag,
    >     >     task_id="start",
    >     > )
    >     >
    >     > print("creating snowflake operators")
    >     >
    >     > for file in glob('dags/snowsql/create/udf/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > for file in glob('dags/snowsql/create/table/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > for file in glob('dags/snowsql/create/view/*.sql'):
    >     >     print("FILE {}".format(file))
    >     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
    >     >     task.set_upstream(start)
    >     >
    >     > print("done {}".format(start.downstream_task_ids))
    >     >
    >     > Thanks in advance
    >     > --
    >     > Frank Maritato
    >     >
    >
    >
    >
    


Re: [External] Re: Dynamic tasks in a dag?

Posted by Chris Palmer <ch...@crpalmer.com>.
The relative paths might work from wherever you are invoking 'airflow
list_tasks', but that doesn't mean they work from wherever the webserver is
parsing the dags from.

Does running 'airflow list_tasks' from some other working directory work?
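
A quick way to see the difference being described is to log both the process's working directory and the DAG file's own directory from inside the DAG file; a minimal sketch:

    import os

    # The working directory depends on which process parses this file
    # (CLI, scheduler or webserver); the file's own directory does not.
    print("cwd     {}".format(os.getcwd()))
    print("dag dir {}".format(os.path.dirname(os.path.abspath(__file__))))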

On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
<fm...@opentable.com.invalid> wrote:

> Do you mean give the full path to the files? The relative path I'm using
> definitely works. When I type airflow list_dags, I can see the output from
> the print statements that the glob is finding my sql files and creating the
> snowflake operators.
>
> airflow list_tasks workflow also lists all the operators I'm creating. I'm
> just not seeing them in the ui.
>
> On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>
>     Hi frank,
>     Can you try giving global paths?
>
>     On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fmaritato@opentable.com
> .invalid>
>     wrote:
>
>     > Hi,
>     >
>     > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> generate
>     > some tasks in my dag based on files that are in the dags directory.
> The
>     > problem is, I don't see these tasks in the ui, I just see the
> 'start' dummy
>     > operator. If I type 'airflow list_tasks workflow', they are listed.
>     > Thoughts?
>     >
>     > Here is how I'm generating the tasks:
>     >
>     >
>     > def create_snowflake_operator(file, dag, snowflake_connection):
>     >     file_repl = file.replace('/', '_')
>     >     file_repl = file_repl.replace('.sql', '')
>     >     print("TASK_ID {}".format(file_repl))
>     >     return SnowflakeOperator(
>     >         dag=dag,
>     >         task_id='create_{}'.format(file_repl),
>     >         snowflake_conn_id=snowflake_connection,
>     >         sql=file
>     >     )
>     >
>     > DAG_NAME = 'create_objects'
>     > dag = DAG(
>     >     DAG_NAME,
>     >     default_args=args,
>     >     dagrun_timeout=timedelta(hours=2),
>     >     schedule_interval=None,
>     > )
>     >
>     > start = DummyOperator(
>     >     dag=dag,
>     >     task_id="start",
>     > )
>     >
>     > print("creating snowflake operators")
>     >
>     > for file in glob('dags/snowsql/create/udf/*.sql'):
>     >     print("FILE {}".format(file))
>     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     >     task.set_upstream(start)
>     >
>     > for file in glob('dags/snowsql/create/table/*.sql'):
>     >     print("FILE {}".format(file))
>     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     >     task.set_upstream(start)
>     >
>     > for file in glob('dags/snowsql/create/view/*.sql'):
>     >     print("FILE {}".format(file))
>     >     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     >     task.set_upstream(start)
>     >
>     > print("done {}".format(start.downstream_task_ids))
>     >
>     > Thanks in advance
>     > --
>     > Frank Maritato
>     >
>
>
>