Posted to dev@airflow.apache.org by Vincent Poulain <vi...@tinyclues.com> on 2017/06/20 08:58:24 UTC

EMROperator templating

Hello,

I would like to use EmrCreateJobFlowOperator and pass dynamic values through
its job_flow_overrides argument.

Example:

EmrCreateJobFlowOperator(job_flow_overrides={'foo': '{{ xcom.pull }}'})

job_flow_overrides seems too deeply nested for Jinja templating, and
template_fields is set to [] on this operator.

Any idea how to fetch data from XCom and use it in the job_flow_overrides
parameters?

Thanks


-- 

*Vincent Poulain*

Senior Software Engineer



Office +33 1 75 50 67 26 <+33%201%2075%2050%2067%2026> | Mobile +33 6 21 82
87 62 | vincent@tinyclues.com <su...@tinyclues.com>

Tinyclues | 51 rue Étienne Marcel, 75001 Paris

www.tinyclues.com <http://bit.ly/2hNL4Fs> | @tinyclues
<https://twitter.com/Tinyclues>

Re: EMROperator templating

Posted by Vincent Poulain <vi...@tinyclues.com>.
Thanks Maxime. I am going to try it in production, then look into a
contribution if everything works as expected.

On Wed, Jun 21, 2017 at 8:44 PM, Maxime Beauchemin <
maximebeauchemin@gmail.com> wrote:

> Actually, it looks like while we allow dicts and lists in
> template_fields, we won't recurse through them:
> https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2318
>
> Since it's in BaseOperator, you could override render_template_from_field
> to recurse through dicts and lists in your derived operator.
>
> And of course a PR handling that would also be welcome.
>
> Max




Re: EMROperator templating

Posted by Maxime Beauchemin <ma...@gmail.com>.
Actually it looks like while we allow for dicts and lists in
template_fields, we won't recurse through them:
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2318

Since it's in BaseOperator, you could override render_template_from_field to
recurse through dicts and lists in your derived operator.
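
To make the idea of recursing concrete, here is a minimal, self-contained sketch. It is not Airflow's code: render_string is a hypothetical regex-based stand-in for Jinja rendering, and render_nested is a hypothetical helper name. An override in a derived operator would need to do something along these lines with the real Jinja environment.

```python
import re


def render_string(text, context):
    """Stand-in for Jinja (assumption): fill each '{{ name }}' from context."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(context[m.group(1)]), text)


def render_nested(content, context):
    """Render every string found anywhere inside dicts, lists and tuples."""
    if isinstance(content, str):
        return render_string(content, context)
    if isinstance(content, dict):
        return {k: render_nested(v, context) for k, v in content.items()}
    if isinstance(content, (list, tuple)):
        # Rebuild the same container type with rendered elements.
        return type(content)(render_nested(v, context) for v in content)
    return content  # numbers, None, etc. are returned unchanged


# Hypothetical overrides, shaped like the nested structure in this thread.
overrides = {
    'Name': 'demo',
    'Steps': [
        {'HadoopJarStep': {'Args': ['--conf', '-Dconfigs.RUN_ID={{ run_id }}']}},
    ],
}
rendered = render_nested(overrides, {'run_id': 'abc123'})
```

The sketch builds new containers instead of mutating the input, so the original overrides dict keeps its template strings.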

And of course a PR handling that would also be welcome.

Max

On Wed, Jun 21, 2017 at 12:41 AM, Vincent Poulain <
vincent.poulain@tinyclues.com> wrote:

> I already tried that; it does not work. Seems that the job_flow_overrides
> attribute follows the parameters of run_job_flow:
> http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Client.run_job_flow
> I am not sure that Jinja templating can handle something like:
>
> EmrCreateJobFlowOperator(job_flow_overrides={
>     ...
>     'Steps': [
>         {
>             "Name": "Setup Hadoop Debugging",
>             "ActionOnFailure": "TERMINATE_CLUSTER",
>             "HadoopJarStep": {
>                 "Jar": "command-runner.jar",
>                 "Args": [
>                     "state-pusher-script"
>                 ]
>             }
>         },
>         {
>             "Name": "tinyclues-foo",
>             "ActionOnFailure": "TERMINATE_CLUSTER",
>             "HadoopJarStep": {
>                 "Jar": "command-runner.jar",
>                 "Args": [
>                     "spark-submit",
>                     "--class",
>                     "com.tinyclues.alwayson.batch.api.Batch",
>                     '--conf',
>                     '-Dconfigs.RUN_ID={{ ti.xcom_pull("FIRST_TASK")["RUN_ID"] }}',
>                     "/tmp/tinyclues/{jar_name}".format(jar_name=jar_name)
>                 ]
>             }
>         }
>     ]}
> ...
>
> If it should work, I might be missing something.
> I am happy to write this easy PR.
>
> Thanks

Re: EMROperator templating

Posted by Vincent Poulain <vi...@tinyclues.com>.
I already tried that; it does not work. Seems that the job_flow_overrides
attribute follows the parameters of run_job_flow:
http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Client.run_job_flow
I am not sure that Jinja templating can handle something like:

EmrCreateJobFlowOperator(job_flow_overrides={
    ...
    'Steps': [
        {
            "Name": "Setup Hadoop Debugging",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "state-pusher-script"
                ]
            }
        },
        {
            "Name": "tinyclues-foo",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--class",
                    "com.tinyclues.alwayson.batch.api.Batch",
                    '--conf',
                    '-Dconfigs.RUN_ID={{ ti.xcom_pull("FIRST_TASK")["RUN_ID"] }}',
                    "/tmp/tinyclues/{jar_name}".format(jar_name=jar_name)
                ]
            }
        }
    ]}
...

If it should work, I might be missing something.
I am happy to write this easy PR.

Thanks

On Tue, Jun 20, 2017 at 9:59 PM, Maxime Beauchemin <
maximebeauchemin@gmail.com> wrote:

> Looks like the author didn't set up any attributes as templated...
> https://github.com/apache/incubator-airflow/blob/ff45d8f2218a8da9328161aa66d004c3db3b367e/airflow/contrib/operators/emr_create_job_flow_operator.py#L35
>
> You can do it easily with a simple hack:
>
> class TemplatedEmrCreateJobFlowOperator(EmrCreateJobFlowOperator):
>     template_fields = ['job_flow_overrides']
>
> Then you just use that operator instead and templating should work for the
> values in job_flow_overrides.
>
> It'd be nice if you also took the time to send a PR adding that line.
>
> Max




Re: EMROperator templating

Posted by Maxime Beauchemin <ma...@gmail.com>.
Looks like the author didn't set up any attributes as templated...
https://github.com/apache/incubator-airflow/blob/ff45d8f2218a8da9328161aa66d004c3db3b367e/airflow/contrib/operators/emr_create_job_flow_operator.py#L35

You can do it easily with a simple hack:

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

class TemplatedEmrCreateJobFlowOperator(EmrCreateJobFlowOperator):
    template_fields = ['job_flow_overrides']

Then you just use that operator instead and templating should work for the
values in job_flow_overrides.
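
For intuition about why that one line matters, here is a toy model of the mechanism. It is not Airflow's actual implementation: ToyOperator, TemplatedToyOperator, render_fields and the regex-based render function are all hypothetical stand-ins (Airflow renders with Jinja), and the field is a plain string here to keep the sketch short.

```python
import re


def render(text, context):
    """Stand-in for Jinja (assumption): replace '{{ name }}' with context[name]."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(context[m.group(1)]), text)


class ToyOperator:
    # Names of the attributes that get rendered before execution.
    template_fields = []

    def __init__(self, job_flow_overrides):
        self.job_flow_overrides = job_flow_overrides

    def render_fields(self, context):
        # Only attributes listed in template_fields are touched.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                setattr(self, name, render(value, context))


class TemplatedToyOperator(ToyOperator):
    # Same pattern as the hack above: opt the attribute in to templating.
    template_fields = ['job_flow_overrides']


plain = ToyOperator('run-{{ run_id }}')
templated = TemplatedToyOperator('run-{{ run_id }}')
for op in (plain, templated):
    op.render_fields({'run_id': 42})
```

With an empty template_fields the attribute is passed through untouched, which matches the behaviour described in the original question.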

It'd be nice if you also took the time to send a PR adding that line.

Max

On Tue, Jun 20, 2017 at 1:58 AM, Vincent Poulain <
vincent.poulain@tinyclues.com> wrote:

> Hello,
>
> I would like to use EmrCreateJobFlowOperator and pass dynamic values through
> its job_flow_overrides argument.
>
> Example:
>
> EmrCreateJobFlowOperator(job_flow_overrides={'foo': '{{ xcom.pull }}'})
>
> job_flow_overrides seems too deeply nested for Jinja templating, and
> template_fields is set to [] on this operator.
>
> Any idea how to fetch data from XCom and use it in the job_flow_overrides
> parameters?
>
> Thanks