Posted to dev@airflow.apache.org by Frank Maritato <fm...@opentable.com.INVALID> on 2018/09/14 16:23:33 UTC
Re: [External] Re: Dynamic tasks in a dag?
Do you mean give the full path to the files? The relative path I'm using definitely works. When I type airflow list_dags, I can see the output from the print statements that the glob is finding my sql files and creating the snowflake operators.
airflow list_tasks workflow also lists all the operators I'm creating. I'm just not seeing them in the ui.
On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
Hi frank,
Can you try giving global paths?
On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fm...@opentable.com.invalid>
wrote:
> Hi,
>
> I'm using apache airflow 1.10.0 and I'm trying to dynamically generate
> some tasks in my dag based on files that are in the dags directory. The
> problem is, I don't see these tasks in the ui, I just see the 'start' dummy
> operator. If I type 'airflow list_tasks workflow', they are listed.
> Thoughts?
>
> Here is how I'm generating the tasks:
>
>
> def create_snowflake_operator(file, dag, snowflake_connection):
>     file_repl = file.replace('/', '_')
>     file_repl = file_repl.replace('.sql', '')
>     print("TASK_ID {}".format(file_repl))
>     return SnowflakeOperator(
>         dag=dag,
>         task_id='create_{}'.format(file_repl),
>         snowflake_conn_id=snowflake_connection,
>         sql=file
>     )
>
> DAG_NAME = 'create_objects'
> dag = DAG(
>     DAG_NAME,
>     default_args=args,
>     dagrun_timeout=timedelta(hours=2),
>     schedule_interval=None,
> )
>
> start = DummyOperator(
>     dag=dag,
>     task_id="start",
> )
>
> print("creating snowflake operators")
>
> for file in glob('dags/snowsql/create/udf/*.sql'):
>     print("FILE {}".format(file))
>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     task.set_upstream(start)
>
> for file in glob('dags/snowsql/create/table/*.sql'):
>     print("FILE {}".format(file))
>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     task.set_upstream(start)
>
> for file in glob('dags/snowsql/create/view/*.sql'):
>     print("FILE {}".format(file))
>     task = create_snowflake_operator(file, dag, 'snowflake_default')
>     task.set_upstream(start)
>
> print("done {}".format(start.downstream_task_ids))
>
> Thanks in advance
> --
> Frank Maritato
>
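For illustration, a hypothetical file matched by the udf glob above and the task_id that create_snowflake_operator would build for it:

    # hypothetical path, for illustration only
    file = 'dags/snowsql/create/udf/parse_json.sql'
    file_repl = file.replace('/', '_').replace('.sql', '')
    print('create_{}'.format(file_repl))
    # -> create_dags_snowsql_create_udf_parse_json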
Re: [External] Dynamic tasks in a dag?
Posted by Ash Berlin-Taylor <as...@apache.org>.
This isn't needed, as the tasks are added to the dag when specified, so the DAG object keeps track of the tasks.
-ash
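A minimal sketch of that point, reusing the dag, start and create_snowflake_operator defined in the code earlier in the thread (the file path is hypothetical): an operator built with dag=dag is registered on the DAG even if the return value is thrown away.

    # No reference to the new operator is kept, yet the DAG still tracks it.
    create_snowflake_operator('dags/snowsql/create/udf/parse_json.sql', dag, 'snowflake_default')
    print(dag.task_ids)  # includes 'create_dags_snowsql_create_udf_parse_json' alongside 'start'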
> On 14 Sep 2018, at 18:30, Alex Tronchin-James 949-412-7220 <al...@gmail.com> wrote:
>
> Don't you need to preserve the task objects? Your implementation overwrites
> each one with its successor, so only the last task would be kept, despite your
> print statements. Try building a list or dict of tasks, like:
>
> tasks = []  # only at the top
> for file in glob('dags/snowsql/create/udf/*.sql'):
>     print("FILE {}".format(file))
>     tasks.append(
>         create_snowflake_operator(file, dag, 'snowflake_default')
>     )
>     tasks[-1].set_upstream(start)
>
> On Fri, Sep 14, 2018 at 17:20 Frank Maritato
> <fm...@opentable.com.invalid> wrote:
>
>> Ok, my mistake. I thought that command was querying the server for its
>> information and not just looking in a directory relative to where it is
>> being run. I have it working now. Thanks Chris and Sai!
>>
>>
>> On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
>>
>> The relative paths might work from where ever you are evoking 'airflow
>> list_tasks', but that doesn't mean they work from wherever the
>> webserver is
>> parsing the dags from.
>>
>> Does running 'airflow list_tasks' from some other running directory
>> work?
>>
>> On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
>> <fm...@opentable.com.invalid> wrote:
>>
>>> Do you mean give the full path to the files? The relative path I'm
>> using
>>> definitely works. When I type airflow list_dags, I can see the
>> output from
>>> the print statements that the glob is finding my sql files and
>> creating the
>>> snowflake operators.
>>>
>>> airflow list_tasks workflow also lists all the operators I'm
>> creating. I'm
>>> just not seeing them in the ui.
>>>
>>> On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>>>
>>> Hi frank,
>>> Can you try giving global paths?
>>>
>>> On Fri 14 Sep, 2018, 21:35 Frank Maritato, <
>> fmaritato@opentable.com
>>> .invalid>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using apache airflow 1.10.0 and I'm trying to dynamically
>>> generate
>>>> some tasks in my dag based on files that are in the dags
>> directory.
>>> The
>>>> problem is, I don't see these tasks in the ui, I just see the
>>> 'start' dummy
>>>> operator. If I type 'airflow list_tasks workflow', they are
>> listed.
>>>> Thoughts?
>>>>
>>>> Here is how I'm generating the tasks:
>>>>
>>>>
>>>> def create_snowflake_operator(file, dag, snowflake_connection):
>>>> file_repl = file.replace('/', '_')
>>>> file_repl = file_repl.replace('.sql', '')
>>>> print("TASK_ID {}".format(file_repl))
>>>> return SnowflakeOperator(
>>>> dag=dag,
>>>> task_id='create_{}'.format(file_repl),
>>>> snowflake_conn_id=snowflake_connection,
>>>> sql=file
>>>> )
>>>>
>>>> DAG_NAME = 'create_objects'
>>>> dag = DAG(
>>>> DAG_NAME,
>>>> default_args=args,
>>>> dagrun_timeout=timedelta(hours=2),
>>>> schedule_interval=None,
>>>> )
>>>>
>>>> start = DummyOperator(
>>>> dag=dag,
>>>> task_id="start",
>>>> )
>>>>
>>>> print("creating snowflake operators")
>>>>
>>>> for file in glob('dags/snowsql/create/udf/*.sql'):
>>>> print("FILE {}".format(file))
>>>> task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>> task.set_upstream(start)
>>>>
>>>> for file in glob('dags/snowsql/create/table/*.sql'):
>>>> print("FILE {}".format(file))
>>>> task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>> task.set_upstream(start)
>>>>
>>>> for file in glob('dags/snowsql/create/view/*.sql'):
>>>> print("FILE {}".format(file))
>>>> task = create_snowflake_operator(file, dag,
>> 'snowflake_default')
>>>> task.set_upstream(start)
>>>>
>>>> print("done {}".format(start.downstream_task_ids))
>>>>
>>>> Thanks in advance
>>>> --
>>>> Frank Maritato
>>>>
>>>
>>>
>>>
>>
>>
>>
Re: [External] Re: Dynamic tasks in a dag?
Posted by Alex Tronchin-James 949-412-7220 <al...@gmail.com>.
Don't you need to preserve the task objects? Your implementation overwrites
each one with its successor, so only the last task would be kept, despite your
print statements. Try building a list or dict of tasks, like:
tasks = []  # only at the top
for file in glob('dags/snowsql/create/udf/*.sql'):
    print("FILE {}".format(file))
    tasks.append(
        create_snowflake_operator(file, dag, 'snowflake_default')
    )
    tasks[-1].set_upstream(start)
On Fri, Sep 14, 2018 at 17:20 Frank Maritato
<fm...@opentable.com.invalid> wrote:
> Ok, my mistake. I thought that command was querying the server for its
> information and not just looking in a directory relative to where it is
> being run. I have it working now. Thanks Chris and Sai!
>
>
> On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
>
> The relative paths might work from where ever you are evoking 'airflow
> list_tasks', but that doesn't mean they work from wherever the
> webserver is
> parsing the dags from.
>
> Does running 'airflow list_tasks' from some other running directory
> work?
>
> On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
> <fm...@opentable.com.invalid> wrote:
>
> > Do you mean give the full path to the files? The relative path I'm
> using
> > definitely works. When I type airflow list_dags, I can see the
> output from
> > the print statements that the glob is finding my sql files and
> creating the
> > snowflake operators.
> >
> > airflow list_tasks workflow also lists all the operators I'm
> creating. I'm
> > just not seeing them in the ui.
> >
> > On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
> >
> > Hi frank,
> > Can you try giving global paths?
> >
> > On Fri 14 Sep, 2018, 21:35 Frank Maritato, <
> fmaritato@opentable.com
> > .invalid>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> > generate
> > > some tasks in my dag based on files that are in the dags
> directory.
> > The
> > > problem is, I don't see these tasks in the ui, I just see the
> > 'start' dummy
> > > operator. If I type 'airflow list_tasks workflow', they are
> listed.
> > > Thoughts?
> > >
> > > Here is how I'm generating the tasks:
> > >
> > >
> > > def create_snowflake_operator(file, dag, snowflake_connection):
> > > file_repl = file.replace('/', '_')
> > > file_repl = file_repl.replace('.sql', '')
> > > print("TASK_ID {}".format(file_repl))
> > > return SnowflakeOperator(
> > > dag=dag,
> > > task_id='create_{}'.format(file_repl),
> > > snowflake_conn_id=snowflake_connection,
> > > sql=file
> > > )
> > >
> > > DAG_NAME = 'create_objects'
> > > dag = DAG(
> > > DAG_NAME,
> > > default_args=args,
> > > dagrun_timeout=timedelta(hours=2),
> > > schedule_interval=None,
> > > )
> > >
> > > start = DummyOperator(
> > > dag=dag,
> > > task_id="start",
> > > )
> > >
> > > print("creating snowflake operators")
> > >
> > > for file in glob('dags/snowsql/create/udf/*.sql'):
> > > print("FILE {}".format(file))
> > > task = create_snowflake_operator(file, dag,
> 'snowflake_default')
> > > task.set_upstream(start)
> > >
> > > for file in glob('dags/snowsql/create/table/*.sql'):
> > > print("FILE {}".format(file))
> > > task = create_snowflake_operator(file, dag,
> 'snowflake_default')
> > > task.set_upstream(start)
> > >
> > > for file in glob('dags/snowsql/create/view/*.sql'):
> > > print("FILE {}".format(file))
> > > task = create_snowflake_operator(file, dag,
> 'snowflake_default')
> > > task.set_upstream(start)
> > >
> > > print("done {}".format(start.downstream_task_ids))
> > >
> > > Thanks in advance
> > > --
> > > Frank Maritato
> > >
> >
> >
> >
>
>
>
Re: [External] Re: Dynamic tasks in a dag?
Posted by Frank Maritato <fm...@opentable.com.INVALID>.
Ok, my mistake. I thought that command was querying the server for its information and not just looking in a directory relative to where it is being run. I have it working now. Thanks Chris and Sai!
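One way to get there, as a sketch and not necessarily what was done here: resolve the globs against the DAG file's own directory, which is the same no matter which process parses the file. This assumes the snowsql tree sits next to the DAG module and reuses the dag, start and create_snowflake_operator from the code earlier in the thread.

    import os
    from glob import glob

    # Directory containing this DAG file, independent of the current working directory.
    DAG_DIR = os.path.dirname(os.path.abspath(__file__))

    for path in glob(os.path.join(DAG_DIR, 'snowsql', 'create', 'udf', '*.sql')):
        # Pass a path relative to the DAG folder, so the templated sql field
        # resolves against the DAG's directory when the .sql file is rendered.
        task = create_snowflake_operator(os.path.relpath(path, DAG_DIR), dag, 'snowflake_default')
        task.set_upstream(start)

    # ...and likewise for the table and view globs.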
On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
The relative paths might work from wherever you are invoking 'airflow
list_tasks', but that doesn't mean they work from wherever the webserver is
parsing the dags from.
Does running 'airflow list_tasks' from some other working directory work?
On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
<fm...@opentable.com.invalid> wrote:
> Do you mean give the full path to the files? The relative path I'm using
> definitely works. When I type airflow list_dags, I can see the output from
> the print statements that the glob is finding my sql files and creating the
> snowflake operators.
>
> airflow list_tasks workflow also lists all the operators I'm creating. I'm
> just not seeing them in the ui.
>
> On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>
> Hi frank,
> Can you try giving global paths?
>
> On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fmaritato@opentable.com
> .invalid>
> wrote:
>
> > Hi,
> >
> > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> generate
> > some tasks in my dag based on files that are in the dags directory.
> The
> > problem is, I don't see these tasks in the ui, I just see the
> 'start' dummy
> > operator. If I type 'airflow list_tasks workflow', they are listed.
> > Thoughts?
> >
> > Here is how I'm generating the tasks:
> >
> >
> > def create_snowflake_operator(file, dag, snowflake_connection):
> >     file_repl = file.replace('/', '_')
> >     file_repl = file_repl.replace('.sql', '')
> >     print("TASK_ID {}".format(file_repl))
> >     return SnowflakeOperator(
> >         dag=dag,
> >         task_id='create_{}'.format(file_repl),
> >         snowflake_conn_id=snowflake_connection,
> >         sql=file
> >     )
> >
> > DAG_NAME = 'create_objects'
> > dag = DAG(
> >     DAG_NAME,
> >     default_args=args,
> >     dagrun_timeout=timedelta(hours=2),
> >     schedule_interval=None,
> > )
> >
> > start = DummyOperator(
> >     dag=dag,
> >     task_id="start",
> > )
> >
> > print("creating snowflake operators")
> >
> > for file in glob('dags/snowsql/create/udf/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/table/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/view/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > print("done {}".format(start.downstream_task_ids))
> >
> > Thanks in advance
> > --
> > Frank Maritato
> >
>
>
>
Re: [External] Re: Dynamic tasks in a dag?
Posted by Chris Palmer <ch...@crpalmer.com>.
The relative paths might work from wherever you are invoking 'airflow
list_tasks', but that doesn't mean they work from wherever the webserver is
parsing the dags from.
Does running 'airflow list_tasks' from some other working directory work?
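A quick way to see that dependence, as a sketch:

    import os
    from glob import glob

    # The relative glob only matches when the process was started from the
    # directory containing dags/; the webserver and scheduler usually are not.
    print(os.getcwd())
    print(glob('dags/snowsql/create/udf/*.sql'))  # [] from most working directories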
On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
<fm...@opentable.com.invalid> wrote:
> Do you mean give the full path to the files? The relative path I'm using
> definitely works. When I type airflow list_dags, I can see the output from
> the print statements that the glob is finding my sql files and creating the
> snowflake operators.
>
> airflow list_tasks workflow also lists all the operators I'm creating. I'm
> just not seeing them in the ui.
>
> On 9/14/18, 9:10 AM, "Sai Phanindhra" <ph...@gmail.com> wrote:
>
> Hi frank,
> Can you try giving global paths?
>
> On Fri 14 Sep, 2018, 21:35 Frank Maritato, <fmaritato@opentable.com
> .invalid>
> wrote:
>
> > Hi,
> >
> > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> generate
> > some tasks in my dag based on files that are in the dags directory.
> The
> > problem is, I don't see these tasks in the ui, I just see the
> 'start' dummy
> > operator. If I type 'airflow list_tasks workflow', they are listed.
> > Thoughts?
> >
> > Here is how I'm generating the tasks:
> >
> >
> > def create_snowflake_operator(file, dag, snowflake_connection):
> >     file_repl = file.replace('/', '_')
> >     file_repl = file_repl.replace('.sql', '')
> >     print("TASK_ID {}".format(file_repl))
> >     return SnowflakeOperator(
> >         dag=dag,
> >         task_id='create_{}'.format(file_repl),
> >         snowflake_conn_id=snowflake_connection,
> >         sql=file
> >     )
> >
> > DAG_NAME = 'create_objects'
> > dag = DAG(
> >     DAG_NAME,
> >     default_args=args,
> >     dagrun_timeout=timedelta(hours=2),
> >     schedule_interval=None,
> > )
> >
> > start = DummyOperator(
> >     dag=dag,
> >     task_id="start",
> > )
> >
> > print("creating snowflake operators")
> >
> > for file in glob('dags/snowsql/create/udf/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/table/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > for file in glob('dags/snowsql/create/view/*.sql'):
> >     print("FILE {}".format(file))
> >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> >     task.set_upstream(start)
> >
> > print("done {}".format(start.downstream_task_ids))
> >
> > Thanks in advance
> > --
> > Frank Maritato
> >
>
>
>