Posted to dev@airflow.apache.org by Daniel Cohen <da...@cloudinary.com> on 2018/09/26 19:27:29 UTC

hooks & operators improvement proposal

Some thoughts about operators and hooks:
Operators are composable. A typical ETL flow looks like `kickoff >>
source_to_staging >> staging_to_warehouse >> warehouse_post_process`, where
tasks use shared state (such as S3) or naming conventions to continue the
work where the upstream task left off.

Hooks, on the other hand, are not composable, and a lot of ETL logic is
written ad hoc in the operator each time.

I propose a lightweight, in-process ETL framework that
- allows hook composition
- provides shared general utilities (compression / file management / serialization)
- simplifies operator building
How it looks from the operator's side:

def execute(self, context):
    # initialize hooks
    self.s3 = S3Hook...
    self.mysql = MySqlHook...

    # set up operator state
    query = 'select * from somewhere'

    # declare your ETL process
    self.mysql.yield_query(query) >> \
        pipes.clear_keys(keys=self.scrubbed_columns) >> \
        pipes.ndjson_dumps() >> \
        pipes.batch(size=1024) >> \
        pipes.gzip() >> \
        pipes.tempfile() >> \
        self.s3.file_writer(s3_key=self.s3_key,
                            bucket_name=self.s3_bucket,
                            replace=True)


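How it looks from the hook's side:

@pipes.producer  # decorate: marks this method as the head of a pipeline
def yield_query(self, query):
    # assumes the hook exposes a DB-API connection through get_conn()
    cursor = self.get_conn().cursor()
    cursor.execute(query)
    for row in cursor:
        yield row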


*pipes is a module with a set of generic operations that can potentially be
reused between hooks and operators.

The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL packages),
and the implementation is based on generators and decorators.
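
To illustrate how generators and decorators can support this kind of composition, here is a minimal sketch. It is not the Cloudinary implementation: the names `Stream`, `producer`, `stage`, `sink`, `gzip_bytes` and `yield_rows` are hypothetical, the meaning of `batch(size=...)` (join that many lines into one chunk) is an assumption, and the source is a hard-coded generator rather than a real hook.

```python
import functools
import gzip
import json


class Stream:
    """A lazy iterable that can be piped into a stage with `>>`."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def __rshift__(self, stage):
        # `stream >> stage` hands the stream to the configured stage
        return stage(self)


def producer(func):
    """Decorator: the generator function becomes the head of a pipeline."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return Stream(func(*args, **kwargs))
    return wrapper


def stage(func):
    """Decorator: func(upstream, **options) becomes a configurable lazy stage."""
    @functools.wraps(func)
    def configure(**options):
        return lambda upstream: Stream(func(upstream, **options))
    return configure


def sink(func):
    """Decorator: like stage, but consumes the stream and returns a result."""
    @functools.wraps(func)
    def configure(**options):
        return lambda upstream: func(upstream, **options)
    return configure


@stage
def clear_keys(rows, keys=()):
    # drop the listed keys from every row
    for row in rows:
        yield {k: v for k, v in row.items() if k not in keys}


@stage
def ndjson_dumps(rows):
    # one JSON document per line
    for row in rows:
        yield json.dumps(row) + "\n"


@stage
def batch(lines, size=1024):
    # join `size` lines into one chunk; the last chunk may be shorter
    chunk = []
    for line in lines:
        chunk.append(line)
        if len(chunk) == size:
            yield "".join(chunk)
            chunk = []
    if chunk:
        yield "".join(chunk)


@sink
def gzip_bytes(chunks):
    # consumes the whole stream and returns the compressed payload
    return gzip.compress("".join(chunks).encode("utf-8"))


@producer
def yield_rows():
    # stand-in for a hook method such as yield_query
    yield {"id": 1, "email": "a@example.com"}
    yield {"id": 2, "email": "b@example.com"}


payload = (yield_rows()
           >> clear_keys(keys=("email",))
           >> ndjson_dumps()
           >> batch(size=1)
           >> gzip_bytes())
```

Nothing is read from the source until the sink at the end of the chain iterates, so rows flow through one at a time instead of being materialized between steps.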

We (cloudinary.com) are planning to open source it. Is it something that
would be interesting to integrate into Airflow, or to release as a third-party
package, or not at all? Any thoughts or suggestions?

Thanks,
d


-- 
daniel cohen
+972-(0)54-4799-147

Re: hooks & operators improvement proposal

Posted by Michael Ghen <mi...@mikeghen.com>.
Right, I see where you're at. I looked through some of our operators and I
do see some logic that's repeated (converting to JSON and writing to temp
files). We also do what looks like pipes.clear_keys with some of the files
we get using the SSHHook. Now that I've noticed that, I'm tempted to pull it
out and put it with some of our other utils, which is what it looks like pipes
is doing. I think if some common things like that existed, we'd consider
using them. I know we burned a lot of time getting tempfiles working right,
since we're not all experienced software engineers over here.
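
As an illustration of the kind of shared utility being discussed, here is a minimal sketch that covers the repeated steps (scrub keys, convert to JSON, write a temp file) using only the standard library. The name `ndjson_gzip_tempfile` and its `scrub_keys` parameter are hypothetical and not part of Airflow or of the proposed pipes module.

```python
import gzip
import json
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def ndjson_gzip_tempfile(rows, scrub_keys=()):
    """Write rows as gzipped newline-delimited JSON to a temp file.

    Yields the file path and removes the file when the block exits,
    even if the caller raises.
    """
    fd, path = tempfile.mkstemp(suffix=".ndjson.gz")
    os.close(fd)  # the file is reopened by path below
    try:
        with gzip.open(path, "wt", encoding="utf-8") as out:
            for row in rows:
                clean = {k: v for k, v in row.items() if k not in scrub_keys}
                out.write(json.dumps(clean) + "\n")
        yield path
    finally:
        os.remove(path)


rows = [{"id": 1, "password": "x"}, {"id": 2, "password": "y"}]
with ndjson_gzip_tempfile(rows, scrub_keys=("password",)) as path:
    # inside the block the file exists and could be uploaded by a hook
    with gzip.open(path, "rt", encoding="utf-8") as f:
        lines = f.read().splitlines()
```

Wrapping the temp file in a context manager keeps the cleanup in one place, which is usually the part that is easy to get wrong when each operator handles it separately.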

On Thu, Sep 27, 2018 at 1:48 PM Daniel Cohen <da...@cloudinary.com>
wrote:

> Hi Michael, thank you for your comment.
> XCom is for sharing state between tasks, and you are right that it would
> not be wise to pass datasets through it.
> I'm suggesting that we refactor code that already exists in operators (and
> that each operator implements separately). If we move some of that logic
> into hooks (or another construct), we can build more robust operators faster.
>
> Thanks,
> d

Re: hooks & operators improvement proposal

Posted by Daniel Cohen <da...@cloudinary.com>.
Hi Michael, thank you for your comment.
XCom is for sharing state between tasks, and you are right that it would
not be wise to pass datasets through it.
I'm suggesting that we refactor code that already exists in operators (and
that each operator implements separately). If we move some of that logic
into hooks (or another construct), we can build more robust operators faster.

Thanks,
d

On Thu, Sep 27, 2018 at 4:54 PM Michael Ghen <mi...@mikeghen.com> wrote:

> I see what you're looking for, and I think this is the purpose of XCom. We've
> used XCom in some of our custom operators to get this type of
> functionality.
>
> That said, we tend to avoid putting a lot of data into XCom. I believe
> somewhere in the docs it talks about how that's an anti-pattern. The
> pattern was to lean on external systems for exchanging data.

Re: hooks & operators improvement proposal

Posted by Michael Ghen <mi...@mikeghen.com>.
I see what you're looking for, and I think this is the purpose of XCom. We've
used XCom in some of our custom operators to get this type of
functionality.

That said, we tend to avoid putting a lot of data into XCom. I believe
somewhere in the docs it talks about how that's an anti-pattern. The
pattern was to lean on external systems for exchanging data.
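
A small sketch of that pattern: the upstream task stores the dataset in an external system and pushes only a reference through XCom. `xcom_push` and `xcom_pull` mirror the method names on Airflow's TaskInstance, but `FakeTaskInstance` is a stand-in so the example runs without Airflow, and the task functions and S3 key layout are hypothetical.

```python
class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance (assumption, for illustration only)."""

    _store = {}  # shared on purpose so separate instances see the same XComs

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))


def source_to_staging(ti, ds):
    s3_key = "staging/orders/{}.ndjson.gz".format(ds)
    # ... the dataset itself would be uploaded to s3_key here ...
    ti.xcom_push(key="staging_key", value=s3_key)  # push the pointer, not the data


def staging_to_warehouse(ti):
    s3_key = ti.xcom_pull(task_ids="source_to_staging", key="staging_key")
    # ... the warehouse load would read from s3_key here ...
    return s3_key


source_to_staging(FakeTaskInstance("source_to_staging"), "2018-09-26")
loaded_from = staging_to_warehouse(FakeTaskInstance("staging_to_warehouse"))
```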

On Wed, Sep 26, 2018 at 4:26 PM Jeff Payne <jp...@bombora.com> wrote:

> Ah, OK. Thanks for the clarification.

Re: hooks & operators improvement proposal

Posted by Jeff Payne <jp...@bombora.com>.
Ah, OK. Thanks for the clarification.

________________________________
From: Daniel Cohen <da...@cloudinary.com>
Sent: Wednesday, September 26, 2018 1:15:16 PM
To: dev@airflow.incubator.apache.org
Subject: Re: hooks & operators improvement proposal

Hi Jeff,
It seems that I was a bit unclear.
The DAG's ETL spans multiple tasks, and usually looks like kickoff >>
source_to_staging >> staging_to_warehouse >> warehouse_post_process.
I'm not proposing changes to operators, they are great. What I am proposing
is to apply the same concept to the smaller building blocks.

I argue that the anatomy of a task (in ETL flows) usually consists of
'mini' flows that look like read source > serialize > dump (example 1
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/operators/mysql_to_hive.py#L124>,
example 2
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/contrib/hooks/salesforce_hook.py#L201>).
You can see that sometimes this is written in the operator and sometimes in
the hook; the code is not shared, and it handles the same cases each time.

thanks,
d




Re: hooks & operators improvement proposal

Posted by Daniel Cohen <da...@cloudinary.com>.
Hi Jeff,
It seems that I was a bit unclear.
The DAG's ETL spans multiple tasks, and usually looks like kickoff >>
source_to_staging >> staging_to_warehouse >> warehouse_post_process.
I'm not proposing changes to operators, they are great. What I am proposing
is to apply the same concept to the smaller building blocks.

I argue that the anatomy of a task (in ETL flows) usually consists of
'mini' flows that look like read source > serialize > dump (example 1
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/operators/mysql_to_hive.py#L124>,
example 2
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/contrib/hooks/salesforce_hook.py#L201>).
You can see that sometimes this is written in the operator and sometimes in
the hook; the code is not shared, and it handles the same cases each time.

thanks,
d



On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne <jp...@bombora.com> wrote:

> So, in your scenario, the ETL pipeline happens inside the single
> operator/task?
>
> If so, would it not make sense for the pipeline to span multiple tasks and
> provide a standard set of functions/decorators/etc for defining the
> input/output to/from each task? That way you would leverage the ability to
> rerun the DAG from a particular step of the ETL pipeline in case of a
> recoverable failure. Or am I misunderstanding...

Re: hooks & operators improvement proposal

Posted by Jeff Payne <jp...@bombora.com>.
So, in your scenario, the ETL pipeline happens inside the single operator/task?

If so, would it not make sense for the pipeline to span multiple tasks and provide a standard set of functions/decorators/etc for defining the input/output to/from each task? That way you would leverage the ability to rerun the DAG from a particular step of the ETL pipeline in case of a recoverable failure. Or am I misunderstanding...
