You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2020/02/05 11:47:59 UTC

[Discussion] Job generation / submission hooks & Atlas integration

Hi all!

We have started some preliminary work on the Flink - Atlas integration at
Cloudera. It seems that the integration will require some new hook
interfaces at the jobgraph generation and submission phases, so I figured I
will open a discussion thread with my initial ideas to get some early
feedback.

*Minimal background*
Very simply put Apache Atlas is a data governance framework that stores
metadata for our data and processing logic to track ownership, lineage etc.
It is already integrated with systems like HDFS, Kafka, Hive and many
others.

Adding Flink integration would mean that we can track the input output data
of our Flink jobs, their owners and how different Flink jobs are connected
to each other through the data they produce (lineage). This seems to be a
very big deal for a lot of companies :)

*Flink - Atlas integration in a nutshell*
In order to integrate with Atlas we basically need 2 things.
 - Flink entity definitions
 - Flink Atlas hook

The entity definition is the easy part. It is a json that contains the
objects (entities) that we want to store for any give Flink job. As a
starter we could have a single FlinkApplication entity that has a set of
inputs and outputs. These inputs/outputs are other Atlas entities that are
already defines such as Kafka topic or Hbase table.

The Flink atlas hook will be the logic that creates the entity instance and
uploads it to Atlas when we start a new Flink job. This is the part where
we implement the core logic.

*Job submission hook*
In order to implement the Atlas hook we need a place where we can inspect
the pipeline, create and send the metadata when the job starts. When we
create the FlinkApplication entity we need to be able to easily determine
the sources and sinks (and their properties) of the pipeline.

Unfortunately there is no JobSubmission hook in Flink that could execute
this logic and even if there was one there is a mismatch of abstraction
levels needed to implement the integration.
We could imagine a JobSubmission hook executed in the JobManager runner as
this:

void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);

This is nice but the JobGraph makes it super difficult to extract sources
and UDFs to create the metadata entity. The atlas entity however could be
easily created from the StreamGraph object (used to represent the logical
flow) before the JobGraph is generated. To go around this limitation we
could add a JobGraphGeneratorHook interface:

void preProcess(StreamGraph streamGraph); void postProcess(JobGraph jobGraph);

We could then generate the atlas entity in the preprocess step and add a
jobmission hook in the postprocess step that will simply send the already
baked in entity.

*This kinda works but...*
The approach outlined above seems to work and we have built a POC using it.
Unfortunately it is far from nice as it exposes non-public APIs such as the
StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.

It would be much nicer if we could somehow go back from JobGraph to
StreamGraph or at least have an easy way to access source/sink UDFS.

What do you think?

Cheers,
Gyula

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Till Rohrmann <tr...@apache.org>.
Hi Gyula,

technically speaking the JobGraph is sent to the Dispatcher where a
JobMaster is started to execute the JobGraph. The JobGraph comes either
from the JobSubmitHandler or the JarRunHandler. Except for creating the
ExecutionGraph from the JobGraph there is not much happening on the
Dispatcher.

If Atlas only requires to work on the JobGraph, then I believe it would be
good enough if Flink offers the utilities to analyze it. Then we would not
need to add anything to Flink itself.

For the second question, I guess it mostly depends on the requirements from
Atlas. I guess one could make it a bit easier to extract information from
the JobGraph. Maybe Aljoscha can chime in on this topic.

Cheers,
Till

On Wed, Feb 5, 2020 at 7:35 PM Gyula Fóra <gy...@gmail.com> wrote:

> @Till Rohrmann <tr...@apache.org>
> You are completely right that the Atlas hook itself should not live inside
> Flink. All other hooks for the other projects are implemented as part of
> Atlas,
> and the Atlas community is ready to maintain it once we have a working
> version. The discussion is more about changes that we need in Flink (if
> any) to make it possible to implement the Atlas hook outside Flink.
>
> So in theory I agree that the hook should receive job submissions and just
> extract the metadata required by Atlas.
>
> There are 2 questions here (and my initial email gives one possible
> solution):
>
> 1. What is the component that receives the submission and runs the
> extraction logic? If you want to remove this process from Flink you could
> put something in front of the job submission rest endpoint but I think that
> would be an overkill. So I assumed that we will have to add some way of
> executing code on job submissions, hence my proposal of a job submission
> hook.
>
> 2. What information do we need to extract the atlas metadata? On job
> submissions we usually have JobGraph instances which are pretty hard to
> handle compared to a StreamGraph for instance when it comes to getting
> source/sink udfs. I am wondering if we need to make this easier somehow.
>
> Gyula
>
> On Wed, Feb 5, 2020 at 6:03 PM Taher Koitawala <ta...@gmail.com> wrote:
>
> > As far as I know, Atlas entries can be created with a rest call. Can we
> not
> > create an abstracted Flink operator that makes the rest call on job
> > execution/submission?
> >
> > Regards,
> > Taher Koitawala
> >
> > On Wed, Feb 5, 2020, 10:16 PM Flavio Pompermaier <po...@okkam.it>
> > wrote:
> >
> > > Hi Gyula,
> > > thanks for taking care of integrating Flink with Atlas (and Egeria
> > > initiative in the end) that is IMHO the most important part of all the
> > > Hadoop ecosystem and that, unfortunately, was quite overlooked. I can
> > > confirm that the integration with Atlas/Egeria is absolutely of big
> > > interest.
> > >
> > > Il Mer 5 Feb 2020, 17:12 Till Rohrmann <tr...@apache.org> ha
> > scritto:
> > >
> > > > Hi Gyula,
> > > >
> > > > thanks for starting this discussion. Before diving in the details of
> > how
> > > to
> > > > implement this feature, I wanted to ask whether it is strictly
> required
> > > > that the Atlas integration lives within Flink or not? Could it also
> > work
> > > if
> > > > you have tool which receives job submissions, extracts the required
> > > > information, forwards the job submission to Flink, monitors the
> > execution
> > > > result and finally publishes some information to Atlas (modulo some
> > other
> > > > steps which are missing in my description)? Having a different layer
> > > being
> > > > responsible for this would keep complexity out of Flink.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gy...@apache.org>
> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > We have started some preliminary work on the Flink - Atlas
> > integration
> > > at
> > > > > Cloudera. It seems that the integration will require some new hook
> > > > > interfaces at the jobgraph generation and submission phases, so I
> > > > figured I
> > > > > will open a discussion thread with my initial ideas to get some
> early
> > > > > feedback.
> > > > >
> > > > > *Minimal background*
> > > > > Very simply put Apache Atlas is a data governance framework that
> > stores
> > > > > metadata for our data and processing logic to track ownership,
> > lineage
> > > > etc.
> > > > > It is already integrated with systems like HDFS, Kafka, Hive and
> many
> > > > > others.
> > > > >
> > > > > Adding Flink integration would mean that we can track the input
> > output
> > > > data
> > > > > of our Flink jobs, their owners and how different Flink jobs are
> > > > connected
> > > > > to each other through the data they produce (lineage). This seems
> to
> > > be a
> > > > > very big deal for a lot of companies :)
> > > > >
> > > > > *Flink - Atlas integration in a nutshell*
> > > > > In order to integrate with Atlas we basically need 2 things.
> > > > >  - Flink entity definitions
> > > > >  - Flink Atlas hook
> > > > >
> > > > > The entity definition is the easy part. It is a json that contains
> > the
> > > > > objects (entities) that we want to store for any give Flink job.
> As a
> > > > > starter we could have a single FlinkApplication entity that has a
> set
> > > of
> > > > > inputs and outputs. These inputs/outputs are other Atlas entities
> > that
> > > > are
> > > > > already defines such as Kafka topic or Hbase table.
> > > > >
> > > > > The Flink atlas hook will be the logic that creates the entity
> > instance
> > > > and
> > > > > uploads it to Atlas when we start a new Flink job. This is the part
> > > where
> > > > > we implement the core logic.
> > > > >
> > > > > *Job submission hook*
> > > > > In order to implement the Atlas hook we need a place where we can
> > > inspect
> > > > > the pipeline, create and send the metadata when the job starts.
> When
> > we
> > > > > create the FlinkApplication entity we need to be able to easily
> > > determine
> > > > > the sources and sinks (and their properties) of the pipeline.
> > > > >
> > > > > Unfortunately there is no JobSubmission hook in Flink that could
> > > execute
> > > > > this logic and even if there was one there is a mismatch of
> > abstraction
> > > > > levels needed to implement the integration.
> > > > > We could imagine a JobSubmission hook executed in the JobManager
> > runner
> > > > as
> > > > > this:
> > > > >
> > > > > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > > > > configuration);
> > > > >
> > > > > This is nice but the JobGraph makes it super difficult to extract
> > > sources
> > > > > and UDFs to create the metadata entity. The atlas entity however
> > could
> > > be
> > > > > easily created from the StreamGraph object (used to represent the
> > > logical
> > > > > flow) before the JobGraph is generated. To go around this
> limitation
> > we
> > > > > could add a JobGraphGeneratorHook interface:
> > > > >
> > > > > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > > > > jobGraph);
> > > > >
> > > > > We could then generate the atlas entity in the preprocess step and
> > add
> > > a
> > > > > jobmission hook in the postprocess step that will simply send the
> > > already
> > > > > baked in entity.
> > > > >
> > > > > *This kinda works but...*
> > > > > The approach outlined above seems to work and we have built a POC
> > using
> > > > it.
> > > > > Unfortunately it is far from nice as it exposes non-public APIs
> such
> > as
> > > > the
> > > > > StreamGraph. Also it feels a bit weird to have 2 hooks instead of
> > one.
> > > > >
> > > > > It would be much nicer if we could somehow go back from JobGraph to
> > > > > StreamGraph or at least have an easy way to access source/sink
> UDFS.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Cheers,
> > > > > Gyula
> > > > >
> > > >
> > >
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
@Till Rohrmann <tr...@apache.org>
You are completely right that the Atlas hook itself should not live inside
Flink. All other hooks for the other projects are implemented as part of
Atlas,
and the Atlas community is ready to maintain it once we have a working
version. The discussion is more about changes that we need in Flink (if
any) to make it possible to implement the Atlas hook outside Flink.

So in theory I agree that the hook should receive job submissions and just
extract the metadata required by Atlas.

There are 2 questions here (and my initial email gives one possible
solution):

1. What is the component that receives the submission and runs the
extraction logic? If you want to remove this process from Flink you could
put something in front of the job submission rest endpoint but I think that
would be an overkill. So I assumed that we will have to add some way of
executing code on job submissions, hence my proposal of a job submission
hook.

2. What information do we need to extract the atlas metadata? On job
submissions we usually have JobGraph instances which are pretty hard to
handle compared to a StreamGraph for instance when it comes to getting
source/sink udfs. I am wondering if we need to make this easier somehow.

Gyula

On Wed, Feb 5, 2020 at 6:03 PM Taher Koitawala <ta...@gmail.com> wrote:

> As far as I know, Atlas entries can be created with a rest call. Can we not
> create an abstracted Flink operator that makes the rest call on job
> execution/submission?
>
> Regards,
> Taher Koitawala
>
> On Wed, Feb 5, 2020, 10:16 PM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> > Hi Gyula,
> > thanks for taking care of integrating Flink with Atlas (and Egeria
> > initiative in the end) that is IMHO the most important part of all the
> > Hadoop ecosystem and that, unfortunately, was quite overlooked. I can
> > confirm that the integration with Atlas/Egeria is absolutely of big
> > interest.
> >
> > Il Mer 5 Feb 2020, 17:12 Till Rohrmann <tr...@apache.org> ha
> scritto:
> >
> > > Hi Gyula,
> > >
> > > thanks for starting this discussion. Before diving in the details of
> how
> > to
> > > implement this feature, I wanted to ask whether it is strictly required
> > > that the Atlas integration lives within Flink or not? Could it also
> work
> > if
> > > you have tool which receives job submissions, extracts the required
> > > information, forwards the job submission to Flink, monitors the
> execution
> > > result and finally publishes some information to Atlas (modulo some
> other
> > > steps which are missing in my description)? Having a different layer
> > being
> > > responsible for this would keep complexity out of Flink.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gy...@apache.org> wrote:
> > >
> > > > Hi all!
> > > >
> > > > We have started some preliminary work on the Flink - Atlas
> integration
> > at
> > > > Cloudera. It seems that the integration will require some new hook
> > > > interfaces at the jobgraph generation and submission phases, so I
> > > figured I
> > > > will open a discussion thread with my initial ideas to get some early
> > > > feedback.
> > > >
> > > > *Minimal background*
> > > > Very simply put Apache Atlas is a data governance framework that
> stores
> > > > metadata for our data and processing logic to track ownership,
> lineage
> > > etc.
> > > > It is already integrated with systems like HDFS, Kafka, Hive and many
> > > > others.
> > > >
> > > > Adding Flink integration would mean that we can track the input
> output
> > > data
> > > > of our Flink jobs, their owners and how different Flink jobs are
> > > connected
> > > > to each other through the data they produce (lineage). This seems to
> > be a
> > > > very big deal for a lot of companies :)
> > > >
> > > > *Flink - Atlas integration in a nutshell*
> > > > In order to integrate with Atlas we basically need 2 things.
> > > >  - Flink entity definitions
> > > >  - Flink Atlas hook
> > > >
> > > > The entity definition is the easy part. It is a json that contains
> the
> > > > objects (entities) that we want to store for any give Flink job. As a
> > > > starter we could have a single FlinkApplication entity that has a set
> > of
> > > > inputs and outputs. These inputs/outputs are other Atlas entities
> that
> > > are
> > > > already defines such as Kafka topic or Hbase table.
> > > >
> > > > The Flink atlas hook will be the logic that creates the entity
> instance
> > > and
> > > > uploads it to Atlas when we start a new Flink job. This is the part
> > where
> > > > we implement the core logic.
> > > >
> > > > *Job submission hook*
> > > > In order to implement the Atlas hook we need a place where we can
> > inspect
> > > > the pipeline, create and send the metadata when the job starts. When
> we
> > > > create the FlinkApplication entity we need to be able to easily
> > determine
> > > > the sources and sinks (and their properties) of the pipeline.
> > > >
> > > > Unfortunately there is no JobSubmission hook in Flink that could
> > execute
> > > > this logic and even if there was one there is a mismatch of
> abstraction
> > > > levels needed to implement the integration.
> > > > We could imagine a JobSubmission hook executed in the JobManager
> runner
> > > as
> > > > this:
> > > >
> > > > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > > > configuration);
> > > >
> > > > This is nice but the JobGraph makes it super difficult to extract
> > sources
> > > > and UDFs to create the metadata entity. The atlas entity however
> could
> > be
> > > > easily created from the StreamGraph object (used to represent the
> > logical
> > > > flow) before the JobGraph is generated. To go around this limitation
> we
> > > > could add a JobGraphGeneratorHook interface:
> > > >
> > > > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > > > jobGraph);
> > > >
> > > > We could then generate the atlas entity in the preprocess step and
> add
> > a
> > > > jobmission hook in the postprocess step that will simply send the
> > already
> > > > baked in entity.
> > > >
> > > > *This kinda works but...*
> > > > The approach outlined above seems to work and we have built a POC
> using
> > > it.
> > > > Unfortunately it is far from nice as it exposes non-public APIs such
> as
> > > the
> > > > StreamGraph. Also it feels a bit weird to have 2 hooks instead of
> one.
> > > >
> > > > It would be much nicer if we could somehow go back from JobGraph to
> > > > StreamGraph or at least have an easy way to access source/sink UDFS.
> > > >
> > > > What do you think?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > >
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Taher Koitawala <ta...@gmail.com>.
As far as I know, Atlas entries can be created with a rest call. Can we not
create an abstracted Flink operator that makes the rest call on job
execution/submission?

Regards,
Taher Koitawala

On Wed, Feb 5, 2020, 10:16 PM Flavio Pompermaier <po...@okkam.it>
wrote:

> Hi Gyula,
> thanks for taking care of integrating Flink with Atlas (and Egeria
> initiative in the end) that is IMHO the most important part of all the
> Hadoop ecosystem and that, unfortunately, was quite overlooked. I can
> confirm that the integration with Atlas/Egeria is absolutely of big
> interest.
>
> Il Mer 5 Feb 2020, 17:12 Till Rohrmann <tr...@apache.org> ha scritto:
>
> > Hi Gyula,
> >
> > thanks for starting this discussion. Before diving in the details of how
> to
> > implement this feature, I wanted to ask whether it is strictly required
> > that the Atlas integration lives within Flink or not? Could it also work
> if
> > you have tool which receives job submissions, extracts the required
> > information, forwards the job submission to Flink, monitors the execution
> > result and finally publishes some information to Atlas (modulo some other
> > steps which are missing in my description)? Having a different layer
> being
> > responsible for this would keep complexity out of Flink.
> >
> > Cheers,
> > Till
> >
> > On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gy...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > We have started some preliminary work on the Flink - Atlas integration
> at
> > > Cloudera. It seems that the integration will require some new hook
> > > interfaces at the jobgraph generation and submission phases, so I
> > figured I
> > > will open a discussion thread with my initial ideas to get some early
> > > feedback.
> > >
> > > *Minimal background*
> > > Very simply put Apache Atlas is a data governance framework that stores
> > > metadata for our data and processing logic to track ownership, lineage
> > etc.
> > > It is already integrated with systems like HDFS, Kafka, Hive and many
> > > others.
> > >
> > > Adding Flink integration would mean that we can track the input output
> > data
> > > of our Flink jobs, their owners and how different Flink jobs are
> > connected
> > > to each other through the data they produce (lineage). This seems to
> be a
> > > very big deal for a lot of companies :)
> > >
> > > *Flink - Atlas integration in a nutshell*
> > > In order to integrate with Atlas we basically need 2 things.
> > >  - Flink entity definitions
> > >  - Flink Atlas hook
> > >
> > > The entity definition is the easy part. It is a json that contains the
> > > objects (entities) that we want to store for any give Flink job. As a
> > > starter we could have a single FlinkApplication entity that has a set
> of
> > > inputs and outputs. These inputs/outputs are other Atlas entities that
> > are
> > > already defines such as Kafka topic or Hbase table.
> > >
> > > The Flink atlas hook will be the logic that creates the entity instance
> > and
> > > uploads it to Atlas when we start a new Flink job. This is the part
> where
> > > we implement the core logic.
> > >
> > > *Job submission hook*
> > > In order to implement the Atlas hook we need a place where we can
> inspect
> > > the pipeline, create and send the metadata when the job starts. When we
> > > create the FlinkApplication entity we need to be able to easily
> determine
> > > the sources and sinks (and their properties) of the pipeline.
> > >
> > > Unfortunately there is no JobSubmission hook in Flink that could
> execute
> > > this logic and even if there was one there is a mismatch of abstraction
> > > levels needed to implement the integration.
> > > We could imagine a JobSubmission hook executed in the JobManager runner
> > as
> > > this:
> > >
> > > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > > configuration);
> > >
> > > This is nice but the JobGraph makes it super difficult to extract
> sources
> > > and UDFs to create the metadata entity. The atlas entity however could
> be
> > > easily created from the StreamGraph object (used to represent the
> logical
> > > flow) before the JobGraph is generated. To go around this limitation we
> > > could add a JobGraphGeneratorHook interface:
> > >
> > > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > > jobGraph);
> > >
> > > We could then generate the atlas entity in the preprocess step and add
> a
> > > jobmission hook in the postprocess step that will simply send the
> already
> > > baked in entity.
> > >
> > > *This kinda works but...*
> > > The approach outlined above seems to work and we have built a POC using
> > it.
> > > Unfortunately it is far from nice as it exposes non-public APIs such as
> > the
> > > StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> > >
> > > It would be much nicer if we could somehow go back from JobGraph to
> > > StreamGraph or at least have an easy way to access source/sink UDFS.
> > >
> > > What do you think?
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Gyula,
thanks for taking care of integrating Flink with Atlas (and Egeria
initiative in the end) that is IMHO the most important part of all the
Hadoop ecosystem and that, unfortunately, was quite overlooked. I can
confirm that the integration with Atlas/Egeria is absolutely of big
interest.

Il Mer 5 Feb 2020, 17:12 Till Rohrmann <tr...@apache.org> ha scritto:

> Hi Gyula,
>
> thanks for starting this discussion. Before diving in the details of how to
> implement this feature, I wanted to ask whether it is strictly required
> that the Atlas integration lives within Flink or not? Could it also work if
> you have tool which receives job submissions, extracts the required
> information, forwards the job submission to Flink, monitors the execution
> result and finally publishes some information to Atlas (modulo some other
> steps which are missing in my description)? Having a different layer being
> responsible for this would keep complexity out of Flink.
>
> Cheers,
> Till
>
> On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gy...@apache.org> wrote:
>
> > Hi all!
> >
> > We have started some preliminary work on the Flink - Atlas integration at
> > Cloudera. It seems that the integration will require some new hook
> > interfaces at the jobgraph generation and submission phases, so I
> figured I
> > will open a discussion thread with my initial ideas to get some early
> > feedback.
> >
> > *Minimal background*
> > Very simply put Apache Atlas is a data governance framework that stores
> > metadata for our data and processing logic to track ownership, lineage
> etc.
> > It is already integrated with systems like HDFS, Kafka, Hive and many
> > others.
> >
> > Adding Flink integration would mean that we can track the input output
> data
> > of our Flink jobs, their owners and how different Flink jobs are
> connected
> > to each other through the data they produce (lineage). This seems to be a
> > very big deal for a lot of companies :)
> >
> > *Flink - Atlas integration in a nutshell*
> > In order to integrate with Atlas we basically need 2 things.
> >  - Flink entity definitions
> >  - Flink Atlas hook
> >
> > The entity definition is the easy part. It is a json that contains the
> > objects (entities) that we want to store for any give Flink job. As a
> > starter we could have a single FlinkApplication entity that has a set of
> > inputs and outputs. These inputs/outputs are other Atlas entities that
> are
> > already defines such as Kafka topic or Hbase table.
> >
> > The Flink atlas hook will be the logic that creates the entity instance
> and
> > uploads it to Atlas when we start a new Flink job. This is the part where
> > we implement the core logic.
> >
> > *Job submission hook*
> > In order to implement the Atlas hook we need a place where we can inspect
> > the pipeline, create and send the metadata when the job starts. When we
> > create the FlinkApplication entity we need to be able to easily determine
> > the sources and sinks (and their properties) of the pipeline.
> >
> > Unfortunately there is no JobSubmission hook in Flink that could execute
> > this logic and even if there was one there is a mismatch of abstraction
> > levels needed to implement the integration.
> > We could imagine a JobSubmission hook executed in the JobManager runner
> as
> > this:
> >
> > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > configuration);
> >
> > This is nice but the JobGraph makes it super difficult to extract sources
> > and UDFs to create the metadata entity. The atlas entity however could be
> > easily created from the StreamGraph object (used to represent the logical
> > flow) before the JobGraph is generated. To go around this limitation we
> > could add a JobGraphGeneratorHook interface:
> >
> > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > jobGraph);
> >
> > We could then generate the atlas entity in the preprocess step and add a
> > jobmission hook in the postprocess step that will simply send the already
> > baked in entity.
> >
> > *This kinda works but...*
> > The approach outlined above seems to work and we have built a POC using
> it.
> > Unfortunately it is far from nice as it exposes non-public APIs such as
> the
> > StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> >
> > It would be much nicer if we could somehow go back from JobGraph to
> > StreamGraph or at least have an easy way to access source/sink UDFS.
> >
> > What do you think?
> >
> > Cheers,
> > Gyula
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Till Rohrmann <tr...@apache.org>.
Hi Gyula,

thanks for starting this discussion. Before diving in the details of how to
implement this feature, I wanted to ask whether it is strictly required
that the Atlas integration lives within Flink or not? Could it also work if
you have tool which receives job submissions, extracts the required
information, forwards the job submission to Flink, monitors the execution
result and finally publishes some information to Atlas (modulo some other
steps which are missing in my description)? Having a different layer being
responsible for this would keep complexity out of Flink.

Cheers,
Till

On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra <gy...@apache.org> wrote:

> Hi all!
>
> We have started some preliminary work on the Flink - Atlas integration at
> Cloudera. It seems that the integration will require some new hook
> interfaces at the jobgraph generation and submission phases, so I figured I
> will open a discussion thread with my initial ideas to get some early
> feedback.
>
> *Minimal background*
> Very simply put Apache Atlas is a data governance framework that stores
> metadata for our data and processing logic to track ownership, lineage etc.
> It is already integrated with systems like HDFS, Kafka, Hive and many
> others.
>
> Adding Flink integration would mean that we can track the input output data
> of our Flink jobs, their owners and how different Flink jobs are connected
> to each other through the data they produce (lineage). This seems to be a
> very big deal for a lot of companies :)
>
> *Flink - Atlas integration in a nutshell*
> In order to integrate with Atlas we basically need 2 things.
>  - Flink entity definitions
>  - Flink Atlas hook
>
> The entity definition is the easy part. It is a json that contains the
> objects (entities) that we want to store for any give Flink job. As a
> starter we could have a single FlinkApplication entity that has a set of
> inputs and outputs. These inputs/outputs are other Atlas entities that are
> already defines such as Kafka topic or Hbase table.
>
> The Flink atlas hook will be the logic that creates the entity instance and
> uploads it to Atlas when we start a new Flink job. This is the part where
> we implement the core logic.
>
> *Job submission hook*
> In order to implement the Atlas hook we need a place where we can inspect
> the pipeline, create and send the metadata when the job starts. When we
> create the FlinkApplication entity we need to be able to easily determine
> the sources and sinks (and their properties) of the pipeline.
>
> Unfortunately there is no JobSubmission hook in Flink that could execute
> this logic and even if there was one there is a mismatch of abstraction
> levels needed to implement the integration.
> We could imagine a JobSubmission hook executed in the JobManager runner as
> this:
>
> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> configuration);
>
> This is nice but the JobGraph makes it super difficult to extract sources
> and UDFs to create the metadata entity. The atlas entity however could be
> easily created from the StreamGraph object (used to represent the logical
> flow) before the JobGraph is generated. To go around this limitation we
> could add a JobGraphGeneratorHook interface:
>
> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> jobGraph);
>
> We could then generate the atlas entity in the preprocess step and add a
> jobmission hook in the postprocess step that will simply send the already
> baked in entity.
>
> *This kinda works but...*
> The approach outlined above seems to work and we have built a POC using it.
> Unfortunately it is far from nice as it exposes non-public APIs such as the
> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>
> It would be much nicer if we could somehow go back from JobGraph to
> StreamGraph or at least have an easy way to access source/sink UDFS.
>
> What do you think?
>
> Cheers,
> Gyula
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Kostas Kloudas <kk...@gmail.com>.
I think that the ExecutorListener idea could work.

With a bit more than FLIP-85, it is true that we can get rid of the
"exception throwing" environments and we need to introduce an
"EmbeddedExecutor" which is going to run on the JM. So, the 2 above,
coupled with an ExecutorListener can have the desired effect.

Cheers,
Kostas

On Fri, Mar 13, 2020 at 11:37 AM Stephan Ewen <se...@apache.org> wrote:
>
> Few thoughts on the discussion:
>
> ## Changes on the Master
>
> If possible, let's avoid changes to the master (JobManager / Dispatcher).
> These components are complex, we should strive to keep anything out of them
> that we can keep out of them.
>
> ## Problems in different deployments (applications / sessions)
>
> This should be pretty straightforward after FLIP-84, correct? There should
> be no more "exception throwing" environments that sneak the job graph out
> of the main method.
>
> ## Proposal: Executor Listeners
>
> We could think of a mix between the two approaches: Executor Listerners.
> When an executor is invoked with the Pipeline, the listener is also
> notified. That would keep this out of the API and be properly within the
> SPI layer.
> The listeners could be loaded from config, or via service loaders.
>
>
> On Fri, Mar 13, 2020 at 8:59 AM tison <wa...@gmail.com> wrote:
>
> > Hi Gyula and all,
> >
> > Thanks for the discussion so far.
> >
> > It seems that the requirement is to deliver some metadata of the submitted
> > job,
> > and such metadata can be simply extracted from StreamGraph.
> >
> > I'm unfamiliar with metadata Atlas needs so I make some assumptions.
> >
> > Assumption:
> > Metadata needed by Atlas is actually some Flink scope information, such as
> > input/
> > output of a node. And this metadata is compile time information so that we
> > know it
> > during compiling the StreamGraph.
> >
> > If the assumption stands, I'm curious whether or not it is an option we
> > standardize
> > the JSON representation of StreamGraph which will contain metadata
> > required. And
> > pass the JSON representation from generation phase to JobGraph and then
> > ExecutionGraph and finally retrievable from RestServer(so that we can
> > extend
> > JobClient to retrieve the plan by querying the cluster instead of have a
> > pre-configured
> > one).
> >
> > It is like `jsonPlan` in ExecutionGraph now(which is exposed by JobPlan
> > REST
> > endpoint). And I believe rather than JobGraph dump which is a physical
> > plan,
> > exposing access to StreamGraph dump which is a logical plan is possibly
> > more
> > interested from user perspective.
> >
> > Best,
> > tison.
> >
> >
> > Gyula Fóra <gy...@gmail.com> 于2020年3月13日周五 下午3:20写道:
> >
> > > Thanks again Kostas for diving deep into this, it is great feedback!
> > >
> > > I agree with the concerns regarding the custom executor, it has to be
> > able
> > > to properly handle the "original" executor somehow.
> > > This might be quite tricky if we want to implement the AtlasExecutor
> > > outside Flink. In any case does not really feel clean or lightweight at
> > > first glance.
> > >
> > > As for the JobClient/JobListener/Pipeline question, as you brought up the
> > > possibility for later attaching the JobClient, maybe the best route for
> > > this would be to
> > > add the Pipeline as a method parameter in the JobListener. It would break
> > > code compatibility but at least would have a consistent behavior.
> > >
> > > Now to the big problem of not having executors / joblisteners work in
> > > kuberentes-per-job,  web, etc modes. I was not aware of this problem
> > until
> > > now, this also seems to affect the whole concept of the JobListener
> > > interface. What good is a JobListener if it only listens to certain kind
> > of
> > > deployments :)
> > >
> > > Incidentally, in my first proposal (and prototype) I had the atlashook
> > > running on the JobMaster with an extra addition to a JobGraphGenerator
> > hook
> > > that could be registered in the StreamExecutionEnvironment. This meant
> > that
> > > we could work on the StreamGraph, register metadata in the JobGraph, and
> > > execute the actual atlas registration logic in the JobMaster when the job
> > > starts.
> > >
> > > Looking back this is a much more complex, and uglier, logic than having a
> > > simple JobListener. But it would at least work in all possible job
> > > submission scenarios, as long as the JobGraph was generated through the
> > > StreamGraph logic (which should be always).
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kk...@gmail.com>
> > wrote:
> > >
> > > > Thanks Gyula,
> > > >
> > > > Looking forward to your comments.
> > > > Just to let you know, I would not like having a method that in some
> > > > cases works as expected and in some other ones it does not. It would
> > > > be nice if we could expose consistent behaviour to the users.
> > > >
> > > > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gy...@gmail.com>
> > wrote:
> > > > >
> > > > > Thanks Kostas, I have to review the possible limitations with the
> > > > Executor
> > > > > before I can properly answer.
> > > > >
> > > > > Regarding you comments for the listener pattern, we proposed in the
> > > > > document to include the getPipeline() in the JobClient itself as you
> > > > > suggested to fit the pattern :) For not always being able to return
> > the
> > > > > pipeline, this might be expected depending on how the JobClient, so
> > we
> > > > need
> > > > > to handle it some way.
> > > > >
> > > > >
> > > > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi again,
> > > > > >
> > > > > > Just to clarify, I am not against exposing the Pipeline if this
> > will
> > > > > > lead to a "clean" solution.
> > > > > > And, I. forgot to say that the last solution, if adopted, would
> > have
> > > > > > to work on the JobGraph, which may not be that desirable.
> > > > > >
> > > > > > Kostas
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kkloudas@gmail.com
> > >
> > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I do not have a strong opinion on the topic yet, but I would like
> > > to
> > > > > > > share my thoughts on this.
> > > > > > >
> > > > > > > In the solution proposing a wrapping AtlasExecutor around the
> > Flink
> > > > > > > Executors, if we allow the user to use the CLI to submit jobs,
> > then
> > > > > > > this means that the CLI code may have to change so that it
> > injects
> > > > the
> > > > > > > executor option to AtlasExecutor (transparently to the user), and
> > > > then
> > > > > > > the AtlasExecutor should take what the user has actually set as
> > > > > > > pipeline executor and find the adequate executor. If this is not
> > > done
> > > > > > > transparently, then the user should do sth explicit to point
> > Flink
> > > to
> > > > > > > Atlas and then to the correct executor, which implies that we
> > > should
> > > > > > > add user-facing stuff (like cli options) to Flink.
> > > > > > >
> > > > > > > For the solution of adding getPipeline() to the JobListener, I
> > > think
> > > > > > > that from a design perspective, it does not fit in the listener
> > > > > > > itself. The listener is a "passive" entity that is expected to
> > > listen
> > > > > > > to specific "events". Adding a getter does not fit there. Other
> > > > > > > options for the getPipeline() method are:
> > > > > > > 1) add it as a method to the JobClient
> > > > > > > 2) add it as an argument to the methods of the JobListener (along
> > > > with
> > > > > > > the JobClient and the throwable)
> > > > > > >
> > > > > > > Alternative 1) would currently work because the JobClient is only
> > > > > > > instantiated by the executor. But in the future, we may (and
> > > probably
> > > > > > > will because of implications of FLIP-85) allow a JobClient to get
> > > > > > > "attached" to a running job. In this case, the getPipeline() will
> > > not
> > > > > > > have a pipeline to return.
> > > > > > > Alternative 2) will break existing code, which I am not sure how
> > > > > > > important this is as the JobListener is a new feature and I guess
> > > > some
> > > > > > > but not many users.
> > > > > > >
> > > > > > > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > > > > > > above solutions would work in per-job mode for Kuberneter, Mesos
> > or
> > > > > > > with web-submissions. These modes go through "special" execution
> > > > > > > environments that use them simply to extract the JobGraph which
> > > then
> > > > > > > they submit to the cluster. In this case, there is no executor
> > > > > > > involved. Are these cases important to you?
> > > > > > >
> > > > > > > Finally, another solution, although more drastic and more
> > involved,
> > > > > > > could be to have a "JobListener" running on the jobMaster. This
> > > will
> > > > > > > collect the relevant info and send them to Atlas. But I am not
> > sure
> > > > > > > how Atlas works and if it requires the data to be extracted on
> > the
> > > > > > > client side. I am saying this because the JobMasters may be
> > running
> > > > > > > anywhere in a cluster while the clients may run on designated
> > > > machines
> > > > > > > which can have specific configurations, e.g. open ports to
> > > > communicate
> > > > > > > with a specific Atlas server.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kostas
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> > > > wrote:
> > > > > > > >
> > > > > > > > Hi Gyula!
> > > > > > > >
> > > > > > > > My main motivation was to try and avoid mixing an internal
> > > > interface
> > > > > > > > (Pipeline) with public API. It looks like this is trying to go
> > > > "public
> > > > > > > > stable", but doesn't really do it exactly because of mixing
> > > > "pipeline"
> > > > > > into
> > > > > > > > this.
> > > > > > > > You would need to cast "Pipeline" and work on internal classes
> > in
> > > > the
> > > > > > > > implementation.
> > > > > > > >
> > > > > > > > If we use an "internal API" or a "semi-stable SPI" class, it
> > > looks
> > > > at a
> > > > > > > > first glance a bit cleaner and more maintainable (opening up
> > less
> > > > > > surface)
> > > > > > > > to make the PipelineExecutor a "stable SPI".
> > > > > > > > I have not checked out all the details, though.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <
> > gyula.fora@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Stephan!
> > > > > > > > >
> > > > > > > > > Thanks for checking this out. I agree that wrapping the other
> > > > > > > > > PipelineExecutors with a delegating AtlasExecutor would be a
> > > good
> > > > > > > > > alternative approach to implement this but I actually feel
> > that
> > > > it
> > > > > > suffers
> > > > > > > > > even more problems than exposing the Pipeline instance in the
> > > > > > JobListener.
> > > > > > > > >
> > > > > > > > > The main idea with the Atlas integration would be to have the
> > > > Atlas
> > > > > > hook
> > > > > > > > > logic in the Atlas project where it would be maintained. This
> > > > means
> > > > > > that
> > > > > > > > > any approach we take has to rely on public APIs. The
> > > JobListener
> > > > is
> > > > > > already
> > > > > > > > > a public evolving API while the PipelineExecutor and the
> > > factory
> > > > is
> > > > > > purely
> > > > > > > > > internal. Even if we make it public it will still expose the
> > > > > > Pipeline so we
> > > > > > > > > did not gain much on the public/internal API front.
> > > > > > > > >
> > > > > > > > > I also feel that since the Atlas hook logic should only
> > observe
> > > > the
> > > > > > > > > pipeline and collect information the JobListener interface
> > > seems
> > > > an
> > > > > > ideal
> > > > > > > > > match and the implementation can be pretty lightweight. From
> > a
> > > > purely
> > > > > > > > > implementation perspective adding an Executor would be more
> > > > heavy as
> > > > > > it has
> > > > > > > > > to properly delegate to an other executor making sure that we
> > > > don't
> > > > > > break
> > > > > > > > > anything.
> > > > > > > > >
> > > > > > > > > Don't take me wrong, I am not opposed to reworking the
> > > > > > implementations we
> > > > > > > > > have as it's very simple at this point but I also want to
> > make
> > > > sure
> > > > > > that we
> > > > > > > > > take the approach that is simple from a maintainability
> > > > standpoint.
> > > > > > Of
> > > > > > > > > course my argument rests on the assumption that the AtlasHook
> > > > itself
> > > > > > will
> > > > > > > > > live outside of the Flink project, thats another question.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <
> > > sewen@apache.org>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > In general, nice idea to support this integration with
> > Atlas.
> > > > > > > > > >
> > > > > > > > > > I think we could make this a bit easier/lightweight with a
> > > > small
> > > > > > change.
> > > > > > > > > > One of the issues that is not super nice is that this
> > starts
> > > > > > exposing the
> > > > > > > > > > (currently empty) Pipeline interface in the public API.
> > > > > > > > > > The Pipeline is an SPI interface that would be good to hide
> > > in
> > > > the
> > > > > > API.
> > > > > > > > > >
> > > > > > > > > > Since 1.10, Flink has the notion of Executors, which take
> > the
> > > > > > pipeline
> > > > > > > > > and
> > > > > > > > > > execute them. Meaning each pipeline is passed on anyways.
> > And
> > > > > > executors
> > > > > > > > > are
> > > > > > > > > > already configurable in the Flink configuration.
> > > > > > > > > > So, instead of passing the pipeline both "down" (to the
> > > > executor)
> > > > > > and "to
> > > > > > > > > > the side" (JobListener), could we just have a wrapping
> > > > > > "AtlasExecutor"
> > > > > > > > > that
> > > > > > > > > > takes the pipeline, does whatever it wants, and then passes
> > > it
> > > > to
> > > > > > the
> > > > > > > > > > proper executor? This would also have the advantage that it
> > > > > > supports
> > > > > > > > > making
> > > > > > > > > > changes to the pipeline, if needed in the future. For
> > > example,
> > > > if
> > > > > > there
> > > > > > > > > is
> > > > > > > > > > ever the need to add additional configuration fields, set
> > > > > > properties, add
> > > > > > > > > > "labels" or so, this could be easily done in the suggested
> > > > > > approach.
> > > > > > > > > >
> > > > > > > > > > I tried to sketch this in the picture below, pardon my bad
> > > > drawing.
> > > > > > > > > >
> > > > > > > > > > [image: Listener_Executor.png]
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > >
> > >
> > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Stephan
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > > > > > aljoscha@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Thanks! I'm reading the document now and will get back to
> > > you.
> > > > > > > > > >>
> > > > > > > > > >> Best,
> > > > > > > > > >> Aljoscha
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > >
> > >
> >

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Stephan Ewen <se...@apache.org>.
Few thoughts on the discussion:

## Changes on the Master

If possible, let's avoid changes to the master (JobManager / Dispatcher).
These components are complex, we should strive to keep anything out of them
that we can keep out of them.

## Problems in different deployments (applications / sessions)

This should be pretty straightforward after FLIP-84, correct? There should
be no more "exception throwing" environments that sneak the job graph out
of the main method.

## Proposal: Executor Listeners

We could think of a mix between the two approaches: Executor Listerners.
When an executor is invoked with the Pipeline, the listener is also
notified. That would keep this out of the API and be properly within the
SPI layer.
The listeners could be loaded from config, or via service loaders.


On Fri, Mar 13, 2020 at 8:59 AM tison <wa...@gmail.com> wrote:

> Hi Gyula and all,
>
> Thanks for the discussion so far.
>
> It seems that the requirement is to deliver some metadata of the submitted
> job,
> and such metadata can be simply extracted from StreamGraph.
>
> I'm unfamiliar with metadata Atlas needs so I make some assumptions.
>
> Assumption:
> Metadata needed by Atlas is actually some Flink scope information, such as
> input/
> output of a node. And this metadata is compile time information so that we
> know it
> during compiling the StreamGraph.
>
> If the assumption stands, I'm curious whether or not it is an option we
> standardize
> the JSON representation of StreamGraph which will contain metadata
> required. And
> pass the JSON representation from generation phase to JobGraph and then
> ExecutionGraph and finally retrievable from RestServer(so that we can
> extend
> JobClient to retrieve the plan by querying the cluster instead of have a
> pre-configured
> one).
>
> It is like `jsonPlan` in ExecutionGraph now(which is exposed by JobPlan
> REST
> endpoint). And I believe rather than JobGraph dump which is a physical
> plan,
> exposing access to StreamGraph dump which is a logical plan is possibly
> more
> interested from user perspective.
>
> Best,
> tison.
>
>
> Gyula Fóra <gy...@gmail.com> 于2020年3月13日周五 下午3:20写道:
>
> > Thanks again Kostas for diving deep into this, it is great feedback!
> >
> > I agree with the concerns regarding the custom executor, it has to be
> able
> > to properly handle the "original" executor somehow.
> > This might be quite tricky if we want to implement the AtlasExecutor
> > outside Flink. In any case does not really feel clean or lightweight at
> > first glance.
> >
> > As for the JobClient/JobListener/Pipeline question, as you brought up the
> > possibility for later attaching the JobClient, maybe the best route for
> > this would be to
> > add the Pipeline as a method parameter in the JobListener. It would break
> > code compatibility but at least would have a consistent behavior.
> >
> > Now to the big problem of not having executors / joblisteners work in
> > kuberentes-per-job,  web, etc modes. I was not aware of this problem
> until
> > now, this also seems to affect the whole concept of the JobListener
> > interface. What good is a JobListener if it only listens to certain kind
> of
> > deployments :)
> >
> > Incidentally, in my first proposal (and prototype) I had the atlashook
> > running on the JobMaster with an extra addition to a JobGraphGenerator
> hook
> > that could be registered in the StreamExecutionEnvironment. This meant
> that
> > we could work on the StreamGraph, register metadata in the JobGraph, and
> > execute the actual atlas registration logic in the JobMaster when the job
> > starts.
> >
> > Looking back this is a much more complex, and uglier, logic than having a
> > simple JobListener. But it would at least work in all possible job
> > submission scenarios, as long as the JobGraph was generated through the
> > StreamGraph logic (which should be always).
> >
> > Cheers,
> > Gyula
> >
> >
> > On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kk...@gmail.com>
> wrote:
> >
> > > Thanks Gyula,
> > >
> > > Looking forward to your comments.
> > > Just to let you know, I would not like having a method that in some
> > > cases works as expected and in some other ones it does not. It would
> > > be nice if we could expose consistent behaviour to the users.
> > >
> > > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gy...@gmail.com>
> wrote:
> > > >
> > > > Thanks Kostas, I have to review the possible limitations with the
> > > Executor
> > > > before I can properly answer.
> > > >
> > > > Regarding you comments for the listener pattern, we proposed in the
> > > > document to include the getPipeline() in the JobClient itself as you
> > > > suggested to fit the pattern :) For not always being able to return
> the
> > > > pipeline, this might be expected depending on how the JobClient, so
> we
> > > need
> > > > to handle it some way.
> > > >
> > > >
> > > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi again,
> > > > >
> > > > > Just to clarify, I am not against exposing the Pipeline if this
> will
> > > > > lead to a "clean" solution.
> > > > > And, I. forgot to say that the last solution, if adopted, would
> have
> > > > > to work on the JobGraph, which may not be that desirable.
> > > > >
> > > > > Kostas
> > > > >
> > > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kkloudas@gmail.com
> >
> > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I do not have a strong opinion on the topic yet, but I would like
> > to
> > > > > > share my thoughts on this.
> > > > > >
> > > > > > In the solution proposing a wrapping AtlasExecutor around the
> Flink
> > > > > > Executors, if we allow the user to use the CLI to submit jobs,
> then
> > > > > > this means that the CLI code may have to change so that it
> injects
> > > the
> > > > > > executor option to AtlasExecutor (transparently to the user), and
> > > then
> > > > > > the AtlasExecutor should take what the user has actually set as
> > > > > > pipeline executor and find the adequate executor. If this is not
> > done
> > > > > > transparently, then the user should do sth explicit to point
> Flink
> > to
> > > > > > Atlas and then to the correct executor, which implies that we
> > should
> > > > > > add user-facing stuff (like cli options) to Flink.
> > > > > >
> > > > > > For the solution of adding getPipeline() to the JobListener, I
> > think
> > > > > > that from a design perspective, it does not fit in the listener
> > > > > > itself. The listener is a "passive" entity that is expected to
> > listen
> > > > > > to specific "events". Adding a getter does not fit there. Other
> > > > > > options for the getPipeline() method are:
> > > > > > 1) add it as a method to the JobClient
> > > > > > 2) add it as an argument to the methods of the JobListener (along
> > > with
> > > > > > the JobClient and the throwable)
> > > > > >
> > > > > > Alternative 1) would currently work because the JobClient is only
> > > > > > instantiated by the executor. But in the future, we may (and
> > probably
> > > > > > will because of implications of FLIP-85) allow a JobClient to get
> > > > > > "attached" to a running job. In this case, the getPipeline() will
> > not
> > > > > > have a pipeline to return.
> > > > > > Alternative 2) will break existing code, which I am not sure how
> > > > > > important this is as the JobListener is a new feature and I guess
> > > some
> > > > > > but not many users.
> > > > > >
> > > > > > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > > > > > above solutions would work in per-job mode for Kuberneter, Mesos
> or
> > > > > > with web-submissions. These modes go through "special" execution
> > > > > > environments that use them simply to extract the JobGraph which
> > then
> > > > > > they submit to the cluster. In this case, there is no executor
> > > > > > involved. Are these cases important to you?
> > > > > >
> > > > > > Finally, another solution, although more drastic and more
> involved,
> > > > > > could be to have a "JobListener" running on the jobMaster. This
> > will
> > > > > > collect the relevant info and send them to Atlas. But I am not
> sure
> > > > > > how Atlas works and if it requires the data to be extracted on
> the
> > > > > > client side. I am saying this because the JobMasters may be
> running
> > > > > > anywhere in a cluster while the clients may run on designated
> > > machines
> > > > > > which can have specific configurations, e.g. open ports to
> > > communicate
> > > > > > with a specific Atlas server.
> > > > > >
> > > > > > Cheers,
> > > > > > Kostas
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > > > >
> > > > > > > Hi Gyula!
> > > > > > >
> > > > > > > My main motivation was to try and avoid mixing an internal
> > > interface
> > > > > > > (Pipeline) with public API. It looks like this is trying to go
> > > "public
> > > > > > > stable", but doesn't really do it exactly because of mixing
> > > "pipeline"
> > > > > into
> > > > > > > this.
> > > > > > > You would need to cast "Pipeline" and work on internal classes
> in
> > > the
> > > > > > > implementation.
> > > > > > >
> > > > > > > If we use an "internal API" or a "semi-stable SPI" class, it
> > looks
> > > at a
> > > > > > > first glance a bit cleaner and more maintainable (opening up
> less
> > > > > surface)
> > > > > > > to make the PipelineExecutor a "stable SPI".
> > > > > > > I have not checked out all the details, though.
> > > > > > >
> > > > > > > Best,
> > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <
> gyula.fora@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Stephan!
> > > > > > > >
> > > > > > > > Thanks for checking this out. I agree that wrapping the other
> > > > > > > > PipelineExecutors with a delegating AtlasExecutor would be a
> > good
> > > > > > > > alternative approach to implement this but I actually feel
> that
> > > it
> > > > > suffers
> > > > > > > > even more problems than exposing the Pipeline instance in the
> > > > > JobListener.
> > > > > > > >
> > > > > > > > The main idea with the Atlas integration would be to have the
> > > Atlas
> > > > > hook
> > > > > > > > logic in the Atlas project where it would be maintained. This
> > > means
> > > > > that
> > > > > > > > any approach we take has to rely on public APIs. The
> > JobListener
> > > is
> > > > > already
> > > > > > > > a public evolving API while the PipelineExecutor and the
> > factory
> > > is
> > > > > purely
> > > > > > > > internal. Even if we make it public it will still expose the
> > > > > Pipeline so we
> > > > > > > > did not gain much on the public/internal API front.
> > > > > > > >
> > > > > > > > I also feel that since the Atlas hook logic should only
> observe
> > > the
> > > > > > > > pipeline and collect information the JobListener interface
> > seems
> > > an
> > > > > ideal
> > > > > > > > match and the implementation can be pretty lightweight. From
> a
> > > purely
> > > > > > > > implementation perspective adding an Executor would be more
> > > heavy as
> > > > > it has
> > > > > > > > to properly delegate to an other executor making sure that we
> > > don't
> > > > > break
> > > > > > > > anything.
> > > > > > > >
> > > > > > > > Don't take me wrong, I am not opposed to reworking the
> > > > > implementations we
> > > > > > > > have as it's very simple at this point but I also want to
> make
> > > sure
> > > > > that we
> > > > > > > > take the approach that is simple from a maintainability
> > > standpoint.
> > > > > Of
> > > > > > > > course my argument rests on the assumption that the AtlasHook
> > > itself
> > > > > will
> > > > > > > > live outside of the Flink project, thats another question.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <
> > sewen@apache.org>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all!
> > > > > > > > >
> > > > > > > > > In general, nice idea to support this integration with
> Atlas.
> > > > > > > > >
> > > > > > > > > I think we could make this a bit easier/lightweight with a
> > > small
> > > > > change.
> > > > > > > > > One of the issues that is not super nice is that this
> starts
> > > > > exposing the
> > > > > > > > > (currently empty) Pipeline interface in the public API.
> > > > > > > > > The Pipeline is an SPI interface that would be good to hide
> > in
> > > the
> > > > > API.
> > > > > > > > >
> > > > > > > > > Since 1.10, Flink has the notion of Executors, which take
> the
> > > > > pipeline
> > > > > > > > and
> > > > > > > > > execute them. Meaning each pipeline is passed on anyways.
> And
> > > > > executors
> > > > > > > > are
> > > > > > > > > already configurable in the Flink configuration.
> > > > > > > > > So, instead of passing the pipeline both "down" (to the
> > > executor)
> > > > > and "to
> > > > > > > > > the side" (JobListener), could we just have a wrapping
> > > > > "AtlasExecutor"
> > > > > > > > that
> > > > > > > > > takes the pipeline, does whatever it wants, and then passes
> > it
> > > to
> > > > > the
> > > > > > > > > proper executor? This would also have the advantage that it
> > > > > supports
> > > > > > > > making
> > > > > > > > > changes to the pipeline, if needed in the future. For
> > example,
> > > if
> > > > > there
> > > > > > > > is
> > > > > > > > > ever the need to add additional configuration fields, set
> > > > > properties, add
> > > > > > > > > "labels" or so, this could be easily done in the suggested
> > > > > approach.
> > > > > > > > >
> > > > > > > > > I tried to sketch this in the picture below, pardon my bad
> > > drawing.
> > > > > > > > >
> > > > > > > > > [image: Listener_Executor.png]
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > >
> > >
> >
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Stephan
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > > > > aljoscha@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Thanks! I'm reading the document now and will get back to
> > you.
> > > > > > > > >>
> > > > > > > > >> Best,
> > > > > > > > >> Aljoscha
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > >
> > >
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by tison <wa...@gmail.com>.
Hi Gyula and all,

Thanks for the discussion so far.

It seems that the requirement is to deliver some metadata of the submitted
job,
and such metadata can be simply extracted from StreamGraph.

I'm unfamiliar with metadata Atlas needs so I make some assumptions.

Assumption:
Metadata needed by Atlas is actually some Flink scope information, such as
input/
output of a node. And this metadata is compile time information so that we
know it
during compiling the StreamGraph.

If the assumption stands, I'm curious whether or not it is an option we
standardize
the JSON representation of StreamGraph which will contain metadata
required. And
pass the JSON representation from generation phase to JobGraph and then
ExecutionGraph and finally retrievable from RestServer(so that we can extend
JobClient to retrieve the plan by querying the cluster instead of have a
pre-configured
one).

It is like `jsonPlan` in ExecutionGraph now(which is exposed by JobPlan REST
endpoint). And I believe rather than JobGraph dump which is a physical plan,
exposing access to StreamGraph dump which is a logical plan is possibly more
interested from user perspective.

Best,
tison.


Gyula Fóra <gy...@gmail.com> 于2020年3月13日周五 下午3:20写道:

> Thanks again Kostas for diving deep into this, it is great feedback!
>
> I agree with the concerns regarding the custom executor, it has to be able
> to properly handle the "original" executor somehow.
> This might be quite tricky if we want to implement the AtlasExecutor
> outside Flink. In any case does not really feel clean or lightweight at
> first glance.
>
> As for the JobClient/JobListener/Pipeline question, as you brought up the
> possibility for later attaching the JobClient, maybe the best route for
> this would be to
> add the Pipeline as a method parameter in the JobListener. It would break
> code compatibility but at least would have a consistent behavior.
>
> Now to the big problem of not having executors / joblisteners work in
> kuberentes-per-job,  web, etc modes. I was not aware of this problem until
> now, this also seems to affect the whole concept of the JobListener
> interface. What good is a JobListener if it only listens to certain kind of
> deployments :)
>
> Incidentally, in my first proposal (and prototype) I had the atlashook
> running on the JobMaster with an extra addition to a JobGraphGenerator hook
> that could be registered in the StreamExecutionEnvironment. This meant that
> we could work on the StreamGraph, register metadata in the JobGraph, and
> execute the actual atlas registration logic in the JobMaster when the job
> starts.
>
> Looking back this is a much more complex, and uglier, logic than having a
> simple JobListener. But it would at least work in all possible job
> submission scenarios, as long as the JobGraph was generated through the
> StreamGraph logic (which should be always).
>
> Cheers,
> Gyula
>
>
> On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kk...@gmail.com> wrote:
>
> > Thanks Gyula,
> >
> > Looking forward to your comments.
> > Just to let you know, I would not like having a method that in some
> > cases works as expected and in some other ones it does not. It would
> > be nice if we could expose consistent behaviour to the users.
> >
> > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gy...@gmail.com> wrote:
> > >
> > > Thanks Kostas, I have to review the possible limitations with the
> > Executor
> > > before I can properly answer.
> > >
> > > Regarding you comments for the listener pattern, we proposed in the
> > > document to include the getPipeline() in the JobClient itself as you
> > > suggested to fit the pattern :) For not always being able to return the
> > > pipeline, this might be expected depending on how the JobClient, so we
> > need
> > > to handle it some way.
> > >
> > >
> > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com>
> > wrote:
> > >
> > > > Hi again,
> > > >
> > > > Just to clarify, I am not against exposing the Pipeline if this will
> > > > lead to a "clean" solution.
> > > > And, I. forgot to say that the last solution, if adopted, would have
> > > > to work on the JobGraph, which may not be that desirable.
> > > >
> > > > Kostas
> > > >
> > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kk...@gmail.com>
> > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I do not have a strong opinion on the topic yet, but I would like
> to
> > > > > share my thoughts on this.
> > > > >
> > > > > In the solution proposing a wrapping AtlasExecutor around the Flink
> > > > > Executors, if we allow the user to use the CLI to submit jobs, then
> > > > > this means that the CLI code may have to change so that it injects
> > the
> > > > > executor option to AtlasExecutor (transparently to the user), and
> > then
> > > > > the AtlasExecutor should take what the user has actually set as
> > > > > pipeline executor and find the adequate executor. If this is not
> done
> > > > > transparently, then the user should do sth explicit to point Flink
> to
> > > > > Atlas and then to the correct executor, which implies that we
> should
> > > > > add user-facing stuff (like cli options) to Flink.
> > > > >
> > > > > For the solution of adding getPipeline() to the JobListener, I
> think
> > > > > that from a design perspective, it does not fit in the listener
> > > > > itself. The listener is a "passive" entity that is expected to
> listen
> > > > > to specific "events". Adding a getter does not fit there. Other
> > > > > options for the getPipeline() method are:
> > > > > 1) add it as a method to the JobClient
> > > > > 2) add it as an argument to the methods of the JobListener (along
> > with
> > > > > the JobClient and the throwable)
> > > > >
> > > > > Alternative 1) would currently work because the JobClient is only
> > > > > instantiated by the executor. But in the future, we may (and
> probably
> > > > > will because of implications of FLIP-85) allow a JobClient to get
> > > > > "attached" to a running job. In this case, the getPipeline() will
> not
> > > > > have a pipeline to return.
> > > > > Alternative 2) will break existing code, which I am not sure how
> > > > > important this is as the JobListener is a new feature and I guess
> > some
> > > > > but not many users.
> > > > >
> > > > > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > > > > above solutions would work in per-job mode for Kuberneter, Mesos or
> > > > > with web-submissions. These modes go through "special" execution
> > > > > environments that use them simply to extract the JobGraph which
> then
> > > > > they submit to the cluster. In this case, there is no executor
> > > > > involved. Are these cases important to you?
> > > > >
> > > > > Finally, another solution, although more drastic and more involved,
> > > > > could be to have a "JobListener" running on the jobMaster. This
> will
> > > > > collect the relevant info and send them to Atlas. But I am not sure
> > > > > how Atlas works and if it requires the data to be extracted on the
> > > > > client side. I am saying this because the JobMasters may be running
> > > > > anywhere in a cluster while the clients may run on designated
> > machines
> > > > > which can have specific configurations, e.g. open ports to
> > communicate
> > > > > with a specific Atlas server.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> > wrote:
> > > > > >
> > > > > > Hi Gyula!
> > > > > >
> > > > > > My main motivation was to try and avoid mixing an internal
> > interface
> > > > > > (Pipeline) with public API. It looks like this is trying to go
> > "public
> > > > > > stable", but doesn't really do it exactly because of mixing
> > "pipeline"
> > > > into
> > > > > > this.
> > > > > > You would need to cast "Pipeline" and work on internal classes in
> > the
> > > > > > implementation.
> > > > > >
> > > > > > If we use an "internal API" or a "semi-stable SPI" class, it
> looks
> > at a
> > > > > > first glance a bit cleaner and more maintainable (opening up less
> > > > surface)
> > > > > > to make the PipelineExecutor a "stable SPI".
> > > > > > I have not checked out all the details, though.
> > > > > >
> > > > > > Best,
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gyula.fora@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Stephan!
> > > > > > >
> > > > > > > Thanks for checking this out. I agree that wrapping the other
> > > > > > > PipelineExecutors with a delegating AtlasExecutor would be a
> good
> > > > > > > alternative approach to implement this but I actually feel that
> > it
> > > > suffers
> > > > > > > even more problems than exposing the Pipeline instance in the
> > > > JobListener.
> > > > > > >
> > > > > > > The main idea with the Atlas integration would be to have the
> > Atlas
> > > > hook
> > > > > > > logic in the Atlas project where it would be maintained. This
> > means
> > > > that
> > > > > > > any approach we take has to rely on public APIs. The
> JobListener
> > is
> > > > already
> > > > > > > a public evolving API while the PipelineExecutor and the
> factory
> > is
> > > > purely
> > > > > > > internal. Even if we make it public it will still expose the
> > > > Pipeline so we
> > > > > > > did not gain much on the public/internal API front.
> > > > > > >
> > > > > > > I also feel that since the Atlas hook logic should only observe
> > the
> > > > > > > pipeline and collect information the JobListener interface
> seems
> > an
> > > > ideal
> > > > > > > match and the implementation can be pretty lightweight. From a
> > purely
> > > > > > > implementation perspective adding an Executor would be more
> > heavy as
> > > > it has
> > > > > > > to properly delegate to an other executor making sure that we
> > don't
> > > > break
> > > > > > > anything.
> > > > > > >
> > > > > > > Don't take me wrong, I am not opposed to reworking the
> > > > implementations we
> > > > > > > have as it's very simple at this point but I also want to make
> > sure
> > > > that we
> > > > > > > take the approach that is simple from a maintainability
> > standpoint.
> > > > Of
> > > > > > > course my argument rests on the assumption that the AtlasHook
> > itself
> > > > will
> > > > > > > live outside of the Flink project, thats another question.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <
> sewen@apache.org>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > In general, nice idea to support this integration with Atlas.
> > > > > > > >
> > > > > > > > I think we could make this a bit easier/lightweight with a
> > small
> > > > change.
> > > > > > > > One of the issues that is not super nice is that this starts
> > > > exposing the
> > > > > > > > (currently empty) Pipeline interface in the public API.
> > > > > > > > The Pipeline is an SPI interface that would be good to hide
> in
> > the
> > > > API.
> > > > > > > >
> > > > > > > > Since 1.10, Flink has the notion of Executors, which take the
> > > > pipeline
> > > > > > > and
> > > > > > > > execute them. Meaning each pipeline is passed on anyways. And
> > > > executors
> > > > > > > are
> > > > > > > > already configurable in the Flink configuration.
> > > > > > > > So, instead of passing the pipeline both "down" (to the
> > executor)
> > > > and "to
> > > > > > > > the side" (JobListener), could we just have a wrapping
> > > > "AtlasExecutor"
> > > > > > > that
> > > > > > > > takes the pipeline, does whatever it wants, and then passes
> it
> > to
> > > > the
> > > > > > > > proper executor? This would also have the advantage that it
> > > > supports
> > > > > > > making
> > > > > > > > changes to the pipeline, if needed in the future. For
> example,
> > if
> > > > there
> > > > > > > is
> > > > > > > > ever the need to add additional configuration fields, set
> > > > properties, add
> > > > > > > > "labels" or so, this could be easily done in the suggested
> > > > approach.
> > > > > > > >
> > > > > > > > I tried to sketch this in the picture below, pardon my bad
> > drawing.
> > > > > > > >
> > > > > > > > [image: Listener_Executor.png]
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > >
> >
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > > > aljoscha@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Thanks! I'm reading the document now and will get back to
> you.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Aljoscha
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > >
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks again Kostas for diving deep into this, it is great feedback!

I agree with the concerns regarding the custom executor, it has to be able
to properly handle the "original" executor somehow.
This might be quite tricky if we want to implement the AtlasExecutor
outside Flink. In any case does not really feel clean or lightweight at
first glance.

As for the JobClient/JobListener/Pipeline question, as you brought up the
possibility for later attaching the JobClient, maybe the best route for
this would be to
add the Pipeline as a method parameter in the JobListener. It would break
code compatibility but at least would have a consistent behavior.

Now to the big problem of not having executors / joblisteners work in
kuberentes-per-job,  web, etc modes. I was not aware of this problem until
now, this also seems to affect the whole concept of the JobListener
interface. What good is a JobListener if it only listens to certain kind of
deployments :)

Incidentally, in my first proposal (and prototype) I had the atlashook
running on the JobMaster with an extra addition to a JobGraphGenerator hook
that could be registered in the StreamExecutionEnvironment. This meant that
we could work on the StreamGraph, register metadata in the JobGraph, and
execute the actual atlas registration logic in the JobMaster when the job
starts.

Looking back this is a much more complex, and uglier, logic than having a
simple JobListener. But it would at least work in all possible job
submission scenarios, as long as the JobGraph was generated through the
StreamGraph logic (which should be always).

Cheers,
Gyula


On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kk...@gmail.com> wrote:

> Thanks Gyula,
>
> Looking forward to your comments.
> Just to let you know, I would not like having a method that in some
> cases works as expected and in some other ones it does not. It would
> be nice if we could expose consistent behaviour to the users.
>
> On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gy...@gmail.com> wrote:
> >
> > Thanks Kostas, I have to review the possible limitations with the
> Executor
> > before I can properly answer.
> >
> > Regarding you comments for the listener pattern, we proposed in the
> > document to include the getPipeline() in the JobClient itself as you
> > suggested to fit the pattern :) For not always being able to return the
> > pipeline, this might be expected depending on how the JobClient, so we
> need
> > to handle it some way.
> >
> >
> > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com>
> wrote:
> >
> > > Hi again,
> > >
> > > Just to clarify, I am not against exposing the Pipeline if this will
> > > lead to a "clean" solution.
> > > And, I. forgot to say that the last solution, if adopted, would have
> > > to work on the JobGraph, which may not be that desirable.
> > >
> > > Kostas
> > >
> > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kk...@gmail.com>
> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I do not have a strong opinion on the topic yet, but I would like to
> > > > share my thoughts on this.
> > > >
> > > > In the solution proposing a wrapping AtlasExecutor around the Flink
> > > > Executors, if we allow the user to use the CLI to submit jobs, then
> > > > this means that the CLI code may have to change so that it injects
> the
> > > > executor option to AtlasExecutor (transparently to the user), and
> then
> > > > the AtlasExecutor should take what the user has actually set as
> > > > pipeline executor and find the adequate executor. If this is not done
> > > > transparently, then the user should do sth explicit to point Flink to
> > > > Atlas and then to the correct executor, which implies that we should
> > > > add user-facing stuff (like cli options) to Flink.
> > > >
> > > > For the solution of adding getPipeline() to the JobListener, I think
> > > > that from a design perspective, it does not fit in the listener
> > > > itself. The listener is a "passive" entity that is expected to listen
> > > > to specific "events". Adding a getter does not fit there. Other
> > > > options for the getPipeline() method are:
> > > > 1) add it as a method to the JobClient
> > > > 2) add it as an argument to the methods of the JobListener (along
> with
> > > > the JobClient and the throwable)
> > > >
> > > > Alternative 1) would currently work because the JobClient is only
> > > > instantiated by the executor. But in the future, we may (and probably
> > > > will because of implications of FLIP-85) allow a JobClient to get
> > > > "attached" to a running job. In this case, the getPipeline() will not
> > > > have a pipeline to return.
> > > > Alternative 2) will break existing code, which I am not sure how
> > > > important this is as the JobListener is a new feature and I guess
> some
> > > > but not many users.
> > > >
> > > > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > > > above solutions would work in per-job mode for Kuberneter, Mesos or
> > > > with web-submissions. These modes go through "special" execution
> > > > environments that use them simply to extract the JobGraph which then
> > > > they submit to the cluster. In this case, there is no executor
> > > > involved. Are these cases important to you?
> > > >
> > > > Finally, another solution, although more drastic and more involved,
> > > > could be to have a "JobListener" running on the jobMaster. This will
> > > > collect the relevant info and send them to Atlas. But I am not sure
> > > > how Atlas works and if it requires the data to be extracted on the
> > > > client side. I am saying this because the JobMasters may be running
> > > > anywhere in a cluster while the clients may run on designated
> machines
> > > > which can have specific configurations, e.g. open ports to
> communicate
> > > > with a specific Atlas server.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> wrote:
> > > > >
> > > > > Hi Gyula!
> > > > >
> > > > > My main motivation was to try and avoid mixing an internal
> interface
> > > > > (Pipeline) with public API. It looks like this is trying to go
> "public
> > > > > stable", but doesn't really do it exactly because of mixing
> "pipeline"
> > > into
> > > > > this.
> > > > > You would need to cast "Pipeline" and work on internal classes in
> the
> > > > > implementation.
> > > > >
> > > > > If we use an "internal API" or a "semi-stable SPI" class, it looks
> at a
> > > > > first glance a bit cleaner and more maintainable (opening up less
> > > surface)
> > > > > to make the PipelineExecutor a "stable SPI".
> > > > > I have not checked out all the details, though.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Stephan!
> > > > > >
> > > > > > Thanks for checking this out. I agree that wrapping the other
> > > > > > PipelineExecutors with a delegating AtlasExecutor would be a good
> > > > > > alternative approach to implement this but I actually feel that
> it
> > > suffers
> > > > > > even more problems than exposing the Pipeline instance in the
> > > JobListener.
> > > > > >
> > > > > > The main idea with the Atlas integration would be to have the
> Atlas
> > > hook
> > > > > > logic in the Atlas project where it would be maintained. This
> means
> > > that
> > > > > > any approach we take has to rely on public APIs. The JobListener
> is
> > > already
> > > > > > a public evolving API while the PipelineExecutor and the factory
> is
> > > purely
> > > > > > internal. Even if we make it public it will still expose the
> > > Pipeline so we
> > > > > > did not gain much on the public/internal API front.
> > > > > >
> > > > > > I also feel that since the Atlas hook logic should only observe
> the
> > > > > > pipeline and collect information the JobListener interface seems
> an
> > > ideal
> > > > > > match and the implementation can be pretty lightweight. From a
> purely
> > > > > > implementation perspective adding an Executor would be more
> heavy as
> > > it has
> > > > > > to properly delegate to an other executor making sure that we
> don't
> > > break
> > > > > > anything.
> > > > > >
> > > > > > Don't take me wrong, I am not opposed to reworking the
> > > implementations we
> > > > > > have as it's very simple at this point but I also want to make
> sure
> > > that we
> > > > > > take the approach that is simple from a maintainability
> standpoint.
> > > Of
> > > > > > course my argument rests on the assumption that the AtlasHook
> itself
> > > will
> > > > > > live outside of the Flink project, thats another question.
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > > >
> > > > > > > Hi all!
> > > > > > >
> > > > > > > In general, nice idea to support this integration with Atlas.
> > > > > > >
> > > > > > > I think we could make this a bit easier/lightweight with a
> small
> > > change.
> > > > > > > One of the issues that is not super nice is that this starts
> > > exposing the
> > > > > > > (currently empty) Pipeline interface in the public API.
> > > > > > > The Pipeline is an SPI interface that would be good to hide in
> the
> > > API.
> > > > > > >
> > > > > > > Since 1.10, Flink has the notion of Executors, which take the
> > > pipeline
> > > > > > and
> > > > > > > execute them. Meaning each pipeline is passed on anyways. And
> > > executors
> > > > > > are
> > > > > > > already configurable in the Flink configuration.
> > > > > > > So, instead of passing the pipeline both "down" (to the
> executor)
> > > and "to
> > > > > > > the side" (JobListener), could we just have a wrapping
> > > "AtlasExecutor"
> > > > > > that
> > > > > > > takes the pipeline, does whatever it wants, and then passes it
> to
> > > the
> > > > > > > proper executor? This would also have the advantage that it
> > > supports
> > > > > > making
> > > > > > > changes to the pipeline, if needed in the future. For example,
> if
> > > there
> > > > > > is
> > > > > > > ever the need to add additional configuration fields, set
> > > properties, add
> > > > > > > "labels" or so, this could be easily done in the suggested
> > > approach.
> > > > > > >
> > > > > > > I tried to sketch this in the picture below, pardon my bad
> drawing.
> > > > > > >
> > > > > > > [image: Listener_Executor.png]
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > >
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > > aljoscha@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Thanks! I'm reading the document now and will get back to you.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Aljoscha
> > > > > > >>
> > > > > > >
> > > > > >
> > >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Kostas Kloudas <kk...@gmail.com>.
Thanks Gyula,

Looking forward to your comments.
Just to let you know, I would not like having a method that in some
cases works as expected and in some other ones it does not. It would
be nice if we could expose consistent behaviour to the users.

On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gy...@gmail.com> wrote:
>
> Thanks Kostas, I have to review the possible limitations with the Executor
> before I can properly answer.
>
> Regarding you comments for the listener pattern, we proposed in the
> document to include the getPipeline() in the JobClient itself as you
> suggested to fit the pattern :) For not always being able to return the
> pipeline, this might be expected depending on how the JobClient, so we need
> to handle it some way.
>
>
> On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com> wrote:
>
> > Hi again,
> >
> > Just to clarify, I am not against exposing the Pipeline if this will
> > lead to a "clean" solution.
> > And, I. forgot to say that the last solution, if adopted, would have
> > to work on the JobGraph, which may not be that desirable.
> >
> > Kostas
> >
> > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kk...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > I do not have a strong opinion on the topic yet, but I would like to
> > > share my thoughts on this.
> > >
> > > In the solution proposing a wrapping AtlasExecutor around the Flink
> > > Executors, if we allow the user to use the CLI to submit jobs, then
> > > this means that the CLI code may have to change so that it injects the
> > > executor option to AtlasExecutor (transparently to the user), and then
> > > the AtlasExecutor should take what the user has actually set as
> > > pipeline executor and find the adequate executor. If this is not done
> > > transparently, then the user should do sth explicit to point Flink to
> > > Atlas and then to the correct executor, which implies that we should
> > > add user-facing stuff (like cli options) to Flink.
> > >
> > > For the solution of adding getPipeline() to the JobListener, I think
> > > that from a design perspective, it does not fit in the listener
> > > itself. The listener is a "passive" entity that is expected to listen
> > > to specific "events". Adding a getter does not fit there. Other
> > > options for the getPipeline() method are:
> > > 1) add it as a method to the JobClient
> > > 2) add it as an argument to the methods of the JobListener (along with
> > > the JobClient and the throwable)
> > >
> > > Alternative 1) would currently work because the JobClient is only
> > > instantiated by the executor. But in the future, we may (and probably
> > > will because of implications of FLIP-85) allow a JobClient to get
> > > "attached" to a running job. In this case, the getPipeline() will not
> > > have a pipeline to return.
> > > Alternative 2) will break existing code, which I am not sure how
> > > important this is as the JobListener is a new feature and I guess some
> > > but not many users.
> > >
> > > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > > above solutions would work in per-job mode for Kuberneter, Mesos or
> > > with web-submissions. These modes go through "special" execution
> > > environments that use them simply to extract the JobGraph which then
> > > they submit to the cluster. In this case, there is no executor
> > > involved. Are these cases important to you?
> > >
> > > Finally, another solution, although more drastic and more involved,
> > > could be to have a "JobListener" running on the jobMaster. This will
> > > collect the relevant info and send them to Atlas. But I am not sure
> > > how Atlas works and if it requires the data to be extracted on the
> > > client side. I am saying this because the JobMasters may be running
> > > anywhere in a cluster while the clients may run on designated machines
> > > which can have specific configurations, e.g. open ports to communicate
> > > with a specific Atlas server.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > Hi Gyula!
> > > >
> > > > My main motivation was to try and avoid mixing an internal interface
> > > > (Pipeline) with public API. It looks like this is trying to go "public
> > > > stable", but doesn't really do it exactly because of mixing "pipeline"
> > into
> > > > this.
> > > > You would need to cast "Pipeline" and work on internal classes in the
> > > > implementation.
> > > >
> > > > If we use an "internal API" or a "semi-stable SPI" class, it looks at a
> > > > first glance a bit cleaner and more maintainable (opening up less
> > surface)
> > > > to make the PipelineExecutor a "stable SPI".
> > > > I have not checked out all the details, though.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Stephan!
> > > > >
> > > > > Thanks for checking this out. I agree that wrapping the other
> > > > > PipelineExecutors with a delegating AtlasExecutor would be a good
> > > > > alternative approach to implement this but I actually feel that it
> > suffers
> > > > > even more problems than exposing the Pipeline instance in the
> > JobListener.
> > > > >
> > > > > The main idea with the Atlas integration would be to have the Atlas
> > hook
> > > > > logic in the Atlas project where it would be maintained. This means
> > that
> > > > > any approach we take has to rely on public APIs. The JobListener is
> > already
> > > > > a public evolving API while the PipelineExecutor and the factory is
> > purely
> > > > > internal. Even if we make it public it will still expose the
> > Pipeline so we
> > > > > did not gain much on the public/internal API front.
> > > > >
> > > > > I also feel that since the Atlas hook logic should only observe the
> > > > > pipeline and collect information the JobListener interface seems an
> > ideal
> > > > > match and the implementation can be pretty lightweight. From a purely
> > > > > implementation perspective adding an Executor would be more heavy as
> > it has
> > > > > to properly delegate to an other executor making sure that we don't
> > break
> > > > > anything.
> > > > >
> > > > > Don't take me wrong, I am not opposed to reworking the
> > implementations we
> > > > > have as it's very simple at this point but I also want to make sure
> > that we
> > > > > take the approach that is simple from a maintainability standpoint.
> > Of
> > > > > course my argument rests on the assumption that the AtlasHook itself
> > will
> > > > > live outside of the Flink project, thats another question.
> > > > >
> > > > > Cheers,
> > > > > Gyula
> > > > >
> > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org>
> > wrote:
> > > > >
> > > > > > Hi all!
> > > > > >
> > > > > > In general, nice idea to support this integration with Atlas.
> > > > > >
> > > > > > I think we could make this a bit easier/lightweight with a small
> > change.
> > > > > > One of the issues that is not super nice is that this starts
> > exposing the
> > > > > > (currently empty) Pipeline interface in the public API.
> > > > > > The Pipeline is an SPI interface that would be good to hide in the
> > API.
> > > > > >
> > > > > > Since 1.10, Flink has the notion of Executors, which take the
> > pipeline
> > > > > and
> > > > > > execute them. Meaning each pipeline is passed on anyways. And
> > executors
> > > > > are
> > > > > > already configurable in the Flink configuration.
> > > > > > So, instead of passing the pipeline both "down" (to the executor)
> > and "to
> > > > > > the side" (JobListener), could we just have a wrapping
> > "AtlasExecutor"
> > > > > that
> > > > > > takes the pipeline, does whatever it wants, and then passes it to
> > the
> > > > > > proper executor? This would also have the advantage that it
> > supports
> > > > > making
> > > > > > changes to the pipeline, if needed in the future. For example, if
> > there
> > > > > is
> > > > > > ever the need to add additional configuration fields, set
> > properties, add
> > > > > > "labels" or so, this could be easily done in the suggested
> > approach.
> > > > > >
> > > > > > I tried to sketch this in the picture below, pardon my bad drawing.
> > > > > >
> > > > > > [image: Listener_Executor.png]
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > aljoscha@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> Thanks! I'm reading the document now and will get back to you.
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >
> > > > >
> >

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks Kostas, I have to review the possible limitations with the Executor
before I can properly answer.

Regarding you comments for the listener pattern, we proposed in the
document to include the getPipeline() in the JobClient itself as you
suggested to fit the pattern :) For not always being able to return the
pipeline, this might be expected depending on how the JobClient, so we need
to handle it some way.


On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi again,
>
> Just to clarify, I am not against exposing the Pipeline if this will
> lead to a "clean" solution.
> And, I. forgot to say that the last solution, if adopted, would have
> to work on the JobGraph, which may not be that desirable.
>
> Kostas
>
> On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kk...@gmail.com> wrote:
> >
> > Hi all,
> >
> > I do not have a strong opinion on the topic yet, but I would like to
> > share my thoughts on this.
> >
> > In the solution proposing a wrapping AtlasExecutor around the Flink
> > Executors, if we allow the user to use the CLI to submit jobs, then
> > this means that the CLI code may have to change so that it injects the
> > executor option to AtlasExecutor (transparently to the user), and then
> > the AtlasExecutor should take what the user has actually set as
> > pipeline executor and find the adequate executor. If this is not done
> > transparently, then the user should do sth explicit to point Flink to
> > Atlas and then to the correct executor, which implies that we should
> > add user-facing stuff (like cli options) to Flink.
> >
> > For the solution of adding getPipeline() to the JobListener, I think
> > that from a design perspective, it does not fit in the listener
> > itself. The listener is a "passive" entity that is expected to listen
> > to specific "events". Adding a getter does not fit there. Other
> > options for the getPipeline() method are:
> > 1) add it as a method to the JobClient
> > 2) add it as an argument to the methods of the JobListener (along with
> > the JobClient and the throwable)
> >
> > Alternative 1) would currently work because the JobClient is only
> > instantiated by the executor. But in the future, we may (and probably
> > will because of implications of FLIP-85) allow a JobClient to get
> > "attached" to a running job. In this case, the getPipeline() will not
> > have a pipeline to return.
> > Alternative 2) will break existing code, which I am not sure how
> > important this is as the JobListener is a new feature and I guess some
> > but not many users.
> >
> > As a sidenote, if I am not mistaken, apart from Yarn, none of the
> > above solutions would work in per-job mode for Kuberneter, Mesos or
> > with web-submissions. These modes go through "special" execution
> > environments that use them simply to extract the JobGraph which then
> > they submit to the cluster. In this case, there is no executor
> > involved. Are these cases important to you?
> >
> > Finally, another solution, although more drastic and more involved,
> > could be to have a "JobListener" running on the jobMaster. This will
> > collect the relevant info and send them to Atlas. But I am not sure
> > how Atlas works and if it requires the data to be extracted on the
> > client side. I am saying this because the JobMasters may be running
> > anywhere in a cluster while the clients may run on designated machines
> > which can have specific configurations, e.g. open ports to communicate
> > with a specific Atlas server.
> >
> > Cheers,
> > Kostas
> >
> > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > Hi Gyula!
> > >
> > > My main motivation was to try and avoid mixing an internal interface
> > > (Pipeline) with public API. It looks like this is trying to go "public
> > > stable", but doesn't really do it exactly because of mixing "pipeline"
> into
> > > this.
> > > You would need to cast "Pipeline" and work on internal classes in the
> > > implementation.
> > >
> > > If we use an "internal API" or a "semi-stable SPI" class, it looks at a
> > > first glance a bit cleaner and more maintainable (opening up less
> surface)
> > > to make the PipelineExecutor a "stable SPI".
> > > I have not checked out all the details, though.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com>
> wrote:
> > >
> > > > Hi Stephan!
> > > >
> > > > Thanks for checking this out. I agree that wrapping the other
> > > > PipelineExecutors with a delegating AtlasExecutor would be a good
> > > > alternative approach to implement this but I actually feel that it
> suffers
> > > > even more problems than exposing the Pipeline instance in the
> JobListener.
> > > >
> > > > The main idea with the Atlas integration would be to have the Atlas
> hook
> > > > logic in the Atlas project where it would be maintained. This means
> that
> > > > any approach we take has to rely on public APIs. The JobListener is
> already
> > > > a public evolving API while the PipelineExecutor and the factory is
> purely
> > > > internal. Even if we make it public it will still expose the
> Pipeline so we
> > > > did not gain much on the public/internal API front.
> > > >
> > > > I also feel that since the Atlas hook logic should only observe the
> > > > pipeline and collect information the JobListener interface seems an
> ideal
> > > > match and the implementation can be pretty lightweight. From a purely
> > > > implementation perspective adding an Executor would be more heavy as
> it has
> > > > to properly delegate to an other executor making sure that we don't
> break
> > > > anything.
> > > >
> > > > Don't take me wrong, I am not opposed to reworking the
> implementations we
> > > > have as it's very simple at this point but I also want to make sure
> that we
> > > > take the approach that is simple from a maintainability standpoint.
> Of
> > > > course my argument rests on the assumption that the AtlasHook itself
> will
> > > > live outside of the Flink project, thats another question.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org>
> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > In general, nice idea to support this integration with Atlas.
> > > > >
> > > > > I think we could make this a bit easier/lightweight with a small
> change.
> > > > > One of the issues that is not super nice is that this starts
> exposing the
> > > > > (currently empty) Pipeline interface in the public API.
> > > > > The Pipeline is an SPI interface that would be good to hide in the
> API.
> > > > >
> > > > > Since 1.10, Flink has the notion of Executors, which take the
> pipeline
> > > > and
> > > > > execute them. Meaning each pipeline is passed on anyways. And
> executors
> > > > are
> > > > > already configurable in the Flink configuration.
> > > > > So, instead of passing the pipeline both "down" (to the executor)
> and "to
> > > > > the side" (JobListener), could we just have a wrapping
> "AtlasExecutor"
> > > > that
> > > > > takes the pipeline, does whatever it wants, and then passes it to
> the
> > > > > proper executor? This would also have the advantage that it
> supports
> > > > making
> > > > > changes to the pipeline, if needed in the future. For example, if
> there
> > > > is
> > > > > ever the need to add additional configuration fields, set
> properties, add
> > > > > "labels" or so, this could be easily done in the suggested
> approach.
> > > > >
> > > > > I tried to sketch this in the picture below, pardon my bad drawing.
> > > > >
> > > > > [image: Listener_Executor.png]
> > > > >
> > > > >
> > > > >
> > > >
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > >
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> aljoscha@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Thanks! I'm reading the document now and will get back to you.
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >
> > > >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi again,

Just to clarify, I am not against exposing the Pipeline if this will
lead to a "clean" solution.
And, I. forgot to say that the last solution, if adopted, would have
to work on the JobGraph, which may not be that desirable.

Kostas

On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kk...@gmail.com> wrote:
>
> Hi all,
>
> I do not have a strong opinion on the topic yet, but I would like to
> share my thoughts on this.
>
> In the solution proposing a wrapping AtlasExecutor around the Flink
> Executors, if we allow the user to use the CLI to submit jobs, then
> this means that the CLI code may have to change so that it injects the
> executor option to AtlasExecutor (transparently to the user), and then
> the AtlasExecutor should take what the user has actually set as
> pipeline executor and find the adequate executor. If this is not done
> transparently, then the user should do sth explicit to point Flink to
> Atlas and then to the correct executor, which implies that we should
> add user-facing stuff (like cli options) to Flink.
>
> For the solution of adding getPipeline() to the JobListener, I think
> that from a design perspective, it does not fit in the listener
> itself. The listener is a "passive" entity that is expected to listen
> to specific "events". Adding a getter does not fit there. Other
> options for the getPipeline() method are:
> 1) add it as a method to the JobClient
> 2) add it as an argument to the methods of the JobListener (along with
> the JobClient and the throwable)
>
> Alternative 1) would currently work because the JobClient is only
> instantiated by the executor. But in the future, we may (and probably
> will because of implications of FLIP-85) allow a JobClient to get
> "attached" to a running job. In this case, the getPipeline() will not
> have a pipeline to return.
> Alternative 2) will break existing code, which I am not sure how
> important this is as the JobListener is a new feature and I guess some
> but not many users.
>
> As a sidenote, if I am not mistaken, apart from Yarn, none of the
> above solutions would work in per-job mode for Kuberneter, Mesos or
> with web-submissions. These modes go through "special" execution
> environments that use them simply to extract the JobGraph which then
> they submit to the cluster. In this case, there is no executor
> involved. Are these cases important to you?
>
> Finally, another solution, although more drastic and more involved,
> could be to have a "JobListener" running on the jobMaster. This will
> collect the relevant info and send them to Atlas. But I am not sure
> how Atlas works and if it requires the data to be extracted on the
> client side. I am saying this because the JobMasters may be running
> anywhere in a cluster while the clients may run on designated machines
> which can have specific configurations, e.g. open ports to communicate
> with a specific Atlas server.
>
> Cheers,
> Kostas
>
> On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi Gyula!
> >
> > My main motivation was to try and avoid mixing an internal interface
> > (Pipeline) with public API. It looks like this is trying to go "public
> > stable", but doesn't really do it exactly because of mixing "pipeline" into
> > this.
> > You would need to cast "Pipeline" and work on internal classes in the
> > implementation.
> >
> > If we use an "internal API" or a "semi-stable SPI" class, it looks at a
> > first glance a bit cleaner and more maintainable (opening up less surface)
> > to make the PipelineExecutor a "stable SPI".
> > I have not checked out all the details, though.
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com> wrote:
> >
> > > Hi Stephan!
> > >
> > > Thanks for checking this out. I agree that wrapping the other
> > > PipelineExecutors with a delegating AtlasExecutor would be a good
> > > alternative approach to implement this but I actually feel that it suffers
> > > even more problems than exposing the Pipeline instance in the JobListener.
> > >
> > > The main idea with the Atlas integration would be to have the Atlas hook
> > > logic in the Atlas project where it would be maintained. This means that
> > > any approach we take has to rely on public APIs. The JobListener is already
> > > a public evolving API while the PipelineExecutor and the factory is purely
> > > internal. Even if we make it public it will still expose the Pipeline so we
> > > did not gain much on the public/internal API front.
> > >
> > > I also feel that since the Atlas hook logic should only observe the
> > > pipeline and collect information the JobListener interface seems an ideal
> > > match and the implementation can be pretty lightweight. From a purely
> > > implementation perspective adding an Executor would be more heavy as it has
> > > to properly delegate to an other executor making sure that we don't break
> > > anything.
> > >
> > > Don't take me wrong, I am not opposed to reworking the implementations we
> > > have as it's very simple at this point but I also want to make sure that we
> > > take the approach that is simple from a maintainability standpoint. Of
> > > course my argument rests on the assumption that the AtlasHook itself will
> > > live outside of the Flink project, thats another question.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Hi all!
> > > >
> > > > In general, nice idea to support this integration with Atlas.
> > > >
> > > > I think we could make this a bit easier/lightweight with a small change.
> > > > One of the issues that is not super nice is that this starts exposing the
> > > > (currently empty) Pipeline interface in the public API.
> > > > The Pipeline is an SPI interface that would be good to hide in the API.
> > > >
> > > > Since 1.10, Flink has the notion of Executors, which take the pipeline
> > > and
> > > > execute them. Meaning each pipeline is passed on anyways. And executors
> > > are
> > > > already configurable in the Flink configuration.
> > > > So, instead of passing the pipeline both "down" (to the executor) and "to
> > > > the side" (JobListener), could we just have a wrapping "AtlasExecutor"
> > > that
> > > > takes the pipeline, does whatever it wants, and then passes it to the
> > > > proper executor? This would also have the advantage that it supports
> > > making
> > > > changes to the pipeline, if needed in the future. For example, if there
> > > is
> > > > ever the need to add additional configuration fields, set properties, add
> > > > "labels" or so, this could be easily done in the suggested approach.
> > > >
> > > > I tried to sketch this in the picture below, pardon my bad drawing.
> > > >
> > > > [image: Listener_Executor.png]
> > > >
> > > >
> > > >
> > > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > >
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <al...@apache.org>
> > > > wrote:
> > > >
> > > >> Thanks! I'm reading the document now and will get back to you.
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >
> > >

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi all,

I do not have a strong opinion on the topic yet, but I would like to
share my thoughts on this.

In the solution proposing a wrapping AtlasExecutor around the Flink
Executors, if we allow the user to use the CLI to submit jobs, then
this means that the CLI code may have to change so that it injects the
executor option to AtlasExecutor (transparently to the user), and then
the AtlasExecutor should take what the user has actually set as
pipeline executor and find the adequate executor. If this is not done
transparently, then the user should do sth explicit to point Flink to
Atlas and then to the correct executor, which implies that we should
add user-facing stuff (like cli options) to Flink.

For the solution of adding getPipeline() to the JobListener, I think
that from a design perspective, it does not fit in the listener
itself. The listener is a "passive" entity that is expected to listen
to specific "events". Adding a getter does not fit there. Other
options for the getPipeline() method are:
1) add it as a method to the JobClient
2) add it as an argument to the methods of the JobListener (along with
the JobClient and the throwable)

Alternative 1) would currently work because the JobClient is only
instantiated by the executor. But in the future, we may (and probably
will because of implications of FLIP-85) allow a JobClient to get
"attached" to a running job. In this case, the getPipeline() will not
have a pipeline to return.
Alternative 2) will break existing code, which I am not sure how
important this is as the JobListener is a new feature and I guess some
but not many users.

As a sidenote, if I am not mistaken, apart from Yarn, none of the
above solutions would work in per-job mode for Kuberneter, Mesos or
with web-submissions. These modes go through "special" execution
environments that use them simply to extract the JobGraph which then
they submit to the cluster. In this case, there is no executor
involved. Are these cases important to you?

Finally, another solution, although more drastic and more involved,
could be to have a "JobListener" running on the jobMaster. This will
collect the relevant info and send them to Atlas. But I am not sure
how Atlas works and if it requires the data to be extracted on the
client side. I am saying this because the JobMasters may be running
anywhere in a cluster while the clients may run on designated machines
which can have specific configurations, e.g. open ports to communicate
with a specific Atlas server.

Cheers,
Kostas

On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
>
> Hi Gyula!
>
> My main motivation was to try and avoid mixing an internal interface
> (Pipeline) with public API. It looks like this is trying to go "public
> stable", but doesn't really do it exactly because of mixing "pipeline" into
> this.
> You would need to cast "Pipeline" and work on internal classes in the
> implementation.
>
> If we use an "internal API" or a "semi-stable SPI" class, it looks at a
> first glance a bit cleaner and more maintainable (opening up less surface)
> to make the PipelineExecutor a "stable SPI".
> I have not checked out all the details, though.
>
> Best,
> Stephan
>
>
> On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com> wrote:
>
> > Hi Stephan!
> >
> > Thanks for checking this out. I agree that wrapping the other
> > PipelineExecutors with a delegating AtlasExecutor would be a good
> > alternative approach to implement this but I actually feel that it suffers
> > even more problems than exposing the Pipeline instance in the JobListener.
> >
> > The main idea with the Atlas integration would be to have the Atlas hook
> > logic in the Atlas project where it would be maintained. This means that
> > any approach we take has to rely on public APIs. The JobListener is already
> > a public evolving API while the PipelineExecutor and the factory is purely
> > internal. Even if we make it public it will still expose the Pipeline so we
> > did not gain much on the public/internal API front.
> >
> > I also feel that since the Atlas hook logic should only observe the
> > pipeline and collect information the JobListener interface seems an ideal
> > match and the implementation can be pretty lightweight. From a purely
> > implementation perspective adding an Executor would be more heavy as it has
> > to properly delegate to an other executor making sure that we don't break
> > anything.
> >
> > Don't take me wrong, I am not opposed to reworking the implementations we
> > have as it's very simple at this point but I also want to make sure that we
> > take the approach that is simple from a maintainability standpoint. Of
> > course my argument rests on the assumption that the AtlasHook itself will
> > live outside of the Flink project, thats another question.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > In general, nice idea to support this integration with Atlas.
> > >
> > > I think we could make this a bit easier/lightweight with a small change.
> > > One of the issues that is not super nice is that this starts exposing the
> > > (currently empty) Pipeline interface in the public API.
> > > The Pipeline is an SPI interface that would be good to hide in the API.
> > >
> > > Since 1.10, Flink has the notion of Executors, which take the pipeline
> > and
> > > execute them. Meaning each pipeline is passed on anyways. And executors
> > are
> > > already configurable in the Flink configuration.
> > > So, instead of passing the pipeline both "down" (to the executor) and "to
> > > the side" (JobListener), could we just have a wrapping "AtlasExecutor"
> > that
> > > takes the pipeline, does whatever it wants, and then passes it to the
> > > proper executor? This would also have the advantage that it supports
> > making
> > > changes to the pipeline, if needed in the future. For example, if there
> > is
> > > ever the need to add additional configuration fields, set properties, add
> > > "labels" or so, this could be easily done in the suggested approach.
> > >
> > > I tried to sketch this in the picture below, pardon my bad drawing.
> > >
> > > [image: Listener_Executor.png]
> > >
> > >
> > >
> > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > >
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <al...@apache.org>
> > > wrote:
> > >
> > >> Thanks! I'm reading the document now and will get back to you.
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >
> >

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Stephan Ewen <se...@apache.org>.
Hi Gyula!

My main motivation was to try and avoid mixing an internal interface
(Pipeline) with public API. It looks like this is trying to go "public
stable", but doesn't really do it exactly because of mixing "pipeline" into
this.
You would need to cast "Pipeline" and work on internal classes in the
implementation.

If we use an "internal API" or a "semi-stable SPI" class, it looks at a
first glance a bit cleaner and more maintainable (opening up less surface)
to make the PipelineExecutor a "stable SPI".
I have not checked out all the details, though.

Best,
Stephan


On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gy...@gmail.com> wrote:

> Hi Stephan!
>
> Thanks for checking this out. I agree that wrapping the other
> PipelineExecutors with a delegating AtlasExecutor would be a good
> alternative approach to implement this but I actually feel that it suffers
> even more problems than exposing the Pipeline instance in the JobListener.
>
> The main idea with the Atlas integration would be to have the Atlas hook
> logic in the Atlas project where it would be maintained. This means that
> any approach we take has to rely on public APIs. The JobListener is already
> a public evolving API while the PipelineExecutor and the factory is purely
> internal. Even if we make it public it will still expose the Pipeline so we
> did not gain much on the public/internal API front.
>
> I also feel that since the Atlas hook logic should only observe the
> pipeline and collect information the JobListener interface seems an ideal
> match and the implementation can be pretty lightweight. From a purely
> implementation perspective adding an Executor would be more heavy as it has
> to properly delegate to an other executor making sure that we don't break
> anything.
>
> Don't take me wrong, I am not opposed to reworking the implementations we
> have as it's very simple at this point but I also want to make sure that we
> take the approach that is simple from a maintainability standpoint. Of
> course my argument rests on the assumption that the AtlasHook itself will
> live outside of the Flink project, thats another question.
>
> Cheers,
> Gyula
>
> On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:
>
> > Hi all!
> >
> > In general, nice idea to support this integration with Atlas.
> >
> > I think we could make this a bit easier/lightweight with a small change.
> > One of the issues that is not super nice is that this starts exposing the
> > (currently empty) Pipeline interface in the public API.
> > The Pipeline is an SPI interface that would be good to hide in the API.
> >
> > Since 1.10, Flink has the notion of Executors, which take the pipeline
> and
> > execute them. Meaning each pipeline is passed on anyways. And executors
> are
> > already configurable in the Flink configuration.
> > So, instead of passing the pipeline both "down" (to the executor) and "to
> > the side" (JobListener), could we just have a wrapping "AtlasExecutor"
> that
> > takes the pipeline, does whatever it wants, and then passes it to the
> > proper executor? This would also have the advantage that it supports
> making
> > changes to the pipeline, if needed in the future. For example, if there
> is
> > ever the need to add additional configuration fields, set properties, add
> > "labels" or so, this could be easily done in the suggested approach.
> >
> > I tried to sketch this in the picture below, pardon my bad drawing.
> >
> > [image: Listener_Executor.png]
> >
> >
> >
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> >
> >
> > Best,
> > Stephan
> >
> >
> > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Thanks! I'm reading the document now and will get back to you.
> >>
> >> Best,
> >> Aljoscha
> >>
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Stephan!

Thanks for checking this out. I agree that wrapping the other
PipelineExecutors with a delegating AtlasExecutor would be a good
alternative approach to implement this but I actually feel that it suffers
even more problems than exposing the Pipeline instance in the JobListener.

The main idea with the Atlas integration would be to have the Atlas hook
logic in the Atlas project where it would be maintained. This means that
any approach we take has to rely on public APIs. The JobListener is already
a public evolving API while the PipelineExecutor and the factory is purely
internal. Even if we make it public it will still expose the Pipeline so we
did not gain much on the public/internal API front.

I also feel that since the Atlas hook logic should only observe the
pipeline and collect information the JobListener interface seems an ideal
match and the implementation can be pretty lightweight. From a purely
implementation perspective adding an Executor would be more heavy as it has
to properly delegate to an other executor making sure that we don't break
anything.

Don't take me wrong, I am not opposed to reworking the implementations we
have as it's very simple at this point but I also want to make sure that we
take the approach that is simple from a maintainability standpoint. Of
course my argument rests on the assumption that the AtlasHook itself will
live outside of the Flink project, thats another question.

Cheers,
Gyula

On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:

> Hi all!
>
> In general, nice idea to support this integration with Atlas.
>
> I think we could make this a bit easier/lightweight with a small change.
> One of the issues that is not super nice is that this starts exposing the
> (currently empty) Pipeline interface in the public API.
> The Pipeline is an SPI interface that would be good to hide in the API.
>
> Since 1.10, Flink has the notion of Executors, which take the pipeline and
> execute them. Meaning each pipeline is passed on anyways. And executors are
> already configurable in the Flink configuration.
> So, instead of passing the pipeline both "down" (to the executor) and "to
> the side" (JobListener), could we just have a wrapping "AtlasExecutor" that
> takes the pipeline, does whatever it wants, and then passes it to the
> proper executor? This would also have the advantage that it supports making
> changes to the pipeline, if needed in the future. For example, if there is
> ever the need to add additional configuration fields, set properties, add
> "labels" or so, this could be easily done in the suggested approach.
>
> I tried to sketch this in the picture below, pardon my bad drawing.
>
> [image: Listener_Executor.png]
>
>
> https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
>
>
> Best,
> Stephan
>
>
> On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Thanks! I'm reading the document now and will get back to you.
>>
>> Best,
>> Aljoscha
>>
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Stephan Ewen <se...@apache.org>.
Hi all!

In general, nice idea to support this integration with Atlas.

I think we could make this a bit easier/lightweight with a small change.
One of the issues that is not super nice is that this starts exposing the
(currently empty) Pipeline interface in the public API.
The Pipeline is an SPI interface that would be good to hide in the API.

Since 1.10, Flink has the notion of Executors, which take the pipeline and
execute them. Meaning each pipeline is passed on anyways. And executors are
already configurable in the Flink configuration.
So, instead of passing the pipeline both "down" (to the executor) and "to
the side" (JobListener), could we just have a wrapping "AtlasExecutor" that
takes the pipeline, does whatever it wants, and then passes it to the
proper executor? This would also have the advantage that it supports making
changes to the pipeline, if needed in the future. For example, if there is
ever the need to add additional configuration fields, set properties, add
"labels" or so, this could be easily done in the suggested approach.

I tried to sketch this in the picture below, pardon my bad drawing.

[image: Listener_Executor.png]

https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1


Best,
Stephan


On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <al...@apache.org>
wrote:

> Thanks! I'm reading the document now and will get back to you.
>
> Best,
> Aljoscha
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks! I'm reading the document now and will get back to you.

Best,
Aljoscha

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Márton Balassi <mb...@apache.org>.
Hi all,

We have added the interface for registering the connectors in custom user
user defined functions, like representing enrichment from an HBase table in
the middle of a Flink application. We are reaching out to the Atlas
community to review the implementation in the near future too, based on
which we plan to open a pull request to Flink to add the minor changes
needed for the sources and sinks we plan to support out of the box as
described in the design document. Once these changes are merged we can add
the necessary functionality in Atlas too.

You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

Best,
Marton

On Thu, Feb 20, 2020 at 10:38 AM Gyula Fóra <gy...@gmail.com> wrote:

> Hi all!
>
> Thank you for the patience!
>
> We have created a small design document for the change proposal detailing
> the minimal required changes in Flink for the initial version of the Atlas
> integration.
>
> You can find the document here:
>
> https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing
>
> It would be great if you could check it out and comment on it.
> If we agree on the next steps I will start opening JIRA-s and PRs with the
> proposed changes.
>
> The document links to an already working Atlas hook prototype (and
> accompanying flink fork). The links for that are also here:
> Flink: https://github.com/gyfora/flink/tree/atlas-changes
> Atlas: https://github.com/gyfora/atlas/tree/flink-bridge
>
> Thank you!
> Gyula
>
> On Thu, Feb 13, 2020 at 4:43 PM Gyula Fóra <gy...@gmail.com> wrote:
>
> > Thanks for the feedback Aljoscha!
> >
> > I have a POC ready with the Flink changes + the Atlas hook
> implementation.
> > I will try to push this to a public repo tomorrow and we can discuss
> > further based on that!
> >
> > Gyula
> >
> > On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <al...@apache.org>
> wrote:
> >
> >> I think exposing the Pipeline should be ok. Using the internal
> >> StreamGraph might be problematic because this might change/break but
> >> that's a problem of the external code.
> >>
> >> Aljoscha
> >>
> >> On 11.02.20 16:26, Gyula Fóra wrote:
> >> > Hi All!
> >> >
> >> > I have made a prototype that simply adds a getPipeline() method to the
> >> > JobClient interface. Then I could easily implement the Atlas hook
> using
> >> the
> >> > JobListener interface. I simply check if Pipeline is instanceof
> >> StreamGraph
> >> > and do the logic there.
> >> >
> >> > I think this is so far the cleanest approach and I much prefer this
> >> > compared to working on the JobGraph directly which would expose even
> >> more
> >> > messy internals.
> >> >
> >> > Unfortunately this change alone is not enough for the integration as
> we
> >> > need to make sure that all Sources/Sinks that we want to integrate to
> >> atlas
> >> > publicly expose some of their properties:
> >> >
> >> >     - Kafka source/sink:
> >> >        - Kafka props
> >> >        - Topic(s) - this is tricky for sinks
> >> >     - FS source /sink:
> >> >        - Hadoop props
> >> >        - Base path for StreamingFileSink
> >> >        - Path for ContinuousMonitoringSource
> >> >
> >> > Most of these are straightforward changes, the only question is what
> we
> >> > want to register in Atlas from the available connectors. Ideally users
> >> > could also somehow register their own Atlas metadata for custom
> sources
> >> and
> >> > sinks, we could probably introduce an interface for that in Atlas.
> >> >
> >> > Cheers,
> >> > Gyula
> >> >
> >> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gy...@gmail.com>
> >> wrote:
> >> >
> >> >> Maybe we could improve the Pipeline interface in the long run, but
> as a
> >> >> temporary solution the JobClient could expose a getPipeline() method.
> >> >>
> >> >> That way the implementation of the JobListener could check if its a
> >> >> StreamGraph or a Plan.
> >> >>
> >> >> How bad does that sound?
> >> >>
> >> >> Gyula
> >> >>
> >> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Hi Aljoscha!
> >> >>>
> >> >>> That's a valid concert but we should try to figure something out,
> many
> >> >>> users need this before they can use Flink.
> >> >>>
> >> >>> I think the closest thing we have right now is the StreamGraph. In
> >> >>> contrast with the JobGraph  the StreamGraph is pretty nice from a
> >> metadata
> >> >>> perspective :D
> >> >>> The big downside of exposing the StreamGraph is that we don't have
> it
> >> in
> >> >>> batch. On the other hand we could expose the JobGraph but then the
> >> >>> integration component would still have to do the heavy lifting for
> >> batch
> >> >>> and stream specific operators and UDFs.
> >> >>>
> >> >>> Instead of exposing either StreamGraph/JobGraph, we could come up
> >> with a
> >> >>> metadata like representation for the users but that would be like
> >> >>> implementing Atlas integration itself without Atlas dependencies :D
> >> >>>
> >> >>> As a comparison point, this is how it works in Storm:
> >> >>> Every operator (spout/bolt), stores a config map (string->string)
> with
> >> >>> all the metadata such as operator class, and the operator specific
> >> configs.
> >> >>> The Atlas hook works on this map.
> >> >>> This is very fragile and depends on a lot of internals. Kind of like
> >> >>> exposing the JobGraph but much worse. I think we can do better.
> >> >>>
> >> >>> Gyula
> >> >>>
> >> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <
> aljoscha@apache.org>
> >> >>> wrote:
> >> >>>
> >> >>>> If we need it, we can probably beef up the JobListener to allow
> >> >>>> accessing some information about the whole graph or sources and
> >> sinks.
> >> >>>> My only concern right now is that we don't have a stable interface
> >> for
> >> >>>> our job graphs/pipelines right now.
> >> >>>>
> >> >>>> Best,
> >> >>>> Aljoscha
> >> >>>>
> >> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
> >> >>>>> Hi Jeff & Till!
> >> >>>>>
> >> >>>>> Thanks for the feedback, this is exactly the discussion I was
> >> looking
> >> >>>> for.
> >> >>>>> The JobListener looks very promising if we can expose the JobGraph
> >> >>>> somehow
> >> >>>>> (correct me if I am wrong but it is not accessible at the moment).
> >> >>>>>
> >> >>>>> I did not know about this feature that's why I added my
> >> JobSubmission
> >> >>>> hook
> >> >>>>> which was pretty similar but only exposing the JobGraph. In
> general
> >> I
> >> >>>> like
> >> >>>>> the listener better and I would not like to add anything extra if
> we
> >> >>>> can
> >> >>>>> avoid it.
> >> >>>>>
> >> >>>>> Actually the bigger part of the integration work that will need
> more
> >> >>>>> changes in Flink will be regarding the accessibility of
> >> sources/sinks
> >> >>>> from
> >> >>>>> the JobGraph and their specific properties. For instance at the
> >> moment
> >> >>>> the
> >> >>>>> Kafka sources and sinks do not expose anything publicly such as
> >> topics,
> >> >>>>> kafka configs, etc. Same goes for other data connectors that we
> >> need to
> >> >>>>> integrate in the long run. I guess there will be a separate thread
> >> on
> >> >>>> this
> >> >>>>> once we iron out the initial integration points :)
> >> >>>>>
> >> >>>>> I will try to play around with the JobListener interface tomorrow
> >> and
> >> >>>> see
> >> >>>>> if I can extend it to meet our needs.
> >> >>>>>
> >> >>>>> Cheers,
> >> >>>>> Gyula
> >> >>>>>
> >> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com>
> wrote:
> >> >>>>>
> >> >>>>>> Hi Gyula,
> >> >>>>>>
> >> >>>>>> Flink 1.10 introduced JobListener which is invoked after job
> >> >>>> submission and
> >> >>>>>> finished.  May we can add api on JobClient to get what info you
> >> >>>> needed for
> >> >>>>>> altas integration.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>
> >>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
> >> >>>>>>
> >> >>>>>>> Hi all!
> >> >>>>>>>
> >> >>>>>>> We have started some preliminary work on the Flink - Atlas
> >> >>>> integration at
> >> >>>>>>> Cloudera. It seems that the integration will require some new
> hook
> >> >>>>>>> interfaces at the jobgraph generation and submission phases, so
> I
> >> >>>>>> figured I
> >> >>>>>>> will open a discussion thread with my initial ideas to get some
> >> early
> >> >>>>>>> feedback.
> >> >>>>>>>
> >> >>>>>>> *Minimal background*
> >> >>>>>>> Very simply put Apache Atlas is a data governance framework that
> >> >>>> stores
> >> >>>>>>> metadata for our data and processing logic to track ownership,
> >> >>>> lineage
> >> >>>>>> etc.
> >> >>>>>>> It is already integrated with systems like HDFS, Kafka, Hive and
> >> many
> >> >>>>>>> others.
> >> >>>>>>>
> >> >>>>>>> Adding Flink integration would mean that we can track the input
> >> >>>> output
> >> >>>>>> data
> >> >>>>>>> of our Flink jobs, their owners and how different Flink jobs are
> >> >>>>>> connected
> >> >>>>>>> to each other through the data they produce (lineage). This
> seems
> >> to
> >> >>>> be a
> >> >>>>>>> very big deal for a lot of companies :)
> >> >>>>>>>
> >> >>>>>>> *Flink - Atlas integration in a nutshell*
> >> >>>>>>> In order to integrate with Atlas we basically need 2 things.
> >> >>>>>>>    - Flink entity definitions
> >> >>>>>>>    - Flink Atlas hook
> >> >>>>>>>
> >> >>>>>>> The entity definition is the easy part. It is a json that
> contains
> >> >>>> the
> >> >>>>>>> objects (entities) that we want to store for any give Flink job.
> >> As a
> >> >>>>>>> starter we could have a single FlinkApplication entity that has
> a
> >> >>>> set of
> >> >>>>>>> inputs and outputs. These inputs/outputs are other Atlas
> entities
> >> >>>> that
> >> >>>>>> are
> >> >>>>>>> already defines such as Kafka topic or Hbase table.
> >> >>>>>>>
> >> >>>>>>> The Flink atlas hook will be the logic that creates the entity
> >> >>>> instance
> >> >>>>>> and
> >> >>>>>>> uploads it to Atlas when we start a new Flink job. This is the
> >> part
> >> >>>> where
> >> >>>>>>> we implement the core logic.
> >> >>>>>>>
> >> >>>>>>> *Job submission hook*
> >> >>>>>>> In order to implement the Atlas hook we need a place where we
> can
> >> >>>> inspect
> >> >>>>>>> the pipeline, create and send the metadata when the job starts.
> >> When
> >> >>>> we
> >> >>>>>>> create the FlinkApplication entity we need to be able to easily
> >> >>>> determine
> >> >>>>>>> the sources and sinks (and their properties) of the pipeline.
> >> >>>>>>>
> >> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
> >> >>>> execute
> >> >>>>>>> this logic and even if there was one there is a mismatch of
> >> >>>> abstraction
> >> >>>>>>> levels needed to implement the integration.
> >> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager
> >> >>>> runner
> >> >>>>>> as
> >> >>>>>>> this:
> >> >>>>>>>
> >> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> >> >>>>>>> configuration);
> >> >>>>>>>
> >> >>>>>>> This is nice but the JobGraph makes it super difficult to
> extract
> >> >>>> sources
> >> >>>>>>> and UDFs to create the metadata entity. The atlas entity however
> >> >>>> could be
> >> >>>>>>> easily created from the StreamGraph object (used to represent
> the
> >> >>>> logical
> >> >>>>>>> flow) before the JobGraph is generated. To go around this
> >> limitation
> >> >>>> we
> >> >>>>>>> could add a JobGraphGeneratorHook interface:
> >> >>>>>>>
> >> >>>>>>> void preProcess(StreamGraph streamGraph); void
> >> postProcess(JobGraph
> >> >>>>>>> jobGraph);
> >> >>>>>>>
> >> >>>>>>> We could then generate the atlas entity in the preprocess step
> and
> >> >>>> add a
> >> >>>>>>> jobmission hook in the postprocess step that will simply send
> the
> >> >>>> already
> >> >>>>>>> baked in entity.
> >> >>>>>>>
> >> >>>>>>> *This kinda works but...*
> >> >>>>>>> The approach outlined above seems to work and we have built a
> POC
> >> >>>> using
> >> >>>>>> it.
> >> >>>>>>> Unfortunately it is far from nice as it exposes non-public APIs
> >> such
> >> >>>> as
> >> >>>>>> the
> >> >>>>>>> StreamGraph. Also it feels a bit weird to have 2 hooks instead
> of
> >> >>>> one.
> >> >>>>>>>
> >> >>>>>>> It would be much nicer if we could somehow go back from JobGraph
> >> to
> >> >>>>>>> StreamGraph or at least have an easy way to access source/sink
> >> UDFS.
> >> >>>>>>>
> >> >>>>>>> What do you think?
> >> >>>>>>>
> >> >>>>>>> Cheers,
> >> >>>>>>> Gyula
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> Best Regards
> >> >>>>>>
> >> >>>>>> Jeff Zhang
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >
> >>
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi all!

Thank you for the patience!

We have created a small design document for the change proposal detailing
the minimal required changes in Flink for the initial version of the Atlas
integration.

You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

It would be great if you could check it out and comment on it.
If we agree on the next steps I will start opening JIRA-s and PRs with the
proposed changes.

The document links to an already working Atlas hook prototype (and
accompanying flink fork). The links for that are also here:
Flink: https://github.com/gyfora/flink/tree/atlas-changes
Atlas: https://github.com/gyfora/atlas/tree/flink-bridge

Thank you!
Gyula

On Thu, Feb 13, 2020 at 4:43 PM Gyula Fóra <gy...@gmail.com> wrote:

> Thanks for the feedback Aljoscha!
>
> I have a POC ready with the Flink changes + the Atlas hook implementation.
> I will try to push this to a public repo tomorrow and we can discuss
> further based on that!
>
> Gyula
>
> On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <al...@apache.org> wrote:
>
>> I think exposing the Pipeline should be ok. Using the internal
>> StreamGraph might be problematic because this might change/break but
>> that's a problem of the external code.
>>
>> Aljoscha
>>
>> On 11.02.20 16:26, Gyula Fóra wrote:
>> > Hi All!
>> >
>> > I have made a prototype that simply adds a getPipeline() method to the
>> > JobClient interface. Then I could easily implement the Atlas hook using
>> the
>> > JobListener interface. I simply check if Pipeline is instanceof
>> StreamGraph
>> > and do the logic there.
>> >
>> > I think this is so far the cleanest approach and I much prefer this
>> > compared to working on the JobGraph directly which would expose even
>> more
>> > messy internals.
>> >
>> > Unfortunately this change alone is not enough for the integration as we
>> > need to make sure that all Sources/Sinks that we want to integrate to
>> atlas
>> > publicly expose some of their properties:
>> >
>> >     - Kafka source/sink:
>> >        - Kafka props
>> >        - Topic(s) - this is tricky for sinks
>> >     - FS source /sink:
>> >        - Hadoop props
>> >        - Base path for StreamingFileSink
>> >        - Path for ContinuousMonitoringSource
>> >
>> > Most of these are straightforward changes, the only question is what we
>> > want to register in Atlas from the available connectors. Ideally users
>> > could also somehow register their own Atlas metadata for custom sources
>> and
>> > sinks, we could probably introduce an interface for that in Atlas.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gy...@gmail.com>
>> wrote:
>> >
>> >> Maybe we could improve the Pipeline interface in the long run, but as a
>> >> temporary solution the JobClient could expose a getPipeline() method.
>> >>
>> >> That way the implementation of the JobListener could check if its a
>> >> StreamGraph or a Plan.
>> >>
>> >> How bad does that sound?
>> >>
>> >> Gyula
>> >>
>> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com>
>> wrote:
>> >>
>> >>> Hi Aljoscha!
>> >>>
>> >>> That's a valid concert but we should try to figure something out, many
>> >>> users need this before they can use Flink.
>> >>>
>> >>> I think the closest thing we have right now is the StreamGraph. In
>> >>> contrast with the JobGraph  the StreamGraph is pretty nice from a
>> metadata
>> >>> perspective :D
>> >>> The big downside of exposing the StreamGraph is that we don't have it
>> in
>> >>> batch. On the other hand we could expose the JobGraph but then the
>> >>> integration component would still have to do the heavy lifting for
>> batch
>> >>> and stream specific operators and UDFs.
>> >>>
>> >>> Instead of exposing either StreamGraph/JobGraph, we could come up
>> with a
>> >>> metadata like representation for the users but that would be like
>> >>> implementing Atlas integration itself without Atlas dependencies :D
>> >>>
>> >>> As a comparison point, this is how it works in Storm:
>> >>> Every operator (spout/bolt), stores a config map (string->string) with
>> >>> all the metadata such as operator class, and the operator specific
>> configs.
>> >>> The Atlas hook works on this map.
>> >>> This is very fragile and depends on a lot of internals. Kind of like
>> >>> exposing the JobGraph but much worse. I think we can do better.
>> >>>
>> >>> Gyula
>> >>>
>> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org>
>> >>> wrote:
>> >>>
>> >>>> If we need it, we can probably beef up the JobListener to allow
>> >>>> accessing some information about the whole graph or sources and
>> sinks.
>> >>>> My only concern right now is that we don't have a stable interface
>> for
>> >>>> our job graphs/pipelines right now.
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
>> >>>>> Hi Jeff & Till!
>> >>>>>
>> >>>>> Thanks for the feedback, this is exactly the discussion I was
>> looking
>> >>>> for.
>> >>>>> The JobListener looks very promising if we can expose the JobGraph
>> >>>> somehow
>> >>>>> (correct me if I am wrong but it is not accessible at the moment).
>> >>>>>
>> >>>>> I did not know about this feature that's why I added my
>> JobSubmission
>> >>>> hook
>> >>>>> which was pretty similar but only exposing the JobGraph. In general
>> I
>> >>>> like
>> >>>>> the listener better and I would not like to add anything extra if we
>> >>>> can
>> >>>>> avoid it.
>> >>>>>
>> >>>>> Actually the bigger part of the integration work that will need more
>> >>>>> changes in Flink will be regarding the accessibility of
>> sources/sinks
>> >>>> from
>> >>>>> the JobGraph and their specific properties. For instance at the
>> moment
>> >>>> the
>> >>>>> Kafka sources and sinks do not expose anything publicly such as
>> topics,
>> >>>>> kafka configs, etc. Same goes for other data connectors that we
>> need to
>> >>>>> integrate in the long run. I guess there will be a separate thread
>> on
>> >>>> this
>> >>>>> once we iron out the initial integration points :)
>> >>>>>
>> >>>>> I will try to play around with the JobListener interface tomorrow
>> and
>> >>>> see
>> >>>>> if I can extend it to meet our needs.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Gyula
>> >>>>>
>> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Gyula,
>> >>>>>>
>> >>>>>> Flink 1.10 introduced JobListener which is invoked after job
>> >>>> submission and
>> >>>>>> finished.  May we can add api on JobClient to get what info you
>> >>>> needed for
>> >>>>>> altas integration.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>> >>>>>>
>> >>>>>>
>> >>>>>> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>> >>>>>>
>> >>>>>>> Hi all!
>> >>>>>>>
>> >>>>>>> We have started some preliminary work on the Flink - Atlas
>> >>>> integration at
>> >>>>>>> Cloudera. It seems that the integration will require some new hook
>> >>>>>>> interfaces at the jobgraph generation and submission phases, so I
>> >>>>>> figured I
>> >>>>>>> will open a discussion thread with my initial ideas to get some
>> early
>> >>>>>>> feedback.
>> >>>>>>>
>> >>>>>>> *Minimal background*
>> >>>>>>> Very simply put Apache Atlas is a data governance framework that
>> >>>> stores
>> >>>>>>> metadata for our data and processing logic to track ownership,
>> >>>> lineage
>> >>>>>> etc.
>> >>>>>>> It is already integrated with systems like HDFS, Kafka, Hive and
>> many
>> >>>>>>> others.
>> >>>>>>>
>> >>>>>>> Adding Flink integration would mean that we can track the input
>> >>>> output
>> >>>>>> data
>> >>>>>>> of our Flink jobs, their owners and how different Flink jobs are
>> >>>>>> connected
>> >>>>>>> to each other through the data they produce (lineage). This seems
>> to
>> >>>> be a
>> >>>>>>> very big deal for a lot of companies :)
>> >>>>>>>
>> >>>>>>> *Flink - Atlas integration in a nutshell*
>> >>>>>>> In order to integrate with Atlas we basically need 2 things.
>> >>>>>>>    - Flink entity definitions
>> >>>>>>>    - Flink Atlas hook
>> >>>>>>>
>> >>>>>>> The entity definition is the easy part. It is a json that contains
>> >>>> the
>> >>>>>>> objects (entities) that we want to store for any give Flink job.
>> As a
>> >>>>>>> starter we could have a single FlinkApplication entity that has a
>> >>>> set of
>> >>>>>>> inputs and outputs. These inputs/outputs are other Atlas entities
>> >>>> that
>> >>>>>> are
>> >>>>>>> already defines such as Kafka topic or Hbase table.
>> >>>>>>>
>> >>>>>>> The Flink atlas hook will be the logic that creates the entity
>> >>>> instance
>> >>>>>> and
>> >>>>>>> uploads it to Atlas when we start a new Flink job. This is the
>> part
>> >>>> where
>> >>>>>>> we implement the core logic.
>> >>>>>>>
>> >>>>>>> *Job submission hook*
>> >>>>>>> In order to implement the Atlas hook we need a place where we can
>> >>>> inspect
>> >>>>>>> the pipeline, create and send the metadata when the job starts.
>> When
>> >>>> we
>> >>>>>>> create the FlinkApplication entity we need to be able to easily
>> >>>> determine
>> >>>>>>> the sources and sinks (and their properties) of the pipeline.
>> >>>>>>>
>> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
>> >>>> execute
>> >>>>>>> this logic and even if there was one there is a mismatch of
>> >>>> abstraction
>> >>>>>>> levels needed to implement the integration.
>> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager
>> >>>> runner
>> >>>>>> as
>> >>>>>>> this:
>> >>>>>>>
>> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
>> >>>>>>> configuration);
>> >>>>>>>
>> >>>>>>> This is nice but the JobGraph makes it super difficult to extract
>> >>>> sources
>> >>>>>>> and UDFs to create the metadata entity. The atlas entity however
>> >>>> could be
>> >>>>>>> easily created from the StreamGraph object (used to represent the
>> >>>> logical
>> >>>>>>> flow) before the JobGraph is generated. To go around this
>> limitation
>> >>>> we
>> >>>>>>> could add a JobGraphGeneratorHook interface:
>> >>>>>>>
>> >>>>>>> void preProcess(StreamGraph streamGraph); void
>> postProcess(JobGraph
>> >>>>>>> jobGraph);
>> >>>>>>>
>> >>>>>>> We could then generate the atlas entity in the preprocess step and
>> >>>> add a
>> >>>>>>> jobmission hook in the postprocess step that will simply send the
>> >>>> already
>> >>>>>>> baked in entity.
>> >>>>>>>
>> >>>>>>> *This kinda works but...*
>> >>>>>>> The approach outlined above seems to work and we have built a POC
>> >>>> using
>> >>>>>> it.
>> >>>>>>> Unfortunately it is far from nice as it exposes non-public APIs
>> such
>> >>>> as
>> >>>>>> the
>> >>>>>>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of
>> >>>> one.
>> >>>>>>>
>> >>>>>>> It would be much nicer if we could somehow go back from JobGraph
>> to
>> >>>>>>> StreamGraph or at least have an easy way to access source/sink
>> UDFS.
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Gyula
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> Best Regards
>> >>>>>>
>> >>>>>> Jeff Zhang
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >
>>
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks for the feedback Aljoscha!

I have a POC ready with the Flink changes + the Atlas hook implementation.
I will try to push this to a public repo tomorrow and we can discuss
further based on that!

Gyula

On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <al...@apache.org> wrote:

> I think exposing the Pipeline should be ok. Using the internal
> StreamGraph might be problematic because this might change/break but
> that's a problem of the external code.
>
> Aljoscha
>
> On 11.02.20 16:26, Gyula Fóra wrote:
> > Hi All!
> >
> > I have made a prototype that simply adds a getPipeline() method to the
> > JobClient interface. Then I could easily implement the Atlas hook using
> the
> > JobListener interface. I simply check if Pipeline is instanceof
> StreamGraph
> > and do the logic there.
> >
> > I think this is so far the cleanest approach and I much prefer this
> > compared to working on the JobGraph directly which would expose even more
> > messy internals.
> >
> > Unfortunately this change alone is not enough for the integration as we
> > need to make sure that all Sources/Sinks that we want to integrate to
> atlas
> > publicly expose some of their properties:
> >
> >     - Kafka source/sink:
> >        - Kafka props
> >        - Topic(s) - this is tricky for sinks
> >     - FS source /sink:
> >        - Hadoop props
> >        - Base path for StreamingFileSink
> >        - Path for ContinuousMonitoringSource
> >
> > Most of these are straightforward changes, the only question is what we
> > want to register in Atlas from the available connectors. Ideally users
> > could also somehow register their own Atlas metadata for custom sources
> and
> > sinks, we could probably introduce an interface for that in Atlas.
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gy...@gmail.com> wrote:
> >
> >> Maybe we could improve the Pipeline interface in the long run, but as a
> >> temporary solution the JobClient could expose a getPipeline() method.
> >>
> >> That way the implementation of the JobListener could check if its a
> >> StreamGraph or a Plan.
> >>
> >> How bad does that sound?
> >>
> >> Gyula
> >>
> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com>
> wrote:
> >>
> >>> Hi Aljoscha!
> >>>
> >>> That's a valid concert but we should try to figure something out, many
> >>> users need this before they can use Flink.
> >>>
> >>> I think the closest thing we have right now is the StreamGraph. In
> >>> contrast with the JobGraph  the StreamGraph is pretty nice from a
> metadata
> >>> perspective :D
> >>> The big downside of exposing the StreamGraph is that we don't have it
> in
> >>> batch. On the other hand we could expose the JobGraph but then the
> >>> integration component would still have to do the heavy lifting for
> batch
> >>> and stream specific operators and UDFs.
> >>>
> >>> Instead of exposing either StreamGraph/JobGraph, we could come up with
> a
> >>> metadata like representation for the users but that would be like
> >>> implementing Atlas integration itself without Atlas dependencies :D
> >>>
> >>> As a comparison point, this is how it works in Storm:
> >>> Every operator (spout/bolt), stores a config map (string->string) with
> >>> all the metadata such as operator class, and the operator specific
> configs.
> >>> The Atlas hook works on this map.
> >>> This is very fragile and depends on a lot of internals. Kind of like
> >>> exposing the JobGraph but much worse. I think we can do better.
> >>>
> >>> Gyula
> >>>
> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org>
> >>> wrote:
> >>>
> >>>> If we need it, we can probably beef up the JobListener to allow
> >>>> accessing some information about the whole graph or sources and sinks.
> >>>> My only concern right now is that we don't have a stable interface for
> >>>> our job graphs/pipelines right now.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
> >>>>> Hi Jeff & Till!
> >>>>>
> >>>>> Thanks for the feedback, this is exactly the discussion I was looking
> >>>> for.
> >>>>> The JobListener looks very promising if we can expose the JobGraph
> >>>> somehow
> >>>>> (correct me if I am wrong but it is not accessible at the moment).
> >>>>>
> >>>>> I did not know about this feature that's why I added my JobSubmission
> >>>> hook
> >>>>> which was pretty similar but only exposing the JobGraph. In general I
> >>>> like
> >>>>> the listener better and I would not like to add anything extra if we
> >>>> can
> >>>>> avoid it.
> >>>>>
> >>>>> Actually the bigger part of the integration work that will need more
> >>>>> changes in Flink will be regarding the accessibility of sources/sinks
> >>>> from
> >>>>> the JobGraph and their specific properties. For instance at the
> moment
> >>>> the
> >>>>> Kafka sources and sinks do not expose anything publicly such as
> topics,
> >>>>> kafka configs, etc. Same goes for other data connectors that we need
> to
> >>>>> integrate in the long run. I guess there will be a separate thread on
> >>>> this
> >>>>> once we iron out the initial integration points :)
> >>>>>
> >>>>> I will try to play around with the JobListener interface tomorrow and
> >>>> see
> >>>>> if I can extend it to meet our needs.
> >>>>>
> >>>>> Cheers,
> >>>>> Gyula
> >>>>>
> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Gyula,
> >>>>>>
> >>>>>> Flink 1.10 introduced JobListener which is invoked after job
> >>>> submission and
> >>>>>> finished.  May we can add api on JobClient to get what info you
> >>>> needed for
> >>>>>> altas integration.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >>>>>>
> >>>>>>
> >>>>>> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
> >>>>>>
> >>>>>>> Hi all!
> >>>>>>>
> >>>>>>> We have started some preliminary work on the Flink - Atlas
> >>>> integration at
> >>>>>>> Cloudera. It seems that the integration will require some new hook
> >>>>>>> interfaces at the jobgraph generation and submission phases, so I
> >>>>>> figured I
> >>>>>>> will open a discussion thread with my initial ideas to get some
> early
> >>>>>>> feedback.
> >>>>>>>
> >>>>>>> *Minimal background*
> >>>>>>> Very simply put Apache Atlas is a data governance framework that
> >>>> stores
> >>>>>>> metadata for our data and processing logic to track ownership,
> >>>> lineage
> >>>>>> etc.
> >>>>>>> It is already integrated with systems like HDFS, Kafka, Hive and
> many
> >>>>>>> others.
> >>>>>>>
> >>>>>>> Adding Flink integration would mean that we can track the input
> >>>> output
> >>>>>> data
> >>>>>>> of our Flink jobs, their owners and how different Flink jobs are
> >>>>>> connected
> >>>>>>> to each other through the data they produce (lineage). This seems
> to
> >>>> be a
> >>>>>>> very big deal for a lot of companies :)
> >>>>>>>
> >>>>>>> *Flink - Atlas integration in a nutshell*
> >>>>>>> In order to integrate with Atlas we basically need 2 things.
> >>>>>>>    - Flink entity definitions
> >>>>>>>    - Flink Atlas hook
> >>>>>>>
> >>>>>>> The entity definition is the easy part. It is a json that contains
> >>>> the
> >>>>>>> objects (entities) that we want to store for any give Flink job.
> As a
> >>>>>>> starter we could have a single FlinkApplication entity that has a
> >>>> set of
> >>>>>>> inputs and outputs. These inputs/outputs are other Atlas entities
> >>>> that
> >>>>>> are
> >>>>>>> already defines such as Kafka topic or Hbase table.
> >>>>>>>
> >>>>>>> The Flink atlas hook will be the logic that creates the entity
> >>>> instance
> >>>>>> and
> >>>>>>> uploads it to Atlas when we start a new Flink job. This is the part
> >>>> where
> >>>>>>> we implement the core logic.
> >>>>>>>
> >>>>>>> *Job submission hook*
> >>>>>>> In order to implement the Atlas hook we need a place where we can
> >>>> inspect
> >>>>>>> the pipeline, create and send the metadata when the job starts.
> When
> >>>> we
> >>>>>>> create the FlinkApplication entity we need to be able to easily
> >>>> determine
> >>>>>>> the sources and sinks (and their properties) of the pipeline.
> >>>>>>>
> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
> >>>> execute
> >>>>>>> this logic and even if there was one there is a mismatch of
> >>>> abstraction
> >>>>>>> levels needed to implement the integration.
> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager
> >>>> runner
> >>>>>> as
> >>>>>>> this:
> >>>>>>>
> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> >>>>>>> configuration);
> >>>>>>>
> >>>>>>> This is nice but the JobGraph makes it super difficult to extract
> >>>> sources
> >>>>>>> and UDFs to create the metadata entity. The atlas entity however
> >>>> could be
> >>>>>>> easily created from the StreamGraph object (used to represent the
> >>>> logical
> >>>>>>> flow) before the JobGraph is generated. To go around this
> limitation
> >>>> we
> >>>>>>> could add a JobGraphGeneratorHook interface:
> >>>>>>>
> >>>>>>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> >>>>>>> jobGraph);
> >>>>>>>
> >>>>>>> We could then generate the atlas entity in the preprocess step and
> >>>> add a
> >>>>>>> jobmission hook in the postprocess step that will simply send the
> >>>> already
> >>>>>>> baked in entity.
> >>>>>>>
> >>>>>>> *This kinda works but...*
> >>>>>>> The approach outlined above seems to work and we have built a POC
> >>>> using
> >>>>>> it.
> >>>>>>> Unfortunately it is far from nice as it exposes non-public APIs
> such
> >>>> as
> >>>>>> the
> >>>>>>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of
> >>>> one.
> >>>>>>>
> >>>>>>> It would be much nicer if we could somehow go back from JobGraph to
> >>>>>>> StreamGraph or at least have an easy way to access source/sink
> UDFS.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Gyula
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best Regards
> >>>>>>
> >>>>>> Jeff Zhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Aljoscha Krettek <al...@apache.org>.
I think exposing the Pipeline should be ok. Using the internal 
StreamGraph might be problematic because this might change/break but 
that's a problem of the external code.

Aljoscha

On 11.02.20 16:26, Gyula Fóra wrote:
> Hi All!
> 
> I have made a prototype that simply adds a getPipeline() method to the
> JobClient interface. Then I could easily implement the Atlas hook using the
> JobListener interface. I simply check if Pipeline is instanceof StreamGraph
> and do the logic there.
> 
> I think this is so far the cleanest approach and I much prefer this
> compared to working on the JobGraph directly which would expose even more
> messy internals.
> 
> Unfortunately this change alone is not enough for the integration as we
> need to make sure that all Sources/Sinks that we want to integrate to atlas
> publicly expose some of their properties:
> 
>     - Kafka source/sink:
>        - Kafka props
>        - Topic(s) - this is tricky for sinks
>     - FS source /sink:
>        - Hadoop props
>        - Base path for StreamingFileSink
>        - Path for ContinuousMonitoringSource
> 
> Most of these are straightforward changes, the only question is what we
> want to register in Atlas from the available connectors. Ideally users
> could also somehow register their own Atlas metadata for custom sources and
> sinks, we could probably introduce an interface for that in Atlas.
> 
> Cheers,
> Gyula
> 
> On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gy...@gmail.com> wrote:
> 
>> Maybe we could improve the Pipeline interface in the long run, but as a
>> temporary solution the JobClient could expose a getPipeline() method.
>>
>> That way the implementation of the JobListener could check if its a
>> StreamGraph or a Plan.
>>
>> How bad does that sound?
>>
>> Gyula
>>
>> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> Hi Aljoscha!
>>>
>>> That's a valid concert but we should try to figure something out, many
>>> users need this before they can use Flink.
>>>
>>> I think the closest thing we have right now is the StreamGraph. In
>>> contrast with the JobGraph  the StreamGraph is pretty nice from a metadata
>>> perspective :D
>>> The big downside of exposing the StreamGraph is that we don't have it in
>>> batch. On the other hand we could expose the JobGraph but then the
>>> integration component would still have to do the heavy lifting for batch
>>> and stream specific operators and UDFs.
>>>
>>> Instead of exposing either StreamGraph/JobGraph, we could come up with a
>>> metadata like representation for the users but that would be like
>>> implementing Atlas integration itself without Atlas dependencies :D
>>>
>>> As a comparison point, this is how it works in Storm:
>>> Every operator (spout/bolt), stores a config map (string->string) with
>>> all the metadata such as operator class, and the operator specific configs.
>>> The Atlas hook works on this map.
>>> This is very fragile and depends on a lot of internals. Kind of like
>>> exposing the JobGraph but much worse. I think we can do better.
>>>
>>> Gyula
>>>
>>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> If we need it, we can probably beef up the JobListener to allow
>>>> accessing some information about the whole graph or sources and sinks.
>>>> My only concern right now is that we don't have a stable interface for
>>>> our job graphs/pipelines right now.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 06.02.20 23:00, Gyula Fóra wrote:
>>>>> Hi Jeff & Till!
>>>>>
>>>>> Thanks for the feedback, this is exactly the discussion I was looking
>>>> for.
>>>>> The JobListener looks very promising if we can expose the JobGraph
>>>> somehow
>>>>> (correct me if I am wrong but it is not accessible at the moment).
>>>>>
>>>>> I did not know about this feature that's why I added my JobSubmission
>>>> hook
>>>>> which was pretty similar but only exposing the JobGraph. In general I
>>>> like
>>>>> the listener better and I would not like to add anything extra if we
>>>> can
>>>>> avoid it.
>>>>>
>>>>> Actually the bigger part of the integration work that will need more
>>>>> changes in Flink will be regarding the accessibility of sources/sinks
>>>> from
>>>>> the JobGraph and their specific properties. For instance at the moment
>>>> the
>>>>> Kafka sources and sinks do not expose anything publicly such as topics,
>>>>> kafka configs, etc. Same goes for other data connectors that we need to
>>>>> integrate in the long run. I guess there will be a separate thread on
>>>> this
>>>>> once we iron out the initial integration points :)
>>>>>
>>>>> I will try to play around with the JobListener interface tomorrow and
>>>> see
>>>>> if I can extend it to meet our needs.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> Flink 1.10 introduced JobListener which is invoked after job
>>>> submission and
>>>>>> finished.  May we can add api on JobClient to get what info you
>>>> needed for
>>>>>> altas integration.
>>>>>>
>>>>>>
>>>>>>
>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>>>>>>
>>>>>>
>>>>>> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>>>>>>
>>>>>>> Hi all!
>>>>>>>
>>>>>>> We have started some preliminary work on the Flink - Atlas
>>>> integration at
>>>>>>> Cloudera. It seems that the integration will require some new hook
>>>>>>> interfaces at the jobgraph generation and submission phases, so I
>>>>>> figured I
>>>>>>> will open a discussion thread with my initial ideas to get some early
>>>>>>> feedback.
>>>>>>>
>>>>>>> *Minimal background*
>>>>>>> Very simply put Apache Atlas is a data governance framework that
>>>> stores
>>>>>>> metadata for our data and processing logic to track ownership,
>>>> lineage
>>>>>> etc.
>>>>>>> It is already integrated with systems like HDFS, Kafka, Hive and many
>>>>>>> others.
>>>>>>>
>>>>>>> Adding Flink integration would mean that we can track the input
>>>> output
>>>>>> data
>>>>>>> of our Flink jobs, their owners and how different Flink jobs are
>>>>>> connected
>>>>>>> to each other through the data they produce (lineage). This seems to
>>>> be a
>>>>>>> very big deal for a lot of companies :)
>>>>>>>
>>>>>>> *Flink - Atlas integration in a nutshell*
>>>>>>> In order to integrate with Atlas we basically need 2 things.
>>>>>>>    - Flink entity definitions
>>>>>>>    - Flink Atlas hook
>>>>>>>
>>>>>>> The entity definition is the easy part. It is a json that contains
>>>> the
>>>>>>> objects (entities) that we want to store for any give Flink job. As a
>>>>>>> starter we could have a single FlinkApplication entity that has a
>>>> set of
>>>>>>> inputs and outputs. These inputs/outputs are other Atlas entities
>>>> that
>>>>>> are
>>>>>>> already defines such as Kafka topic or Hbase table.
>>>>>>>
>>>>>>> The Flink atlas hook will be the logic that creates the entity
>>>> instance
>>>>>> and
>>>>>>> uploads it to Atlas when we start a new Flink job. This is the part
>>>> where
>>>>>>> we implement the core logic.
>>>>>>>
>>>>>>> *Job submission hook*
>>>>>>> In order to implement the Atlas hook we need a place where we can
>>>> inspect
>>>>>>> the pipeline, create and send the metadata when the job starts. When
>>>> we
>>>>>>> create the FlinkApplication entity we need to be able to easily
>>>> determine
>>>>>>> the sources and sinks (and their properties) of the pipeline.
>>>>>>>
>>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
>>>> execute
>>>>>>> this logic and even if there was one there is a mismatch of
>>>> abstraction
>>>>>>> levels needed to implement the integration.
>>>>>>> We could imagine a JobSubmission hook executed in the JobManager
>>>> runner
>>>>>> as
>>>>>>> this:
>>>>>>>
>>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
>>>>>>> configuration);
>>>>>>>
>>>>>>> This is nice but the JobGraph makes it super difficult to extract
>>>> sources
>>>>>>> and UDFs to create the metadata entity. The atlas entity however
>>>> could be
>>>>>>> easily created from the StreamGraph object (used to represent the
>>>> logical
>>>>>>> flow) before the JobGraph is generated. To go around this limitation
>>>> we
>>>>>>> could add a JobGraphGeneratorHook interface:
>>>>>>>
>>>>>>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
>>>>>>> jobGraph);
>>>>>>>
>>>>>>> We could then generate the atlas entity in the preprocess step and
>>>> add a
>>>>>>> jobmission hook in the postprocess step that will simply send the
>>>> already
>>>>>>> baked in entity.
>>>>>>>
>>>>>>> *This kinda works but...*
>>>>>>> The approach outlined above seems to work and we have built a POC
>>>> using
>>>>>> it.
>>>>>>> Unfortunately it is far from nice as it exposes non-public APIs such
>>>> as
>>>>>> the
>>>>>>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of
>>>> one.
>>>>>>>
>>>>>>> It would be much nicer if we could somehow go back from JobGraph to
>>>>>>> StreamGraph or at least have an easy way to access source/sink UDFS.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>
>>>
> 

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi All!

I have made a prototype that simply adds a getPipeline() method to the
JobClient interface. Then I could easily implement the Atlas hook using the
JobListener interface. I simply check if Pipeline is instanceof StreamGraph
and do the logic there.

I think this is so far the cleanest approach and I much prefer this
compared to working on the JobGraph directly which would expose even more
messy internals.

Unfortunately this change alone is not enough for the integration as we
need to make sure that all Sources/Sinks that we want to integrate to atlas
publicly expose some of their properties:

   - Kafka source/sink:
      - Kafka props
      - Topic(s) - this is tricky for sinks
   - FS source /sink:
      - Hadoop props
      - Base path for StreamingFileSink
      - Path for ContinuousMonitoringSource

Most of these are straightforward changes, the only question is what we
want to register in Atlas from the available connectors. Ideally users
could also somehow register their own Atlas metadata for custom sources and
sinks, we could probably introduce an interface for that in Atlas.

Cheers,
Gyula

On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gy...@gmail.com> wrote:

> Maybe we could improve the Pipeline interface in the long run, but as a
> temporary solution the JobClient could expose a getPipeline() method.
>
> That way the implementation of the JobListener could check if its a
> StreamGraph or a Plan.
>
> How bad does that sound?
>
> Gyula
>
> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com> wrote:
>
>> Hi Aljoscha!
>>
>> That's a valid concert but we should try to figure something out, many
>> users need this before they can use Flink.
>>
>> I think the closest thing we have right now is the StreamGraph. In
>> contrast with the JobGraph  the StreamGraph is pretty nice from a metadata
>> perspective :D
>> The big downside of exposing the StreamGraph is that we don't have it in
>> batch. On the other hand we could expose the JobGraph but then the
>> integration component would still have to do the heavy lifting for batch
>> and stream specific operators and UDFs.
>>
>> Instead of exposing either StreamGraph/JobGraph, we could come up with a
>> metadata like representation for the users but that would be like
>> implementing Atlas integration itself without Atlas dependencies :D
>>
>> As a comparison point, this is how it works in Storm:
>> Every operator (spout/bolt), stores a config map (string->string) with
>> all the metadata such as operator class, and the operator specific configs.
>> The Atlas hook works on this map.
>> This is very fragile and depends on a lot of internals. Kind of like
>> exposing the JobGraph but much worse. I think we can do better.
>>
>> Gyula
>>
>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> If we need it, we can probably beef up the JobListener to allow
>>> accessing some information about the whole graph or sources and sinks.
>>> My only concern right now is that we don't have a stable interface for
>>> our job graphs/pipelines right now.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 06.02.20 23:00, Gyula Fóra wrote:
>>> > Hi Jeff & Till!
>>> >
>>> > Thanks for the feedback, this is exactly the discussion I was looking
>>> for.
>>> > The JobListener looks very promising if we can expose the JobGraph
>>> somehow
>>> > (correct me if I am wrong but it is not accessible at the moment).
>>> >
>>> > I did not know about this feature that's why I added my JobSubmission
>>> hook
>>> > which was pretty similar but only exposing the JobGraph. In general I
>>> like
>>> > the listener better and I would not like to add anything extra if we
>>> can
>>> > avoid it.
>>> >
>>> > Actually the bigger part of the integration work that will need more
>>> > changes in Flink will be regarding the accessibility of sources/sinks
>>> from
>>> > the JobGraph and their specific properties. For instance at the moment
>>> the
>>> > Kafka sources and sinks do not expose anything publicly such as topics,
>>> > kafka configs, etc. Same goes for other data connectors that we need to
>>> > integrate in the long run. I guess there will be a separate thread on
>>> this
>>> > once we iron out the initial integration points :)
>>> >
>>> > I will try to play around with the JobListener interface tomorrow and
>>> see
>>> > if I can extend it to meet our needs.
>>> >
>>> > Cheers,
>>> > Gyula
>>> >
>>> > On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
>>> >
>>> >> Hi Gyula,
>>> >>
>>> >> Flink 1.10 introduced JobListener which is invoked after job
>>> submission and
>>> >> finished.  May we can add api on JobClient to get what info you
>>> needed for
>>> >> altas integration.
>>> >>
>>> >>
>>> >>
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>>> >>
>>> >>
>>> >> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>>> >>
>>> >>> Hi all!
>>> >>>
>>> >>> We have started some preliminary work on the Flink - Atlas
>>> integration at
>>> >>> Cloudera. It seems that the integration will require some new hook
>>> >>> interfaces at the jobgraph generation and submission phases, so I
>>> >> figured I
>>> >>> will open a discussion thread with my initial ideas to get some early
>>> >>> feedback.
>>> >>>
>>> >>> *Minimal background*
>>> >>> Very simply put Apache Atlas is a data governance framework that
>>> stores
>>> >>> metadata for our data and processing logic to track ownership,
>>> lineage
>>> >> etc.
>>> >>> It is already integrated with systems like HDFS, Kafka, Hive and many
>>> >>> others.
>>> >>>
>>> >>> Adding Flink integration would mean that we can track the input
>>> output
>>> >> data
>>> >>> of our Flink jobs, their owners and how different Flink jobs are
>>> >> connected
>>> >>> to each other through the data they produce (lineage). This seems to
>>> be a
>>> >>> very big deal for a lot of companies :)
>>> >>>
>>> >>> *Flink - Atlas integration in a nutshell*
>>> >>> In order to integrate with Atlas we basically need 2 things.
>>> >>>   - Flink entity definitions
>>> >>>   - Flink Atlas hook
>>> >>>
>>> >>> The entity definition is the easy part. It is a json that contains
>>> the
>>> >>> objects (entities) that we want to store for any give Flink job. As a
>>> >>> starter we could have a single FlinkApplication entity that has a
>>> set of
>>> >>> inputs and outputs. These inputs/outputs are other Atlas entities
>>> that
>>> >> are
>>> >>> already defines such as Kafka topic or Hbase table.
>>> >>>
>>> >>> The Flink atlas hook will be the logic that creates the entity
>>> instance
>>> >> and
>>> >>> uploads it to Atlas when we start a new Flink job. This is the part
>>> where
>>> >>> we implement the core logic.
>>> >>>
>>> >>> *Job submission hook*
>>> >>> In order to implement the Atlas hook we need a place where we can
>>> inspect
>>> >>> the pipeline, create and send the metadata when the job starts. When
>>> we
>>> >>> create the FlinkApplication entity we need to be able to easily
>>> determine
>>> >>> the sources and sinks (and their properties) of the pipeline.
>>> >>>
>>> >>> Unfortunately there is no JobSubmission hook in Flink that could
>>> execute
>>> >>> this logic and even if there was one there is a mismatch of
>>> abstraction
>>> >>> levels needed to implement the integration.
>>> >>> We could imagine a JobSubmission hook executed in the JobManager
>>> runner
>>> >> as
>>> >>> this:
>>> >>>
>>> >>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
>>> >>> configuration);
>>> >>>
>>> >>> This is nice but the JobGraph makes it super difficult to extract
>>> sources
>>> >>> and UDFs to create the metadata entity. The atlas entity however
>>> could be
>>> >>> easily created from the StreamGraph object (used to represent the
>>> logical
>>> >>> flow) before the JobGraph is generated. To go around this limitation
>>> we
>>> >>> could add a JobGraphGeneratorHook interface:
>>> >>>
>>> >>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
>>> >>> jobGraph);
>>> >>>
>>> >>> We could then generate the atlas entity in the preprocess step and
>>> add a
>>> >>> jobmission hook in the postprocess step that will simply send the
>>> already
>>> >>> baked in entity.
>>> >>>
>>> >>> *This kinda works but...*
>>> >>> The approach outlined above seems to work and we have built a POC
>>> using
>>> >> it.
>>> >>> Unfortunately it is far from nice as it exposes non-public APIs such
>>> as
>>> >> the
>>> >>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of
>>> one.
>>> >>>
>>> >>> It would be much nicer if we could somehow go back from JobGraph to
>>> >>> StreamGraph or at least have an easy way to access source/sink UDFS.
>>> >>>
>>> >>> What do you think?
>>> >>>
>>> >>> Cheers,
>>> >>> Gyula
>>> >>>
>>> >>
>>> >>
>>> >> --
>>> >> Best Regards
>>> >>
>>> >> Jeff Zhang
>>> >>
>>> >
>>>
>>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Maybe we could improve the Pipeline interface in the long run, but as a
temporary solution the JobClient could expose a getPipeline() method.

That way the implementation of the JobListener could check if its a
StreamGraph or a Plan.

How bad does that sound?

Gyula

On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gy...@gmail.com> wrote:

> Hi Aljoscha!
>
> That's a valid concert but we should try to figure something out, many
> users need this before they can use Flink.
>
> I think the closest thing we have right now is the StreamGraph. In
> contrast with the JobGraph  the StreamGraph is pretty nice from a metadata
> perspective :D
> The big downside of exposing the StreamGraph is that we don't have it in
> batch. On the other hand we could expose the JobGraph but then the
> integration component would still have to do the heavy lifting for batch
> and stream specific operators and UDFs.
>
> Instead of exposing either StreamGraph/JobGraph, we could come up with a
> metadata like representation for the users but that would be like
> implementing Atlas integration itself without Atlas dependencies :D
>
> As a comparison point, this is how it works in Storm:
> Every operator (spout/bolt), stores a config map (string->string) with all
> the metadata such as operator class, and the operator specific configs. The
> Atlas hook works on this map.
> This is very fragile and depends on a lot of internals. Kind of like
> exposing the JobGraph but much worse. I think we can do better.
>
> Gyula
>
> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> If we need it, we can probably beef up the JobListener to allow
>> accessing some information about the whole graph or sources and sinks.
>> My only concern right now is that we don't have a stable interface for
>> our job graphs/pipelines right now.
>>
>> Best,
>> Aljoscha
>>
>> On 06.02.20 23:00, Gyula Fóra wrote:
>> > Hi Jeff & Till!
>> >
>> > Thanks for the feedback, this is exactly the discussion I was looking
>> for.
>> > The JobListener looks very promising if we can expose the JobGraph
>> somehow
>> > (correct me if I am wrong but it is not accessible at the moment).
>> >
>> > I did not know about this feature that's why I added my JobSubmission
>> hook
>> > which was pretty similar but only exposing the JobGraph. In general I
>> like
>> > the listener better and I would not like to add anything extra if we can
>> > avoid it.
>> >
>> > Actually the bigger part of the integration work that will need more
>> > changes in Flink will be regarding the accessibility of sources/sinks
>> from
>> > the JobGraph and their specific properties. For instance at the moment
>> the
>> > Kafka sources and sinks do not expose anything publicly such as topics,
>> > kafka configs, etc. Same goes for other data connectors that we need to
>> > integrate in the long run. I guess there will be a separate thread on
>> this
>> > once we iron out the initial integration points :)
>> >
>> > I will try to play around with the JobListener interface tomorrow and
>> see
>> > if I can extend it to meet our needs.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
>> >
>> >> Hi Gyula,
>> >>
>> >> Flink 1.10 introduced JobListener which is invoked after job
>> submission and
>> >> finished.  May we can add api on JobClient to get what info you needed
>> for
>> >> altas integration.
>> >>
>> >>
>> >>
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>> >>
>> >>
>> >> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>> >>
>> >>> Hi all!
>> >>>
>> >>> We have started some preliminary work on the Flink - Atlas
>> integration at
>> >>> Cloudera. It seems that the integration will require some new hook
>> >>> interfaces at the jobgraph generation and submission phases, so I
>> >> figured I
>> >>> will open a discussion thread with my initial ideas to get some early
>> >>> feedback.
>> >>>
>> >>> *Minimal background*
>> >>> Very simply put Apache Atlas is a data governance framework that
>> stores
>> >>> metadata for our data and processing logic to track ownership, lineage
>> >> etc.
>> >>> It is already integrated with systems like HDFS, Kafka, Hive and many
>> >>> others.
>> >>>
>> >>> Adding Flink integration would mean that we can track the input output
>> >> data
>> >>> of our Flink jobs, their owners and how different Flink jobs are
>> >> connected
>> >>> to each other through the data they produce (lineage). This seems to
>> be a
>> >>> very big deal for a lot of companies :)
>> >>>
>> >>> *Flink - Atlas integration in a nutshell*
>> >>> In order to integrate with Atlas we basically need 2 things.
>> >>>   - Flink entity definitions
>> >>>   - Flink Atlas hook
>> >>>
>> >>> The entity definition is the easy part. It is a json that contains the
>> >>> objects (entities) that we want to store for any give Flink job. As a
>> >>> starter we could have a single FlinkApplication entity that has a set
>> of
>> >>> inputs and outputs. These inputs/outputs are other Atlas entities that
>> >> are
>> >>> already defines such as Kafka topic or Hbase table.
>> >>>
>> >>> The Flink atlas hook will be the logic that creates the entity
>> instance
>> >> and
>> >>> uploads it to Atlas when we start a new Flink job. This is the part
>> where
>> >>> we implement the core logic.
>> >>>
>> >>> *Job submission hook*
>> >>> In order to implement the Atlas hook we need a place where we can
>> inspect
>> >>> the pipeline, create and send the metadata when the job starts. When
>> we
>> >>> create the FlinkApplication entity we need to be able to easily
>> determine
>> >>> the sources and sinks (and their properties) of the pipeline.
>> >>>
>> >>> Unfortunately there is no JobSubmission hook in Flink that could
>> execute
>> >>> this logic and even if there was one there is a mismatch of
>> abstraction
>> >>> levels needed to implement the integration.
>> >>> We could imagine a JobSubmission hook executed in the JobManager
>> runner
>> >> as
>> >>> this:
>> >>>
>> >>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
>> >>> configuration);
>> >>>
>> >>> This is nice but the JobGraph makes it super difficult to extract
>> sources
>> >>> and UDFs to create the metadata entity. The atlas entity however
>> could be
>> >>> easily created from the StreamGraph object (used to represent the
>> logical
>> >>> flow) before the JobGraph is generated. To go around this limitation
>> we
>> >>> could add a JobGraphGeneratorHook interface:
>> >>>
>> >>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
>> >>> jobGraph);
>> >>>
>> >>> We could then generate the atlas entity in the preprocess step and
>> add a
>> >>> jobmission hook in the postprocess step that will simply send the
>> already
>> >>> baked in entity.
>> >>>
>> >>> *This kinda works but...*
>> >>> The approach outlined above seems to work and we have built a POC
>> using
>> >> it.
>> >>> Unfortunately it is far from nice as it exposes non-public APIs such
>> as
>> >> the
>> >>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>> >>>
>> >>> It would be much nicer if we could somehow go back from JobGraph to
>> >>> StreamGraph or at least have an easy way to access source/sink UDFS.
>> >>>
>> >>> What do you think?
>> >>>
>> >>> Cheers,
>> >>> Gyula
>> >>>
>> >>
>> >>
>> >> --
>> >> Best Regards
>> >>
>> >> Jeff Zhang
>> >>
>> >
>>
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Aljoscha!

That's a valid concert but we should try to figure something out, many
users need this before they can use Flink.

I think the closest thing we have right now is the StreamGraph. In contrast
with the JobGraph  the StreamGraph is pretty nice from a metadata
perspective :D
The big downside of exposing the StreamGraph is that we don't have it in
batch. On the other hand we could expose the JobGraph but then the
integration component would still have to do the heavy lifting for batch
and stream specific operators and UDFs.

Instead of exposing either StreamGraph/JobGraph, we could come up with a
metadata like representation for the users but that would be like
implementing Atlas integration itself without Atlas dependencies :D

As a comparison point, this is how it works in Storm:
Every operator (spout/bolt), stores a config map (string->string) with all
the metadata such as operator class, and the operator specific configs. The
Atlas hook works on this map.
This is very fragile and depends on a lot of internals. Kind of like
exposing the JobGraph but much worse. I think we can do better.

Gyula

On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <al...@apache.org> wrote:

> If we need it, we can probably beef up the JobListener to allow
> accessing some information about the whole graph or sources and sinks.
> My only concern right now is that we don't have a stable interface for
> our job graphs/pipelines right now.
>
> Best,
> Aljoscha
>
> On 06.02.20 23:00, Gyula Fóra wrote:
> > Hi Jeff & Till!
> >
> > Thanks for the feedback, this is exactly the discussion I was looking
> for.
> > The JobListener looks very promising if we can expose the JobGraph
> somehow
> > (correct me if I am wrong but it is not accessible at the moment).
> >
> > I did not know about this feature that's why I added my JobSubmission
> hook
> > which was pretty similar but only exposing the JobGraph. In general I
> like
> > the listener better and I would not like to add anything extra if we can
> > avoid it.
> >
> > Actually the bigger part of the integration work that will need more
> > changes in Flink will be regarding the accessibility of sources/sinks
> from
> > the JobGraph and their specific properties. For instance at the moment
> the
> > Kafka sources and sinks do not expose anything publicly such as topics,
> > kafka configs, etc. Same goes for other data connectors that we need to
> > integrate in the long run. I guess there will be a separate thread on
> this
> > once we iron out the initial integration points :)
> >
> > I will try to play around with the JobListener interface tomorrow and see
> > if I can extend it to meet our needs.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
> >
> >> Hi Gyula,
> >>
> >> Flink 1.10 introduced JobListener which is invoked after job submission
> and
> >> finished.  May we can add api on JobClient to get what info you needed
> for
> >> altas integration.
> >>
> >>
> >>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >>
> >>
> >> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
> >>
> >>> Hi all!
> >>>
> >>> We have started some preliminary work on the Flink - Atlas integration
> at
> >>> Cloudera. It seems that the integration will require some new hook
> >>> interfaces at the jobgraph generation and submission phases, so I
> >> figured I
> >>> will open a discussion thread with my initial ideas to get some early
> >>> feedback.
> >>>
> >>> *Minimal background*
> >>> Very simply put Apache Atlas is a data governance framework that stores
> >>> metadata for our data and processing logic to track ownership, lineage
> >> etc.
> >>> It is already integrated with systems like HDFS, Kafka, Hive and many
> >>> others.
> >>>
> >>> Adding Flink integration would mean that we can track the input output
> >> data
> >>> of our Flink jobs, their owners and how different Flink jobs are
> >> connected
> >>> to each other through the data they produce (lineage). This seems to
> be a
> >>> very big deal for a lot of companies :)
> >>>
> >>> *Flink - Atlas integration in a nutshell*
> >>> In order to integrate with Atlas we basically need 2 things.
> >>>   - Flink entity definitions
> >>>   - Flink Atlas hook
> >>>
> >>> The entity definition is the easy part. It is a json that contains the
> >>> objects (entities) that we want to store for any give Flink job. As a
> >>> starter we could have a single FlinkApplication entity that has a set
> of
> >>> inputs and outputs. These inputs/outputs are other Atlas entities that
> >> are
> >>> already defines such as Kafka topic or Hbase table.
> >>>
> >>> The Flink atlas hook will be the logic that creates the entity instance
> >> and
> >>> uploads it to Atlas when we start a new Flink job. This is the part
> where
> >>> we implement the core logic.
> >>>
> >>> *Job submission hook*
> >>> In order to implement the Atlas hook we need a place where we can
> inspect
> >>> the pipeline, create and send the metadata when the job starts. When we
> >>> create the FlinkApplication entity we need to be able to easily
> determine
> >>> the sources and sinks (and their properties) of the pipeline.
> >>>
> >>> Unfortunately there is no JobSubmission hook in Flink that could
> execute
> >>> this logic and even if there was one there is a mismatch of abstraction
> >>> levels needed to implement the integration.
> >>> We could imagine a JobSubmission hook executed in the JobManager runner
> >> as
> >>> this:
> >>>
> >>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> >>> configuration);
> >>>
> >>> This is nice but the JobGraph makes it super difficult to extract
> sources
> >>> and UDFs to create the metadata entity. The atlas entity however could
> be
> >>> easily created from the StreamGraph object (used to represent the
> logical
> >>> flow) before the JobGraph is generated. To go around this limitation we
> >>> could add a JobGraphGeneratorHook interface:
> >>>
> >>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> >>> jobGraph);
> >>>
> >>> We could then generate the atlas entity in the preprocess step and add
> a
> >>> jobmission hook in the postprocess step that will simply send the
> already
> >>> baked in entity.
> >>>
> >>> *This kinda works but...*
> >>> The approach outlined above seems to work and we have built a POC using
> >> it.
> >>> Unfortunately it is far from nice as it exposes non-public APIs such as
> >> the
> >>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> >>>
> >>> It would be much nicer if we could somehow go back from JobGraph to
> >>> StreamGraph or at least have an easy way to access source/sink UDFS.
> >>>
> >>> What do you think?
> >>>
> >>> Cheers,
> >>> Gyula
> >>>
> >>
> >>
> >> --
> >> Best Regards
> >>
> >> Jeff Zhang
> >>
> >
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Aljoscha Krettek <al...@apache.org>.
If we need it, we can probably beef up the JobListener to allow 
accessing some information about the whole graph or sources and sinks. 
My only concern right now is that we don't have a stable interface for 
our job graphs/pipelines right now.

Best,
Aljoscha

On 06.02.20 23:00, Gyula Fóra wrote:
> Hi Jeff & Till!
> 
> Thanks for the feedback, this is exactly the discussion I was looking for.
> The JobListener looks very promising if we can expose the JobGraph somehow
> (correct me if I am wrong but it is not accessible at the moment).
> 
> I did not know about this feature that's why I added my JobSubmission hook
> which was pretty similar but only exposing the JobGraph. In general I like
> the listener better and I would not like to add anything extra if we can
> avoid it.
> 
> Actually the bigger part of the integration work that will need more
> changes in Flink will be regarding the accessibility of sources/sinks from
> the JobGraph and their specific properties. For instance at the moment the
> Kafka sources and sinks do not expose anything publicly such as topics,
> kafka configs, etc. Same goes for other data connectors that we need to
> integrate in the long run. I guess there will be a separate thread on this
> once we iron out the initial integration points :)
> 
> I will try to play around with the JobListener interface tomorrow and see
> if I can extend it to meet our needs.
> 
> Cheers,
> Gyula
> 
> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:
> 
>> Hi Gyula,
>>
>> Flink 1.10 introduced JobListener which is invoked after job submission and
>> finished.  May we can add api on JobClient to get what info you needed for
>> altas integration.
>>
>>
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>>
>>
>> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>>
>>> Hi all!
>>>
>>> We have started some preliminary work on the Flink - Atlas integration at
>>> Cloudera. It seems that the integration will require some new hook
>>> interfaces at the jobgraph generation and submission phases, so I
>> figured I
>>> will open a discussion thread with my initial ideas to get some early
>>> feedback.
>>>
>>> *Minimal background*
>>> Very simply put Apache Atlas is a data governance framework that stores
>>> metadata for our data and processing logic to track ownership, lineage
>> etc.
>>> It is already integrated with systems like HDFS, Kafka, Hive and many
>>> others.
>>>
>>> Adding Flink integration would mean that we can track the input output
>> data
>>> of our Flink jobs, their owners and how different Flink jobs are
>> connected
>>> to each other through the data they produce (lineage). This seems to be a
>>> very big deal for a lot of companies :)
>>>
>>> *Flink - Atlas integration in a nutshell*
>>> In order to integrate with Atlas we basically need 2 things.
>>>   - Flink entity definitions
>>>   - Flink Atlas hook
>>>
>>> The entity definition is the easy part. It is a json that contains the
>>> objects (entities) that we want to store for any give Flink job. As a
>>> starter we could have a single FlinkApplication entity that has a set of
>>> inputs and outputs. These inputs/outputs are other Atlas entities that
>> are
>>> already defines such as Kafka topic or Hbase table.
>>>
>>> The Flink atlas hook will be the logic that creates the entity instance
>> and
>>> uploads it to Atlas when we start a new Flink job. This is the part where
>>> we implement the core logic.
>>>
>>> *Job submission hook*
>>> In order to implement the Atlas hook we need a place where we can inspect
>>> the pipeline, create and send the metadata when the job starts. When we
>>> create the FlinkApplication entity we need to be able to easily determine
>>> the sources and sinks (and their properties) of the pipeline.
>>>
>>> Unfortunately there is no JobSubmission hook in Flink that could execute
>>> this logic and even if there was one there is a mismatch of abstraction
>>> levels needed to implement the integration.
>>> We could imagine a JobSubmission hook executed in the JobManager runner
>> as
>>> this:
>>>
>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
>>> configuration);
>>>
>>> This is nice but the JobGraph makes it super difficult to extract sources
>>> and UDFs to create the metadata entity. The atlas entity however could be
>>> easily created from the StreamGraph object (used to represent the logical
>>> flow) before the JobGraph is generated. To go around this limitation we
>>> could add a JobGraphGeneratorHook interface:
>>>
>>> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
>>> jobGraph);
>>>
>>> We could then generate the atlas entity in the preprocess step and add a
>>> jobmission hook in the postprocess step that will simply send the already
>>> baked in entity.
>>>
>>> *This kinda works but...*
>>> The approach outlined above seems to work and we have built a POC using
>> it.
>>> Unfortunately it is far from nice as it exposes non-public APIs such as
>> the
>>> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>>>
>>> It would be much nicer if we could somehow go back from JobGraph to
>>> StreamGraph or at least have an easy way to access source/sink UDFS.
>>>
>>> What do you think?
>>>
>>> Cheers,
>>> Gyula
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
> 

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Jeff & Till!

Thanks for the feedback, this is exactly the discussion I was looking for.
The JobListener looks very promising if we can expose the JobGraph somehow
(correct me if I am wrong but it is not accessible at the moment).

I did not know about this feature that's why I added my JobSubmission hook
which was pretty similar but only exposing the JobGraph. In general I like
the listener better and I would not like to add anything extra if we can
avoid it.

Actually the bigger part of the integration work that will need more
changes in Flink will be regarding the accessibility of sources/sinks from
the JobGraph and their specific properties. For instance at the moment the
Kafka sources and sinks do not expose anything publicly such as topics,
kafka configs, etc. Same goes for other data connectors that we need to
integrate in the long run. I guess there will be a separate thread on this
once we iron out the initial integration points :)

I will try to play around with the JobListener interface tomorrow and see
if I can extend it to meet our needs.

Cheers,
Gyula

On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zj...@gmail.com> wrote:

> Hi Gyula,
>
> Flink 1.10 introduced JobListener which is invoked after job submission and
> finished.  May we can add api on JobClient to get what info you needed for
> altas integration.
>
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>
>
> Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:
>
> > Hi all!
> >
> > We have started some preliminary work on the Flink - Atlas integration at
> > Cloudera. It seems that the integration will require some new hook
> > interfaces at the jobgraph generation and submission phases, so I
> figured I
> > will open a discussion thread with my initial ideas to get some early
> > feedback.
> >
> > *Minimal background*
> > Very simply put Apache Atlas is a data governance framework that stores
> > metadata for our data and processing logic to track ownership, lineage
> etc.
> > It is already integrated with systems like HDFS, Kafka, Hive and many
> > others.
> >
> > Adding Flink integration would mean that we can track the input output
> data
> > of our Flink jobs, their owners and how different Flink jobs are
> connected
> > to each other through the data they produce (lineage). This seems to be a
> > very big deal for a lot of companies :)
> >
> > *Flink - Atlas integration in a nutshell*
> > In order to integrate with Atlas we basically need 2 things.
> >  - Flink entity definitions
> >  - Flink Atlas hook
> >
> > The entity definition is the easy part. It is a json that contains the
> > objects (entities) that we want to store for any give Flink job. As a
> > starter we could have a single FlinkApplication entity that has a set of
> > inputs and outputs. These inputs/outputs are other Atlas entities that
> are
> > already defines such as Kafka topic or Hbase table.
> >
> > The Flink atlas hook will be the logic that creates the entity instance
> and
> > uploads it to Atlas when we start a new Flink job. This is the part where
> > we implement the core logic.
> >
> > *Job submission hook*
> > In order to implement the Atlas hook we need a place where we can inspect
> > the pipeline, create and send the metadata when the job starts. When we
> > create the FlinkApplication entity we need to be able to easily determine
> > the sources and sinks (and their properties) of the pipeline.
> >
> > Unfortunately there is no JobSubmission hook in Flink that could execute
> > this logic and even if there was one there is a mismatch of abstraction
> > levels needed to implement the integration.
> > We could imagine a JobSubmission hook executed in the JobManager runner
> as
> > this:
> >
> > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > configuration);
> >
> > This is nice but the JobGraph makes it super difficult to extract sources
> > and UDFs to create the metadata entity. The atlas entity however could be
> > easily created from the StreamGraph object (used to represent the logical
> > flow) before the JobGraph is generated. To go around this limitation we
> > could add a JobGraphGeneratorHook interface:
> >
> > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > jobGraph);
> >
> > We could then generate the atlas entity in the preprocess step and add a
> > jobmission hook in the postprocess step that will simply send the already
> > baked in entity.
> >
> > *This kinda works but...*
> > The approach outlined above seems to work and we have built a POC using
> it.
> > Unfortunately it is far from nice as it exposes non-public APIs such as
> the
> > StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> >
> > It would be much nicer if we could somehow go back from JobGraph to
> > StreamGraph or at least have an easy way to access source/sink UDFS.
> >
> > What do you think?
> >
> > Cheers,
> > Gyula
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Jeff Zhang <zj...@gmail.com>.
Hi Gyula,

Flink 1.10 introduced JobListener which is invoked after job submission and
finished.  May we can add api on JobClient to get what info you needed for
altas integration.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46


Gyula Fóra <gy...@apache.org> 于2020年2月5日周三 下午7:48写道:

> Hi all!
>
> We have started some preliminary work on the Flink - Atlas integration at
> Cloudera. It seems that the integration will require some new hook
> interfaces at the jobgraph generation and submission phases, so I figured I
> will open a discussion thread with my initial ideas to get some early
> feedback.
>
> *Minimal background*
> Very simply put Apache Atlas is a data governance framework that stores
> metadata for our data and processing logic to track ownership, lineage etc.
> It is already integrated with systems like HDFS, Kafka, Hive and many
> others.
>
> Adding Flink integration would mean that we can track the input output data
> of our Flink jobs, their owners and how different Flink jobs are connected
> to each other through the data they produce (lineage). This seems to be a
> very big deal for a lot of companies :)
>
> *Flink - Atlas integration in a nutshell*
> In order to integrate with Atlas we basically need 2 things.
>  - Flink entity definitions
>  - Flink Atlas hook
>
> The entity definition is the easy part. It is a json that contains the
> objects (entities) that we want to store for any give Flink job. As a
> starter we could have a single FlinkApplication entity that has a set of
> inputs and outputs. These inputs/outputs are other Atlas entities that are
> already defines such as Kafka topic or Hbase table.
>
> The Flink atlas hook will be the logic that creates the entity instance and
> uploads it to Atlas when we start a new Flink job. This is the part where
> we implement the core logic.
>
> *Job submission hook*
> In order to implement the Atlas hook we need a place where we can inspect
> the pipeline, create and send the metadata when the job starts. When we
> create the FlinkApplication entity we need to be able to easily determine
> the sources and sinks (and their properties) of the pipeline.
>
> Unfortunately there is no JobSubmission hook in Flink that could execute
> this logic and even if there was one there is a mismatch of abstraction
> levels needed to implement the integration.
> We could imagine a JobSubmission hook executed in the JobManager runner as
> this:
>
> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> configuration);
>
> This is nice but the JobGraph makes it super difficult to extract sources
> and UDFs to create the metadata entity. The atlas entity however could be
> easily created from the StreamGraph object (used to represent the logical
> flow) before the JobGraph is generated. To go around this limitation we
> could add a JobGraphGeneratorHook interface:
>
> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> jobGraph);
>
> We could then generate the atlas entity in the preprocess step and add a
> jobmission hook in the postprocess step that will simply send the already
> baked in entity.
>
> *This kinda works but...*
> The approach outlined above seems to work and we have built a POC using it.
> Unfortunately it is far from nice as it exposes non-public APIs such as the
> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>
> It would be much nicer if we could somehow go back from JobGraph to
> StreamGraph or at least have an easy way to access source/sink UDFS.
>
> What do you think?
>
> Cheers,
> Gyula
>


-- 
Best Regards

Jeff Zhang

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Jacky Lau <li...@gmail.com>.
thanks Gyula Fóra. i have read it. And i think it is lark of flink catalog
info, which you can see spark atlas project here
https://github.com/hortonworks-spark/spark-atlas-connector



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Jack!

You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

The document links to an already working Atlas hook prototype (and
accompanying flink fork). The links for that are also here:
Flink: https://github.com/gyfora/flink/tree/atlas-changes
Atlas: https://github.com/gyfora/atlas/tree/flink-bridge

We need to adapt this according to the above discussion this was an early
prototype.
Gyula

On Fri, Mar 27, 2020 at 11:07 AM jackylau <li...@gmail.com> wrote:

> Hi Márton Balassi:
>    I am very glad to look at it and where to find .
>    And it is my issue , which you can see
> https://issues.apache.org/jira/browse/FLINK-16774
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by jackylau <li...@gmail.com>.
Hi Márton Balassi:
   I am very glad to look at it and where to find .
   And it is my issue , which you can see
https://issues.apache.org/jira/browse/FLINK-16774



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by Márton Balassi <ba...@gmail.com>.
Hi Jack,

Yes, we know how to do it and even have the implementation ready and being
reviewed by the Atlas community at the moment. :-)
Would you be interested in having a look?

On Thu, Mar 19, 2020 at 12:56 PM jackylau <li...@gmail.com> wrote:

> Hi:
>   i think flink integrate atlas also need add catalog information such as
> spark atlas project
> .https://github.com/hortonworks-spark/spark-atlas-connector
>  when user use catalog such as JDBCCatalog/HiveCatalog, flink atlas project
> will sync this information to atlas.
>   But i don't find any Event Interface for flink to implement it as
> spark-atlas-connector does. Does anyone know how to do it
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>

Re: [Discussion] Job generation / submission hooks & Atlas integration

Posted by jackylau <li...@gmail.com>.
Hi:
  i think flink integrate atlas also need add catalog information such as
spark atlas project
.https://github.com/hortonworks-spark/spark-atlas-connector
 when user use catalog such as JDBCCatalog/HiveCatalog, flink atlas project
will sync this information to atlas.
  But i don't find any Event Interface for flink to implement it as
spark-atlas-connector does. Does anyone know how to do it



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/