Posted to dev@airflow.apache.org by Bolke de Bruin <bd...@gmail.com> on 2018/05/05 21:49:47 UTC

Lineage

Hi All,

I have made a first implementation that allows tracking of lineage in Airflow and integration with Apache Atlas. It was inspired by Jeremiah’s work in the past on Data Flow pipelines, but I think I kept it a little bit simpler. 

Operators now have two new parameters called “inlets” and “outlets”. These can be filled with objects derived from “DataSet”, like “File” and “HadoopFile”. Parameters are jinja2 templated, which means they receive the context of the task when it is running and get rendered. So you can get definitions like this:

f_final = File(name="/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag, 
    inlets={"auto": True},
    outlets={"datasets": [f_final,]})

f_in = File(name="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)
run_this = BashOperator(    
    task_id='run_after_loop', bash_command='echo 1', dag=dag,
    inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
    outlets={"datasets": outlets}
    )
run_this.set_downstream(run_this_last)
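
A note on the quadruple braces in the loop above, as a minimal sketch: str.format() collapses each doubled brace into a single one, so the name that reaches the operator still contains a jinja2 expression to be rendered at run time. FILE_CATEGORIES is not defined in the mail, so the values below are placeholders, and outlet_name is a helper made up for this illustration.

```python
# Placeholder categories; the mail does not say what FILE_CATEGORIES holds.
FILE_CATEGORIES = ["CAT1", "CAT2"]


def outlet_name(category):
    # "{{{{" becomes "{{" and "}}}}" becomes "}}" after str.format(),
    # which leaves a jinja2 expression in the resulting string.
    return "/tmp/{}/{{{{ execution_date }}}}".format(category)


names = [outlet_name(category) for category in FILE_CATEGORIES]
for name in names:
    print(name)
```

The rendering of the remaining expression is then left to the templating step that runs with the task context.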

So I am trying to keep the boilerplate work down for developers. Operators can also extend inlets and outlets automatically. This will probably be a bit harder for the BashOperator without some special magic, but an update to the DruidOperator can be relatively straightforward.

In the future Operators can take advantage of the inlet/outlet definitions as they are also made available as part of the context for templating (as “inlets” and “outlets”).

I’m looking forward to your comments!

https://github.com/apache/incubator-airflow/pull/3321

Bolke.

Re: Lineage

Posted by Bolke de Bruin <bd...@gmail.com>.
Hi Marcin,

That would be awesome! The reason I chose to use DataSets is that it aligns easily with Apache Atlas’ understanding of what a dataset is. I had no other example apart from IBM’s InfoSphere, which I would really rather not get into. So I am definitely open to changes.

Another thing is obviously that many of the Operators are not SQL based or even SQL like, so I wonder how you worked with that. Being able to automatically derive metadata is of course the holy grail.

Finally, what I also tried to accomplish is to have an easy way to share metadata between tasks. This requires being able to serialize the metadata to JSON, as it is shared via XCom.
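
To illustrate the serialization requirement, here is a minimal sketch of a dataset description that survives a round trip through JSON. SketchDataSet, dump_outlets and load_outlets are hypothetical names for this illustration only; they are not the classes or functions from the PR.

```python
import json


class SketchDataSet(object):
    """Hypothetical stand-in for a lineage dataset; not the class from the PR."""

    def __init__(self, name, **attributes):
        self.name = name
        self.attributes = attributes

    def as_dict(self):
        # Only plain types go in here, so json.dumps() can handle the result.
        return {"name": self.name, "attributes": self.attributes}

    @classmethod
    def from_dict(cls, payload):
        return cls(payload["name"], **payload["attributes"])


def dump_outlets(datasets):
    """Serialize a list of datasets to a JSON string."""
    return json.dumps([d.as_dict() for d in datasets], sort_keys=True)


def load_outlets(text):
    """Rebuild the datasets from the JSON string on the receiving side."""
    return [SketchDataSet.from_dict(p) for p in json.loads(text)]


payload = dump_outlets([SketchDataSet("/tmp/final", owner="etl")])
restored = load_outlets(payload)
print(payload)
```

Anything that cannot be expressed as plain dicts, lists, strings and numbers would need its own conversion step before being shared this way.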

So yes please share! We can generalize the best practices then!

Bolke

Sent from my iPad

> On 6 May 2018, at 00:37, Marcin Szymański <ms...@gmail.com> wrote:
> 
> Hi Bolke
> 
> Great stuff. Pieces of this remind me of work I have done for one
> organization. However, in that case, instead of defining base classes like
> Dataset from scratch, I extended objects from SQLAlchemy, such as MetaData,
> Table, etc. This in turn allowed for automated SQL generation (with some
> changes to operators), defining data quality checks and many other cool
> things. Maybe it's worth going down that path? I am willing to share more
> details, if interested.
> 
> Best
> Marcin
> 

Re: Lineage

Posted by Alex Tronchin-James 949-412-7220 <al...@gmail.com>.
^this

On Sat, May 5, 2018, 15:37 Marcin Szymański <ms...@gmail.com> wrote:

> Hi Bolke
>
> Great stuff. Pieces of this remind me of work I have done for one
> organization. However, in that case, instead of defining base classes like
> Dataset from scratch, I extended objects from SQLAlchemy, such as MetaData,
> Table, etc. This in turn allowed for automated SQL generation (with some
> changes to operators), defining data quality checks and many other cool
> things. Maybe it's worth going down that path? I am willing to share more
> details, if interested.
>
> Best
> Marcin

Re: Lineage

Posted by Marcin Szymański <ms...@gmail.com>.
Hi Bolke

Great stuff. Pieces of this remind me of work I have done for one
organization. However, in that case, instead of defining base classes like
Dataset from scratch, I extended objects from SQLAlchemy, such as MetaData,
Table, etc. This in turn allowed for automated SQL generation (with some
changes to operators), defining data quality checks and many other cool
things. Maybe it's worth going down that path? I am willing to share more
details, if interested.

Best
Marcin


Re: Lineage

Posted by Bolke de Bruin <bd...@gmail.com>.
Hi Jørn,

Great that you are test driving it. It does not have an effect if inlets and outlets are not used, so although it is experimental it will end up in 1.10. It does need more iterations, as some very valuable points have been made and I am working to see how those can be integrated properly. If possible I would like to move to the open metadata structure that will be in Atlas 1.0 and allows easier interoperability with other systems.

If you have suggestions please let me know! I will do a presentation at PyData Amsterdam together with Fokko that will feature some of the new things we are thinking of (and shamelessly copying some of Max’s ideas, hope you don’t mind Max!).

Bolke

Sent from my iPad

> On 14 May 2018, at 18:19, Jørn A Hansen <jo...@gmail.com> wrote:
> 
>> On Sat, 5 May 2018 at 23.49, Bolke de Bruin <bd...@gmail.com> wrote:
>> 
>> Hi All,
>> I have made a first implementation that allows tracking of lineage in
>> Airflow and integration with Apache Atlas.
> 
> 
> Snip
> 
>> 
>> 
>> I’m looking forward to your comments!
>> 
>> https://github.com/apache/incubator-airflow/pull/3321
> 
> 
> I see this just got merged - looking forward to being able to kick the tires
> more on this. When I tried getting started setting this up, I stumbled over
> Bolke’s nice Atlas Dockerfile (on his GitHub account; Bolke, thanks for
> sharing). It quickly helped me get to a running Atlas instance, which I
> was able to explore a bit ... and then I ran out of time.
> 
> Just one quick question: Will this go into 1.10 or is the general feeling
> that it has to get a bit more mileage on master first?
> 
> Cheers,
> JornH

Re: Lineage

Posted by Jørn A Hansen <jo...@gmail.com>.
On Sat, 5 May 2018 at 23.49, Bolke de Bruin <bd...@gmail.com> wrote:

> Hi All,
> I have made a first implementation that allows tracking of lineage in
> Airflow and integration with Apache Atlas.


Snip

>
>
> I’m looking forward to your comments!
>
> https://github.com/apache/incubator-airflow/pull/3321


I see this just got merged - looking forward to being able to kick the tires
more on this. When I tried getting started setting this up, I stumbled over
Bolke’s nice Atlas Dockerfile (on his GitHub account; Bolke, thanks for
sharing). It quickly helped me get to a running Atlas instance, which I
was able to explore a bit ... and then I ran out of time.

Just one quick question: Will this go into 1.10 or is the general feeling
that it has to get a bit more mileage on master first?

Cheers,
JornH

Re: Lineage

Posted by Marcin Szymański <ms...@gmail.com>.
Ok, I’ve put the most interesting pieces into a small Gist:

https://gist.github.com/ms32035/f85cfbaff132f0d0ec8c309558330b7d

The solution is based on SQLAlchemy’s declarative layer, which I found to
be closest to what you can find in a metadata repository in commercial ETL
tools. Luckily, most of the parameters there are optional, and it works
equally well without a database connection, so it can be used to store
information about other objects, like files.

The starting point is the MetaData class (
http://docs.sqlalchemy.org/en/latest/core/metadata.html), which has been
extended to store an Airflow connection ID. I treat it as an equivalent of a
single database, file location or anything else that stores a collection of
input/output datasets.

Next comes an extension of Table. On top of standard SQLAlchemy objects
like columns, datatypes, constraints (which don’t need to exist in the
database, and are processed by dedicated operators), it stores additional
attributes such as paths, encoding etc. The trick here was to plug that
into the base object, which has a somewhat complex initialization, hence the
__new__ method is overridden. Since almost everything, including column
definitions, is optional, the class can also represent files or anything
else that has a structure. This is an equivalent of what you defined as a
DataSet. Because a table is linked to metadata, connection ids are not
needed any more in operators.

There are two other classes, which are not related directly to your PR, but
leverage the framework and are commonly used in ETL/DQ:

* HighWaterMark – using SQLAlchemy columns, automatically stores
  HWM values for incremental loading in Airflow variables

* SQLCheck – allows defining complex SQL/Python data verification
  rules in metadata, which in the end come down to comparing 2 expressions

The metadata repository is simply a set of python files, with some
directory structure, stored and versioned together with dags.

You are right that not all operators are SQL-based, so not all of them were
rewritten. For those rewritten, I used the following logic/pattern:

* 2 parameters (input, output)

* Output was usually a table object

* Input could be (tried in order):

  1. extraction SQL query saved as table attribute, stored in metadata

  2. Table object – SQL was automatically generated from metadata

  3. SQL query as a string
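
The input resolution order above can be sketched roughly as follows. SketchTable and resolve_input_sql are hypothetical names; the real classes in the Gist extend SQLAlchemy objects, which this stdlib-only illustration does not attempt.

```python
class SketchTable(object):
    """Hypothetical table definition; not the class from the Gist."""

    def __init__(self, name, columns, extraction_sql=None):
        self.name = name
        self.columns = columns
        self.extraction_sql = extraction_sql


def resolve_input_sql(source):
    """Resolve an operator input to SQL, trying the options in order."""
    if isinstance(source, SketchTable):
        # 1. An extraction query saved as a table attribute wins.
        if source.extraction_sql:
            return source.extraction_sql
        # 2. Otherwise generate the SQL from the column metadata.
        return "SELECT {} FROM {}".format(", ".join(source.columns), source.name)
    # 3. A plain string is taken to be a SQL query already.
    if isinstance(source, str):
        return source
    raise TypeError("unsupported input: {!r}".format(source))


plain = SketchTable("orders", ["id", "amount"])
custom = SketchTable("orders", ["id", "amount"],
                     extraction_sql="SELECT id FROM orders WHERE amount > 0")

print(resolve_input_sql(custom))
print(resolve_input_sql(plain))
print(resolve_input_sql("SELECT 1"))
```

Keeping the resolution in one function means the operators themselves only need the two parameters (input, output).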

It does not cover all sorts of complex transformations, but with tables
defined in metadata it’s much easier to track what’s used where.
Furthermore, the metadata definitions were equally useful for flat files,
for example to store header columns.

I didn’t implement any metadata exchange between the operators, so I can’t
comment on serialization.

Just a few final thoughts on what works well and what does not:

+ all metadata in one place

+ support for basic data quality

- defining metadata (columns) without a wizard like in commercial tools is
time-consuming

- the table class has become a bit cluttered due to the many SQL
query types implemented, and will probably need some restructuring (split)
at some point



Best

Marcin




Re: Lineage

Posted by Bolke de Bruin <bd...@gmail.com>.
Forgot to answer your question. For S3 it could look like:

s3_file = File("s3a://bucket/key")
inlets = {"datasets": [s3_file,]}

Obviously, if you do something with the S3 file outside of Airflow, you need to track lineage yourself somehow.

B.

Sent from my iPhone

> On 6 May 2018, at 11:05, Bolke de Bruin <bd...@gmail.com> wrote:
> 
> Hi Gerardo,
> 
> Any lineage tracking system is dependent on how much data you can give it. So if you do transfers outside of the 'view' such a system has, then the lineage information is gone. Airflow can help in this area by tracking its internal lineage and providing that to those lineage systems.
> 
> Apache Atlas is agnostic and can receive lineage info by REST API (used in my implementation) and a Kafka topic. It also comes with a lot of connectors out of the box that tie into the Hadoop ecosystem and make your life easier there. The Airflow Atlas connector supplies Atlas with information that it doesn't know about yet, closing the loop further.
> 
> Also you can write your own connector and put it on the Airflow class path and use that one. 
> 
> Bolke
> 
> Sent from my iPhone
> 
>> On 6 May 2018, at 09:13, Gerardo Curiel <ge...@gerar.do> wrote:
>> 
>> Hi Bolke,
>> 
>> Data lineage support sounds very interesting.
>> 
>> I'm not very familiar with Atlas, but at first sight it seems like a tool specific
>> to the Hadoop ecosystem. What would this look like if the files (inlets or
>> outlets) were stored on S3?
>> 
>> An example of a service that manages a similar use case is AWS Glue[1],
>> which creates a hive metastore based on the schema and other metadata it
>> can get from different sources (amongst them, s3 files).
>> 
>> 
>> 
>> 
>> 
>> [1] https://aws.amazon.com/glue/
>> 
>> Cheers,
>> 
>> -- 
>> Gerardo Curiel // https://gerar.do

Re: Lineage

Posted by Gerardo Curiel <ge...@gerar.do>.
On Sun, May 6, 2018 at 7:05 PM, Bolke de Bruin <bd...@gmail.com> wrote:

>
> Apache Atlas is agnostic and can receive lineage info by REST API (used in
> my implementation) and a Kafka topic. It also comes with a lot of
> connectors out of the box that tie into the Hadoop ecosystem and make your
> life easier there. The Airflow Atlas connector supplies Atlas with
> information that it doesn't know about yet, closing the loop further.
>
>
Thanks for the explanation. Good to hear it has an API. I thought the
"bridges" were the main point of integration.


Cheers,

-- 
Gerardo Curiel // https://gerar.do

Re: Lineage

Posted by Bolke de Bruin <bd...@gmail.com>.
Hi Gerardo,

Any lineage tracking system is dependent on how much data you can give it. So if you do transfers outside of the 'view' such a system has, then the lineage information is gone. Airflow can help in this area by tracking its internal lineage and providing that to those lineage systems.

Apache Atlas is agnostic and can receive lineage info by REST API (used in my implementation) and a Kafka topic. It also comes with a lot of connectors out of the box that tie into the Hadoop ecosystem and make your life easier there. The Airflow Atlas connector supplies Atlas with information that it doesn't know about yet, closing the loop further.

Also you can write your own connector and put it on the Airflow class path and use that one. 
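
As a rough idea of what such a connector could look like, here is a minimal sketch that only collects lineage records in memory instead of calling Atlas. The class name, the send_lineage method and its arguments are assumptions for this illustration and should be checked against the actual backend interface in the PR before use.

```python
class SketchLineageBackend(object):
    """Hypothetical connector: records lineage locally instead of sending it."""

    def __init__(self):
        self.sent = []

    def send_lineage(self, operator=None, inlets=None, outlets=None, context=None):
        # Method name and arguments are assumptions made for this sketch.
        record = {
            "task_id": operator,
            "inlets": sorted(inlets or []),
            "outlets": sorted(outlets or []),
        }
        self.sent.append(record)
        return record


backend = SketchLineageBackend()
record = backend.send_lineage(
    operator="run_after_loop",
    inlets=["/tmp/whole_directory/"],
    outlets=["/tmp/b/2018-05-05", "/tmp/a/2018-05-05"],
)
print(record)
```

A real connector would replace the in-memory list with a call to whatever lineage system is in use.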

Bolke

Sent from my iPhone

> On 6 May 2018, at 09:13, Gerardo Curiel <ge...@gerar.do> wrote:
> 
> Hi Bolke,
> 
> Data lineage support sounds very interesting.
> 
> I'm not very familiar with Atlas, but at first sight it seems like a tool specific
> to the Hadoop ecosystem. What would this look like if the files (inlets or
> outlets) were stored on S3?
> 
> An example of a service that manages a similar use case is AWS Glue[1],
> which creates a hive metastore based on the schema and other metadata it
> can get from different sources (amongst them, s3 files).
> 
> 
> 
> 
> 
> [1] https://aws.amazon.com/glue/
> 
> Cheers,
> 
> -- 
> Gerardo Curiel // https://gerar.do

Re: Lineage

Posted by Gerardo Curiel <ge...@gerar.do>.
Hi Bolke,

Data lineage support sounds very interesting.

I'm not very familiar with Atlas, but at first sight it seems like a tool specific
to the Hadoop ecosystem. What would this look like if the files (inlets or
outlets) were stored on S3?

An example of a service that manages a similar use case is AWS Glue[1],
which creates a Hive metastore based on the schema and other metadata it
can get from different sources (amongst them, S3 files).





[1] https://aws.amazon.com/glue/

Cheers,

-- 
Gerardo Curiel // https://gerar.do