You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Shammon FY <zj...@gmail.com> on 2023/06/06 12:22:47 UTC

[DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Hi devs,

I would like to start a discussion on FLIP-314: Support Customized Job
Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
streaming and batch jobs create lineage dependency between source and sink,
users can manage their data and jobs according to this lineage information.
For example, when there is a delay in Flink ETL or data, users can easily
trace the problematic jobs and affected data. On the other hand, when users
need to correct data or debug, they can perform operations based on lineage
too.

In FLIP-314 we want to introduce lineage related interfaces for Flink and
users can create customized job status listeners. When job status changes,
users can get job status and information to add, update or delete lineage.

Looking forward to your feedback, thanks.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener

Best,
Shammon FY

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi Feng,

Thanks for your input.

>1. we can add a lineage interface like `supportReportLineage`

It's a so good idea and thanks very much. It can help users to report
lineage for existing connectors in DataStream jobs without any additional
operations. I will give this interface in the FLIP later and please help to
review it, thanks

>2. it is relatively easy to obtain column lineage through Calcite
MetaQuery API

It's helpful if Calcite already has some column lineage in meta, I think we
can discuss and give the proposal in the column lineage FLIP

Best,
Shammon FY



On Wednesday, June 28, 2023, Feng Jin <ji...@gmail.com> wrote:

> Hi Shammon
> Thank you for proposing this FLIP. I think the Flink Job lineage is a very
> useful feature.
> I have few question:
>
> 1. For DataStream Jobs, users need to set up lineage relationships when
> building DAGs for their custom sources and sinks.
> However, for some common connectors such as Kafka Connector and JDBC
> Connector, we can add a lineage interface like `supportReportLineage`, so
> that these connectors can implement it.
> This way, in the scenario of DataStream Jobs, lineages can be automatically
> reported. What do you think?
>
>
> 2. From the current design, it seems that we need to analyze column lineage
> through pipeline. As far as I know, it is relatively easy to obtain column
> lineage through Calcite MetaQuery API.
> Would you consider using this approach? Or do we need to implement another
> parsing process based on the pipeline?
> ```
> RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
> metadataQuery.getColumnOrigins(inputRel, i);
> ```
> Best,
> Feng
>
>
> On Sun, Jun 25, 2023 at 8:06 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
> >
> > At present, we only need to notify the listener when a job goes to
> > termination, but I think it makes sense to add generic `oldStatus` and
> > `newStatus` in the listener and users can update the job state in their
> > service as needed.
> >
> > > 2: I'm really confused about the `config()` included in
> `LineageEntity`,
> > where is it from and what is it for ?
> >
> > The `config` in `LineageEntity` is used for users to get options for
> source
> > and sink connectors. As the examples in the FLIP, users can add
> > server/group/topic information in the config for kafka and create lineage
> > entities for `DataStream` jobs, then the listeners can get this
> information
> > to identify the same connector in different jobs. Otherwise, the `config`
> > in `TableLineageEntity` will be the same as `getOptions` in
> > `CatalogBaseTable`.
> >
> > > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity`
> is
> > needed or not, since `TableSinkLineageEntity` contains
> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > changelogmode?
> >
> > At present, we do not actually use the changelog mode. It can be deleted,
> > and I have updated FLIP.
> >
> > > Btw, since there're a lot interfaces proposed, I think it'll be better
> to
> > give an example about how to implement a listener in this FLIP to make us
> > know better about the interfaces.
> >
> > I have added the example in the FLIP and the related interfaces and
> > examples are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP does not touch them, and will them become part of the
> > List<LineageEntity> sources() or adding another interface?
> >
> > You're right, currently lookup join dim tables were not considered in the
> > 'proposed changed' section of this FLIP. But the interface for lineage is
> > universal and we can give `TableLookupSourceLineageEntity` which
> implements
> > `TableSourceLineageEntity` in the future without modifying the public
> > interface.
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs currently do not support column lineage, we
> > would like to support this in the next step. So we have comprehensively
> > considered the table lineage and column lineage interfaces here, and
> > defined these two interfaces together clearly
> >
> >
> > [1]
> >
> > https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b1
> 1e82c9187c
> >
> > Best,
> > Shammon FY
> >
> >
> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
> >
> > > Hi Shammon,
> > >
> > > I like the idea in general and it will help to analysis the job
> lineages
> > > no matter FlinkSQL or Flink jar jobs in production environments.
> > >
> > > For Qingsheng's concern, I'd like the name of JobType more than
> > > RuntimeExecutionMode, as the latter one is not easy to understand for
> > users.
> > >
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP
> > > does not touch them, and will them become part of the
> List<LineageEntity>
> > > sources()​ or adding another interface?
> > >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why we must introduce so many column-lineage
> > related
> > > interface here?
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Shammon FY <zj...@gmail.com>
> > > Sent: Sunday, June 25, 2023 16:13
> > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> Listener
> > >
> > > Hi Qingsheng,
> > >
> > > Thanks for your valuable feedback.
> > >
> > > > 1. Is there any specific use case to expose the batch / streaming
> info
> > to
> > > listeners or meta services?
> > >
> > > I agree with you that Flink is evolving towards batch-streaming
> > > unification, but the lifecycle of them is different. If a job
> processes a
> > > bound dataset, it will end after completing the data processing,
> > otherwise,
> > > it will run for a long time. In our scenario, we will regularly
> schedule
> > > some Flink jobs to process bound dataset and update some job
> information
> > to
> > > the lineage information for the "batch" jobs such as scheduled
> timestamp,
> > > execution duration when jobs are finished, which is different from
> > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > > `existsUnboundedSource` in `StreamingGraph` and
> `StreamingGraphGenerator`
> > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what
> > do
> > > you think of it?
> > >
> > > > 2. it’s better to be more specific here to tell users what
> information
> > > they could expect to see here, instead of just a “job configuration” as
> > > described in JavaDoc.
> > >
> > > Thanks and I have updated the doc in FLIP.
> > >
> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> > >
> > > I have updated the docs for io executor  in
> > > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
> > and
> > > executes submitted tasks in parallel. Users can submit tasks to the
> > > executor which ensures that the submitted task can be executed before
> the
> > > job exits.
> > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list
> of
> > > LineageEntity.
> > >
> > > In the initial idea, the `LineageRelationEntity` is used for
> `DataStream`
> > > to set additional lineage information besides source. For example,
> there
> > > are table and column lineages in SQL jobs. When we build a `DataStream`
> > job
> > > with table source and sink, we can add table lineage in the following
> > > method.
> > > ```
> > > public class DataStreamSink {
> > >     public DataStreamSink setLineageSources(LineageEntity ...
> sources);
> > > }
> > > ```
> > > But we can not set column lineage for the above sink, and for the sake
> of
> > > universality, we do not want to add a method similar to
> `addLineageColumn
> > > (...)` in `DataStreamSink`. So I put this information into
> > > LineageRelationEntity so that SQL and DataStream jobs can be
> consistent.
> > > But as you mentioned, this approach does indeed lead to ambiguity and
> > > complexity. So my current idea is to add the `setLineageRelation`
> method
> > in
> > > `DataStreamSink` directly without `LineageRelationEntity`, I have
> updated
> > > the FLIP and please help to review it again, thanks.
> > >
> > > > 5. I can’t find the definition of CatalogContext in the current code
> > base
> > > and Flink, which appears in the TableLineageEntity.
> > >
> > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > boolean
> > > (the “override” is quite confusing). I’m wondering if these are
> necessary
> > > for meta services, as they are actually concepts defined in the runtime
> > > level of Flink Table / SQL.
> > >
> > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > `ChangelogMode` and `override` are mainly used for verification and
> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs
> in
> > > our streaming & batch ETL, and display the `override` information on
> the
> > > UI.
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for starting this FLIP! Data lineage is a very important
> topic,
> > > > which has been missing for a long time in Flink. I have some
> questions
> > > > about the FLIP.
> > > >
> > > > About events and listeners:
> > > >
> > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > JobCreatedEvent.
> > > > This is an internal class in flink-runtime, and I think the correct
> API
> > > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
> > > batch
> > > > and streaming becomes much more vague as Flink is evolving towards
> > > > batch-streaming unification, so I’m concerned about exposing JobType
> > as a
> > > > public API. Is there any specific use case to expose the batch /
> > > streaming
> > > > info to listeners or meta services?
> > > >
> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > ambiguous. To be honest the configuration is quite a mess in Flink,
> so
> > > > maybe it’s better to be more specific here to tell users what
> > information
> > > > they could expect to see here, instead of just a “job configuration”
> as
> > > > described in JavaDoc.
> > > >
> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor.
> I
> > > think
> > > > more information should be provided here, such as which thread model
> > this
> > > > executor could promise, and whether the user should care about
> > > concurrency
> > > > issues. Otherwise I prefer not to give such an utility that no one
> > dares
> > > to
> > > > use safely, and leave it to users to choose their implementation.
> > > >
> > > > About lineage:
> > > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list
> of
> > > > LineageEntity. Could you elaborate more on this class? From my naive
> > > > imagination, the lineage is shaped as a DAG, where vertices are
> sources
> > > and
> > > > sinks (LineageEntity) and edges are connections between them
> > > > (LineageRelation), so it is a bit confusing for a name mixing these
> two
> > > > concepts.
> > > >
> > > > 5. I can’t find the definition of CatalogContext in the current code
> > base
> > > > and Flink, which appears in the TableLineageEntity.
> > > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > boolean
> > > > (the “override” is quite confusing). I’m wondering if these are
> > necessary
> > > > for meta services, as they are actually concepts defined in the
> runtime
> > > > level of Flink Table / SQL.
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Is there any comment or feedback for this FLIP? Hope to hear from
> > you,
> > > > > thanks
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-314: Support
> Customized
> > > Job
> > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
> Flink
> > > > > > streaming and batch jobs create lineage dependency between source
> > and
> > > > > sink,
> > > > > > users can manage their data and jobs according to this lineage
> > > > > information.
> > > > > > For example, when there is a delay in Flink ETL or data, users
> can
> > > > easily
> > > > > > trace the problematic jobs and affected data. On the other hand,
> > when
> > > > > users
> > > > > > need to correct data or debug, they can perform operations based
> on
> > > > > lineage
> > > > > > too.
> > > > > >
> > > > > > In FLIP-314 we want to introduce lineage related interfaces for
> > Flink
> > > > and
> > > > > > users can create customized job status listeners. When job status
> > > > > changes,
> > > > > > users can get job status and information to add, update or delete
> > > > > lineage.
> > > > > >
> > > > > > Looking forward to your feedback, thanks.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > [2]
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Feng Jin <ji...@gmail.com>.
Hi Shammon
Thank you for proposing this FLIP. I think the Flink Job lineage is a very
useful feature.
I have few question:

1. For DataStream Jobs, users need to set up lineage relationships when
building DAGs for their custom sources and sinks.
However, for some common connectors such as Kafka Connector and JDBC
Connector, we can add a lineage interface like `supportReportLineage`, so
that these connectors can implement it.
This way, in the scenario of DataStream Jobs, lineages can be automatically
reported. What do you think?


2. From the current design, it seems that we need to analyze column lineage
through pipeline. As far as I know, it is relatively easy to obtain column
lineage through Calcite MetaQuery API.
Would you consider using this approach? Or do we need to implement another
parsing process based on the pipeline?
```
RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
metadataQuery.getColumnOrigins(inputRel, i);
```
Best,
Feng


On Sun, Jun 25, 2023 at 8:06 PM Shammon FY <zj...@gmail.com> wrote:

> Hi yuxia and Yun,
>
> Thanks for your input.
>
> For yuxia:
> > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
>
> At present, we only need to notify the listener when a job goes to
> termination, but I think it makes sense to add generic `oldStatus` and
> `newStatus` in the listener and users can update the job state in their
> service as needed.
>
> > 2: I'm really confused about the `config()` included in `LineageEntity`,
> where is it from and what is it for ?
>
> The `config` in `LineageEntity` is used for users to get options for source
> and sink connectors. As the examples in the FLIP, users can add
> server/group/topic information in the config for kafka and create lineage
> entities for `DataStream` jobs, then the listeners can get this information
> to identify the same connector in different jobs. Otherwise, the `config`
> in `TableLineageEntity` will be the same as `getOptions` in
> `CatalogBaseTable`.
>
> > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is
> needed or not, since `TableSinkLineageEntity` contains
> `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> changelogmode?
>
> At present, we do not actually use the changelog mode. It can be deleted,
> and I have updated FLIP.
>
> > Btw, since there're a lot interfaces proposed, I think it'll be better to
> give an example about how to implement a listener in this FLIP to make us
> know better about the interfaces.
>
> I have added the example in the FLIP and the related interfaces and
> examples are in branch [1].
>
> For Yun:
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP does not touch them, and will them become part of the
> List<LineageEntity> sources() or adding another interface?
>
> You're right, currently lookup join dim tables were not considered in the
> 'proposed changed' section of this FLIP. But the interface for lineage is
> universal and we can give `TableLookupSourceLineageEntity` which implements
> `TableSourceLineageEntity` in the future without modifying the public
> interface.
>
> > By the way, if you want to focus on job lineage instead of data column
> lineage in this FLIP, why we must introduce so many column-lineage related
> interface here?
>
> The lineage information in SQL jobs includes table lineage and column
> lineage. Although SQL jobs currently do not support column lineage, we
> would like to support this in the next step. So we have comprehensively
> considered the table lineage and column lineage interfaces here, and
> defined these two interfaces together clearly
>
>
> [1]
>
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>
> Best,
> Shammon FY
>
>
> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
>
> > Hi Shammon,
> >
> > I like the idea in general and it will help to analysis the job lineages
> > no matter FlinkSQL or Flink jar jobs in production environments.
> >
> > For Qingsheng's concern, I'd like the name of JobType more than
> > RuntimeExecutionMode, as the latter one is not easy to understand for
> users.
> >
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP
> > does not touch them, and will them become part of the List<LineageEntity>
> > sources()​ or adding another interface?
> >
> > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Shammon FY <zj...@gmail.com>
> > Sent: Sunday, June 25, 2023 16:13
> > To: dev@flink.apache.org <de...@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> >
> > Hi Qingsheng,
> >
> > Thanks for your valuable feedback.
> >
> > > 1. Is there any specific use case to expose the batch / streaming info
> to
> > listeners or meta services?
> >
> > I agree with you that Flink is evolving towards batch-streaming
> > unification, but the lifecycle of them is different. If a job processes a
> > bound dataset, it will end after completing the data processing,
> otherwise,
> > it will run for a long time. In our scenario, we will regularly schedule
> > some Flink jobs to process bound dataset and update some job information
> to
> > the lineage information for the "batch" jobs such as scheduled timestamp,
> > execution duration when jobs are finished, which is different from
> > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > `existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
> > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what
> do
> > you think of it?
> >
> > > 2. it’s better to be more specific here to tell users what information
> > they could expect to see here, instead of just a “job configuration” as
> > described in JavaDoc.
> >
> > Thanks and I have updated the doc in FLIP.
> >
> > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> >
> > I have updated the docs for io executor  in
> > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
> and
> > executes submitted tasks in parallel. Users can submit tasks to the
> > executor which ensures that the submitted task can be executed before the
> > job exits.
> >
> > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > LineageEntity.
> >
> > In the initial idea, the `LineageRelationEntity` is used for `DataStream`
> > to set additional lineage information besides source. For example, there
> > are table and column lineages in SQL jobs. When we build a `DataStream`
> job
> > with table source and sink, we can add table lineage in the following
> > method.
> > ```
> > public class DataStreamSink {
> >     public DataStreamSink setLineageSources(LineageEntity ... sources);
> > }
> > ```
> > But we can not set column lineage for the above sink, and for the sake of
> > universality, we do not want to add a method similar to `addLineageColumn
> > (...)` in `DataStreamSink`. So I put this information into
> > LineageRelationEntity so that SQL and DataStream jobs can be consistent.
> > But as you mentioned, this approach does indeed lead to ambiguity and
> > complexity. So my current idea is to add the `setLineageRelation` method
> in
> > `DataStreamSink` directly without `LineageRelationEntity`, I have updated
> > the FLIP and please help to review it again, thanks.
> >
> > > 5. I can’t find the definition of CatalogContext in the current code
> base
> > and Flink, which appears in the TableLineageEntity.
> >
> > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> >
> > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> boolean
> > (the “override” is quite confusing). I’m wondering if these are necessary
> > for meta services, as they are actually concepts defined in the runtime
> > level of Flink Table / SQL.
> >
> > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > `ChangelogMode` and `override` are mainly used for verification and
> > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > `UPDATE`, we only want to report and update lineage for `INSERT` jobs in
> > our streaming & batch ETL, and display the `override` information on the
> > UI.
> >
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for starting this FLIP! Data lineage is a very important topic,
> > > which has been missing for a long time in Flink. I have some questions
> > > about the FLIP.
> > >
> > > About events and listeners:
> > >
> > > 1. I’m not sure if it is necessary to expose JobType to in
> > JobCreatedEvent.
> > > This is an internal class in flink-runtime, and I think the correct API
> > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
> > batch
> > > and streaming becomes much more vague as Flink is evolving towards
> > > batch-streaming unification, so I’m concerned about exposing JobType
> as a
> > > public API. Is there any specific use case to expose the batch /
> > streaming
> > > info to listeners or meta services?
> > >
> > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > > maybe it’s better to be more specific here to tell users what
> information
> > > they could expect to see here, instead of just a “job configuration” as
> > > described in JavaDoc.
> > >
> > > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
> > think
> > > more information should be provided here, such as which thread model
> this
> > > executor could promise, and whether the user should care about
> > concurrency
> > > issues. Otherwise I prefer not to give such an utility that no one
> dares
> > to
> > > use safely, and leave it to users to choose their implementation.
> > >
> > > About lineage:
> > >
> > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > LineageEntity. Could you elaborate more on this class? From my naive
> > > imagination, the lineage is shaped as a DAG, where vertices are sources
> > and
> > > sinks (LineageEntity) and edges are connections between them
> > > (LineageRelation), so it is a bit confusing for a name mixing these two
> > > concepts.
> > >
> > > 5. I can’t find the definition of CatalogContext in the current code
> base
> > > and Flink, which appears in the TableLineageEntity.
> > >
> > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> boolean
> > > (the “override” is quite confusing). I’m wondering if these are
> necessary
> > > for meta services, as they are actually concepts defined in the runtime
> > > level of Flink Table / SQL.
> > >
> > > Best,
> > > Qingsheng
> > >
> > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Is there any comment or feedback for this FLIP? Hope to hear from
> you,
> > > > thanks
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I would like to start a discussion on FLIP-314: Support Customized
> > Job
> > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > > > streaming and batch jobs create lineage dependency between source
> and
> > > > sink,
> > > > > users can manage their data and jobs according to this lineage
> > > > information.
> > > > > For example, when there is a delay in Flink ETL or data, users can
> > > easily
> > > > > trace the problematic jobs and affected data. On the other hand,
> when
> > > > users
> > > > > need to correct data or debug, they can perform operations based on
> > > > lineage
> > > > > too.
> > > > >
> > > > > In FLIP-314 we want to introduce lineage related interfaces for
> Flink
> > > and
> > > > > users can create customized job status listeners. When job status
> > > > changes,
> > > > > users can get job status and information to add, update or delete
> > > > lineage.
> > > > >
> > > > > Looking forward to your feedback, thanks.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > >
> > >
> >
>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Yong Fang <zj...@gmail.com>.
Hi Martijn,

If there're no more comments, I will start a vote for this, thanks

Best,
Fang Yong

On Tue, Feb 20, 2024 at 4:53 PM Yong Fang <zj...@gmail.com> wrote:

> Hi Martijn,
>
> Thank you for your attention. Let me first explain the specific situation
> of FLIP-314. FLIP-314 is currently in an accepted state, but actual code
> development has not yet begun, and interface related PR has not been merged
> into the master. So it may not be necessary for us to create a separate
> FLIP. Currently, my idea is to directly update the interface on FLIP-314,
> but to initiate a separate thread with the context and we can vote there.
>
> What do you think? Thanks
>
> Best,
> Fang Yong
>
> On Mon, Feb 19, 2024 at 8:27 PM Martijn Visser <ma...@apache.org>
> wrote:
>
>> I'm a bit confused: did we add new interfaces after FLIP-314 was
>> accepted? If so, please move the new interfaces to a new FLIP and
>> start a separate vote. We can't retrospectively change an accepted
>> FLIP with new interfaces and a new vote.
>>
>> On Mon, Feb 19, 2024 at 3:22 AM Yong Fang <zj...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > If there are no more feedbacks, I will start a vote for the new
>> interfaces
>> > in the next day, thanks
>> >
>> > Best,
>> > Fang Yong
>> >
>> > On Thu, Feb 8, 2024 at 1:30 PM Yong Fang <zj...@gmail.com> wrote:
>> >
>> > > Hi devs,
>> > >
>> > > According to the online-discussion in FLINK-3127 [1] and
>> > > offline-discussion with Maciej Obuchowski and Zhenqiu Huang, we would
>> like
>> > > to update the lineage vertex relevant interfaces in FLIP-314 [2] as
>> follows:
>> > >
>> > > 1. Introduce `LineageDataset` which represents source and sink in
>> > > `LineageVertex`. The fields in `LineageDataset` are as follows:
>> > >     /* Name for this particular dataset. */
>> > >     String name;
>> > >     /* Unique name for this dataset's storage, for example, url for
>> jdbc
>> > > connector and location for lakehouse connector. */
>> > >     String namespace;
>> > >     /* Facets for the lineage vertex to describe the particular
>> > > information of dataset, such as schema and config. */
>> > >     Map<String, Facet> facets;
>> > >
>> > > 2. There may be multiple datasets in one `LineageVertex`, for example,
>> > > kafka source or hybrid source. So users can get dataset list from
>> > > `LineageVertex`:
>> > >     /** Get datasets from the lineage vertex. */
>> > >     List<LineageDataset> datasets();
>> > >
>> > > 3. There will be built in facets for config and schema. To describe
>> > > columns in table/sql jobs and datastream jobs, we introduce
>> > > `DatasetSchemaField`.
>> > >     /** Builtin config facet for dataset. */
>> > >     @PublicEvolving
>> > >     public interface DatasetConfigFacet extends LineageDatasetFacet {
>> > >         Map<String, String> config();
>> > >     }
>> > >
>> > >     /** Field for schema in dataset. */
>> > >     public interface DatasetSchemaField<T> {
>> > >         /** The name of the field. */
>> > >         String name();
>> > >         /** The type of the field. */
>> > >         T type();
>> > >     }
>> > >
>> > > Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking
>> forward
>> > > to your feedback, thanks
>> > >
>> > > Best,
>> > > Fang Yong
>> > >
>> > > On Mon, Sep 25, 2023 at 1:18 PM Shammon FY <zj...@gmail.com> wrote:
>> > >
>> > >> Hi David,
>> > >>
>> > >> Do you want the detailed topology for Flink job? You can get
>> > >> `JobDetailsInfo` in `RestCusterClient` with the submitted job id, it
>> has
>> > >> `String jsonPlan`. You can parse the json plan to get all steps and
>> > >> relations between them in a Flink job. Hope this can help you,
>> thanks!
>> > >>
>> > >> Best,
>> > >> Shammon FY
>> > >>
>> > >> On Tue, Sep 19, 2023 at 11:46 PM David Radley <
>> david_radley@uk.ibm.com>
>> > >> wrote:
>> > >>
>> > >>> Hi there,
>> > >>> I am looking at the interfaces. If I am reading it correctly,there
>> is
>> > >>> one relationship between the source and sink and this relationship
>> > >>> represents the operational lineage. Lineage is usually represented
>> as asset
>> > >>> -> process - > asset – see for example
>> > >>>
>> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
>> > >>>
>> > >>> Maybe I am missing it, but it seems to be that it would be useful to
>> > >>> store the process in the lineage graph.
>> > >>>
>> > >>> It is useful to have the top level lineage as source -> Flink job ->
>> > >>> sink. Where the Flink job is the process, but also to have this
>> asset ->
>> > >>> process -> asset pattern for each of the steps in the job. If this
>> is
>> > >>> present, please could you point me to it,
>> > >>>
>> > >>>       Kind regards, David.
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>> From: David Radley <da...@uk.ibm.com>
>> > >>> Date: Tuesday, 19 September 2023 at 16:11
>> > >>> To: dev@flink.apache.org <de...@flink.apache.org>
>> > >>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
>> > >>> Lineage Listener
>> > >>> Hi,
>> > >>> I notice that there is an experimental lineage integration for Flink
>> > >>> with OpenLineage https://openlineage.io/docs/integrations/flink  .
>> I
>> > >>> think this feature would allow for a superior Flink OpenLineage
>> integration,
>> > >>>         Kind regards, David.
>> > >>>
>> > >>> From: XTransfer <ji...@xtransfer.cn.INVALID>
>> > >>> Date: Tuesday, 19 September 2023 at 15:47
>> > >>> To: dev@flink.apache.org <de...@flink.apache.org>
>> > >>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job
>> > >>> Lineage Listener
>> > >>> Thanks Shammon for this proposal.
>> > >>>
>> > >>> That’s helpful for collecting the lineage of Flink tasks.
>> > >>> Looking forward to its implementation.
>> > >>>
>> > >>> Best,
>> > >>> Jiabao
>> > >>>
>> > >>>
>> > >>> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
>> > >>> >
>> > >>> > Thanks Shammon for the informations, the comment makes the
>> lifecycle
>> > >>> clearer.
>> > >>> > +1
>> > >>> >
>> > >>> >
>> > >>> > Best,
>> > >>> > Leonard
>> > >>> >
>> > >>> >
>> > >>> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com>
>> wrote:
>> > >>> >>
>> > >>> >> Hi devs,
>> > >>> >>
>> > >>> >> After discussing with @Qingsheng, I fixed a minor issue of the
>> > >>> lineage lifecycle in `StreamExecutionEnvironment`. I have added the
>> comment
>> > >>> to explain that the lineage information in
>> `StreamExecutionEnvironment`
>> > >>> will be consistent with that of transformations. When users clear
>> the
>> > >>> existing transformations, the added lineage information will also be
>> > >>> deleted.
>> > >>> >>
>> > >>> >> Please help to review it again, and If there are no more concerns
>> > >>> about FLIP-314[1], I would like to start voting later, thanks. cc @
>> > >>> <>Leonard
>> > >>> >>
>> > >>> >> Best,
>> > >>> >> Shammon FY
>> > >>> >>
>> > >>> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com
>> > >>> <ma...@gmail.com>> wrote:
>> > >>> >> Hi devs,
>> > >>> >>
>> > >>> >> Thanks for all the valuable feedback. If there are no more
>> concerns
>> > >>> about FLIP-314[1], I would like to start voting later, thanks.
>> > >>> >>
>> > >>> >>
>> > >>> >> [1]
>> > >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> > >>>  <
>> > >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> > >>> >
>> > >>> >>
>> > >>> >> Best,
>> > >>> >> Shammon FY
>> > >>> >>
>> > >>> >>
>> > >>> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com
>> > >>> <ma...@gmail.com>> wrote:
>> > >>> >> Thanks for the valuable feedback, Leonard.
>> > >>> >>
>> > >>> >> I have discussed with Leonard off-line. We have reached some
>> > >>> conclusions about these issues and I have updated the FLIP as
>> follows:
>> > >>> >>
>> > >>> >> 1. Simplify the `LineageEdge` interface by creating an edge from
>> one
>> > >>> source vertex to sink vertex.
>> > >>> >> 2. Remove the `TableColumnSourceLineageVertex` interface and
>> update
>> > >>> `TableColumnLineageEdge` to create an edge from columns in one
>> source to
>> > >>> each sink column.
>> > >>> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> > >>> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
>> > >>> `StreamExecutionEnviroment` for datastream job and remove previous
>> methods
>> > >>> in `DataStreamSource` and `DataStreamSink`.
>> > >>> >>
>> > >>> >> Looking forward to your feedback, thanks.
>> > >>> >>
>> > >>> >> Best,
>> > >>> >> Shammon FY
>> > >>> >
>> > >>>
>> > >>> Unless otherwise stated above:
>> > >>>
>> > >>> IBM United Kingdom Limited
>> > >>> Registered in England and Wales with number 741598
>> > >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
>> 3AU
>> > >>>
>> > >>> Unless otherwise stated above:
>> > >>>
>> > >>> IBM United Kingdom Limited
>> > >>> Registered in England and Wales with number 741598
>> > >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
>> 3AU
>> > >>>
>> > >>
>>
>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Yong Fang <zj...@gmail.com>.
Hi Martijn,

Thank you for your attention. Let me first explain the specific situation
of FLIP-314. FLIP-314 is currently in an accepted state, but actual code
development has not yet begun, and interface related PR has not been merged
into the master. So it may not be necessary for us to create a separate
FLIP. Currently, my idea is to directly update the interface on FLIP-314,
but to initiate a separate thread with the context and we can vote there.

What do you think? Thanks

Best,
Fang Yong

On Mon, Feb 19, 2024 at 8:27 PM Martijn Visser <ma...@apache.org>
wrote:

> I'm a bit confused: did we add new interfaces after FLIP-314 was
> accepted? If so, please move the new interfaces to a new FLIP and
> start a separate vote. We can't retrospectively change an accepted
> FLIP with new interfaces and a new vote.
>
> On Mon, Feb 19, 2024 at 3:22 AM Yong Fang <zj...@gmail.com> wrote:
> >
> > Hi all,
> >
> > If there are no more feedbacks, I will start a vote for the new
> interfaces
> > in the next day, thanks
> >
> > Best,
> > Fang Yong
> >
> > On Thu, Feb 8, 2024 at 1:30 PM Yong Fang <zj...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > According to the online-discussion in FLINK-3127 [1] and
> > > offline-discussion with Maciej Obuchowski and Zhenqiu Huang, we would
> like
> > > to update the lineage vertex relevant interfaces in FLIP-314 [2] as
> follows:
> > >
> > > 1. Introduce `LineageDataset` which represents source and sink in
> > > `LineageVertex`. The fields in `LineageDataset` are as follows:
> > >     /* Name for this particular dataset. */
> > >     String name;
> > >     /* Unique name for this dataset's storage, for example, url for
> jdbc
> > > connector and location for lakehouse connector. */
> > >     String namespace;
> > >     /* Facets for the lineage vertex to describe the particular
> > > information of dataset, such as schema and config. */
> > >     Map<String, Facet> facets;
> > >
> > > 2. There may be multiple datasets in one `LineageVertex`, for example,
> > > kafka source or hybrid source. So users can get dataset list from
> > > `LineageVertex`:
> > >     /** Get datasets from the lineage vertex. */
> > >     List<LineageDataset> datasets();
> > >
> > > 3. There will be built in facets for config and schema. To describe
> > > columns in table/sql jobs and datastream jobs, we introduce
> > > `DatasetSchemaField`.
> > >     /** Builtin config facet for dataset. */
> > >     @PublicEvolving
> > >     public interface DatasetConfigFacet extends LineageDatasetFacet {
> > >         Map<String, String> config();
> > >     }
> > >
> > >     /** Field for schema in dataset. */
> > >     public interface DatasetSchemaField<T> {
> > >         /** The name of the field. */
> > >         String name();
> > >         /** The type of the field. */
> > >         T type();
> > >     }
> > >
> > > Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking
> forward
> > > to your feedback, thanks
> > >
> > > Best,
> > > Fang Yong
> > >
> > > On Mon, Sep 25, 2023 at 1:18 PM Shammon FY <zj...@gmail.com> wrote:
> > >
> > >> Hi David,
> > >>
> > >> Do you want the detailed topology for Flink job? You can get
> > >> `JobDetailsInfo` in `RestCusterClient` with the submitted job id, it
> has
> > >> `String jsonPlan`. You can parse the json plan to get all steps and
> > >> relations between them in a Flink job. Hope this can help you, thanks!
> > >>
> > >> Best,
> > >> Shammon FY
> > >>
> > >> On Tue, Sep 19, 2023 at 11:46 PM David Radley <
> david_radley@uk.ibm.com>
> > >> wrote:
> > >>
> > >>> Hi there,
> > >>> I am looking at the interfaces. If I am reading it correctly,there is
> > >>> one relationship between the source and sink and this relationship
> > >>> represents the operational lineage. Lineage is usually represented
> as asset
> > >>> -> process - > asset – see for example
> > >>>
> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
> > >>>
> > >>> Maybe I am missing it, but it seems to be that it would be useful to
> > >>> store the process in the lineage graph.
> > >>>
> > >>> It is useful to have the top level lineage as source -> Flink job ->
> > >>> sink. Where the Flink job is the process, but also to have this
> asset ->
> > >>> process -> asset pattern for each of the steps in the job. If this is
> > >>> present, please could you point me to it,
> > >>>
> > >>>       Kind regards, David.
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> From: David Radley <da...@uk.ibm.com>
> > >>> Date: Tuesday, 19 September 2023 at 16:11
> > >>> To: dev@flink.apache.org <de...@flink.apache.org>
> > >>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
> > >>> Lineage Listener
> > >>> Hi,
> > >>> I notice that there is an experimental lineage integration for Flink
> > >>> with OpenLineage https://openlineage.io/docs/integrations/flink  . I
> > >>> think this feature would allow for a superior Flink OpenLineage
> integration,
> > >>>         Kind regards, David.
> > >>>
> > >>> From: XTransfer <ji...@xtransfer.cn.INVALID>
> > >>> Date: Tuesday, 19 September 2023 at 15:47
> > >>> To: dev@flink.apache.org <de...@flink.apache.org>
> > >>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job
> > >>> Lineage Listener
> > >>> Thanks Shammon for this proposal.
> > >>>
> > >>> That’s helpful for collecting the lineage of Flink tasks.
> > >>> Looking forward to its implementation.
> > >>>
> > >>> Best,
> > >>> Jiabao
> > >>>
> > >>>
> > >>> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
> > >>> >
> > >>> > Thanks Shammon for the informations, the comment makes the
> lifecycle
> > >>> clearer.
> > >>> > +1
> > >>> >
> > >>> >
> > >>> > Best,
> > >>> > Leonard
> > >>> >
> > >>> >
> > >>> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com>
> wrote:
> > >>> >>
> > >>> >> Hi devs,
> > >>> >>
> > >>> >> After discussing with @Qingsheng, I fixed a minor issue of the
> > >>> lineage lifecycle in `StreamExecutionEnvironment`. I have added the
> comment
> > >>> to explain that the lineage information in
> `StreamExecutionEnvironment`
> > >>> will be consistent with that of transformations. When users clear the
> > >>> existing transformations, the added lineage information will also be
> > >>> deleted.
> > >>> >>
> > >>> >> Please help to review it again, and If there are no more concerns
> > >>> about FLIP-314[1], I would like to start voting later, thanks. cc @
> > >>> <>Leonard
> > >>> >>
> > >>> >> Best,
> > >>> >> Shammon FY
> > >>> >>
> > >>> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com
> > >>> <ma...@gmail.com>> wrote:
> > >>> >> Hi devs,
> > >>> >>
> > >>> >> Thanks for all the valuable feedback. If there are no more
> concerns
> > >>> about FLIP-314[1], I would like to start voting later, thanks.
> > >>> >>
> > >>> >>
> > >>> >> [1]
> > >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > >>>  <
> > >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > >>> >
> > >>> >>
> > >>> >> Best,
> > >>> >> Shammon FY
> > >>> >>
> > >>> >>
> > >>> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com
> > >>> <ma...@gmail.com>> wrote:
> > >>> >> Thanks for the valuable feedback, Leonard.
> > >>> >>
> > >>> >> I have discussed with Leonard off-line. We have reached some
> > >>> conclusions about these issues and I have updated the FLIP as
> follows:
> > >>> >>
> > >>> >> 1. Simplify the `LineageEdge` interface by creating an edge from
> one
> > >>> source vertex to sink vertex.
> > >>> >> 2. Remove the `TableColumnSourceLineageVertex` interface and
> update
> > >>> `TableColumnLineageEdge` to create an edge from columns in one
> source to
> > >>> each sink column.
> > >>> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
> > >>> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
> > >>> `StreamExecutionEnviroment` for datastream job and remove previous
> methods
> > >>> in `DataStreamSource` and `DataStreamSink`.
> > >>> >>
> > >>> >> Looking forward to your feedback, thanks.
> > >>> >>
> > >>> >> Best,
> > >>> >> Shammon FY
> > >>> >
> > >>>
> > >>> Unless otherwise stated above:
> > >>>
> > >>> IBM United Kingdom Limited
> > >>> Registered in England and Wales with number 741598
> > >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
> > >>>
> > >>> Unless otherwise stated above:
> > >>>
> > >>> IBM United Kingdom Limited
> > >>> Registered in England and Wales with number 741598
> > >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
> > >>>
> > >>
>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Martijn Visser <ma...@apache.org>.
I'm a bit confused: did we add new interfaces after FLIP-314 was
accepted? If so, please move the new interfaces to a new FLIP and
start a separate vote. We can't retrospectively change an accepted
FLIP with new interfaces and a new vote.

On Mon, Feb 19, 2024 at 3:22 AM Yong Fang <zj...@gmail.com> wrote:
>
> Hi all,
>
> If there are no more feedbacks, I will start a vote for the new interfaces
> in the next day, thanks
>
> Best,
> Fang Yong
>
> On Thu, Feb 8, 2024 at 1:30 PM Yong Fang <zj...@gmail.com> wrote:
>
> > Hi devs,
> >
> > According to the online-discussion in FLINK-3127 [1] and
> > offline-discussion with Maciej Obuchowski and Zhenqiu Huang, we would like
> > to update the lineage vertex relevant interfaces in FLIP-314 [2] as follows:
> >
> > 1. Introduce `LineageDataset` which represents source and sink in
> > `LineageVertex`. The fields in `LineageDataset` are as follows:
> >     /* Name for this particular dataset. */
> >     String name;
> >     /* Unique name for this dataset's storage, for example, url for jdbc
> > connector and location for lakehouse connector. */
> >     String namespace;
> >     /* Facets for the lineage vertex to describe the particular
> > information of dataset, such as schema and config. */
> >     Map<String, Facet> facets;
> >
> > 2. There may be multiple datasets in one `LineageVertex`, for example,
> > kafka source or hybrid source. So users can get dataset list from
> > `LineageVertex`:
> >     /** Get datasets from the lineage vertex. */
> >     List<LineageDataset> datasets();
> >
> > 3. There will be built in facets for config and schema. To describe
> > columns in table/sql jobs and datastream jobs, we introduce
> > `DatasetSchemaField`.
> >     /** Builtin config facet for dataset. */
> >     @PublicEvolving
> >     public interface DatasetConfigFacet extends LineageDatasetFacet {
> >         Map<String, String> config();
> >     }
> >
> >     /** Field for schema in dataset. */
> >     public interface DatasetSchemaField<T> {
> >         /** The name of the field. */
> >         String name();
> >         /** The type of the field. */
> >         T type();
> >     }
> >
> > Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking forward
> > to your feedback, thanks
> >
> > Best,
> > Fang Yong
> >
> > On Mon, Sep 25, 2023 at 1:18 PM Shammon FY <zj...@gmail.com> wrote:
> >
> >> Hi David,
> >>
> >> Do you want the detailed topology for Flink job? You can get
> >> `JobDetailsInfo` in `RestCusterClient` with the submitted job id, it has
> >> `String jsonPlan`. You can parse the json plan to get all steps and
> >> relations between them in a Flink job. Hope this can help you, thanks!
> >>
> >> Best,
> >> Shammon FY
> >>
> >> On Tue, Sep 19, 2023 at 11:46 PM David Radley <da...@uk.ibm.com>
> >> wrote:
> >>
> >>> Hi there,
> >>> I am looking at the interfaces. If I am reading it correctly,there is
> >>> one relationship between the source and sink and this relationship
> >>> represents the operational lineage. Lineage is usually represented as asset
> >>> -> process - > asset – see for example
> >>> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
> >>>
> >>> Maybe I am missing it, but it seems to be that it would be useful to
> >>> store the process in the lineage graph.
> >>>
> >>> It is useful to have the top level lineage as source -> Flink job ->
> >>> sink. Where the Flink job is the process, but also to have this asset ->
> >>> process -> asset pattern for each of the steps in the job. If this is
> >>> present, please could you point me to it,
> >>>
> >>>       Kind regards, David.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> From: David Radley <da...@uk.ibm.com>
> >>> Date: Tuesday, 19 September 2023 at 16:11
> >>> To: dev@flink.apache.org <de...@flink.apache.org>
> >>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
> >>> Lineage Listener
> >>> Hi,
> >>> I notice that there is an experimental lineage integration for Flink
> >>> with OpenLineage https://openlineage.io/docs/integrations/flink  . I
> >>> think this feature would allow for a superior Flink OpenLineage integration,
> >>>         Kind regards, David.
> >>>
> >>> From: XTransfer <ji...@xtransfer.cn.INVALID>
> >>> Date: Tuesday, 19 September 2023 at 15:47
> >>> To: dev@flink.apache.org <de...@flink.apache.org>
> >>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job
> >>> Lineage Listener
> >>> Thanks Shammon for this proposal.
> >>>
> >>> That’s helpful for collecting the lineage of Flink tasks.
> >>> Looking forward to its implementation.
> >>>
> >>> Best,
> >>> Jiabao
> >>>
> >>>
> >>> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
> >>> >
> >>> > Thanks Shammon for the informations, the comment makes the lifecycle
> >>> clearer.
> >>> > +1
> >>> >
> >>> >
> >>> > Best,
> >>> > Leonard
> >>> >
> >>> >
> >>> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
> >>> >>
> >>> >> Hi devs,
> >>> >>
> >>> >> After discussing with @Qingsheng, I fixed a minor issue of the
> >>> lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment
> >>> to explain that the lineage information in `StreamExecutionEnvironment`
> >>> will be consistent with that of transformations. When users clear the
> >>> existing transformations, the added lineage information will also be
> >>> deleted.
> >>> >>
> >>> >> Please help to review it again, and If there are no more concerns
> >>> about FLIP-314[1], I would like to start voting later, thanks. cc @
> >>> <>Leonard
> >>> >>
> >>> >> Best,
> >>> >> Shammon FY
> >>> >>
> >>> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com
> >>> <ma...@gmail.com>> wrote:
> >>> >> Hi devs,
> >>> >>
> >>> >> Thanks for all the valuable feedback. If there are no more concerns
> >>> about FLIP-314[1], I would like to start voting later, thanks.
> >>> >>
> >>> >>
> >>> >> [1]
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> >>>  <
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> >>> >
> >>> >>
> >>> >> Best,
> >>> >> Shammon FY
> >>> >>
> >>> >>
> >>> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com
> >>> <ma...@gmail.com>> wrote:
> >>> >> Thanks for the valuable feedback, Leonard.
> >>> >>
> >>> >> I have discussed with Leonard off-line. We have reached some
> >>> conclusions about these issues and I have updated the FLIP as follows:
> >>> >>
> >>> >> 1. Simplify the `LineageEdge` interface by creating an edge from one
> >>> source vertex to sink vertex.
> >>> >> 2. Remove the `TableColumnSourceLineageVertex` interface and update
> >>> `TableColumnLineageEdge` to create an edge from columns in one source to
> >>> each sink column.
> >>> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
> >>> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
> >>> `StreamExecutionEnviroment` for datastream job and remove previous methods
> >>> in `DataStreamSource` and `DataStreamSink`.
> >>> >>
> >>> >> Looking forward to your feedback, thanks.
> >>> >>
> >>> >> Best,
> >>> >> Shammon FY
> >>> >
> >>>
> >>> Unless otherwise stated above:
> >>>
> >>> IBM United Kingdom Limited
> >>> Registered in England and Wales with number 741598
> >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >>>
> >>> Unless otherwise stated above:
> >>>
> >>> IBM United Kingdom Limited
> >>> Registered in England and Wales with number 741598
> >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >>>
> >>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Yong Fang <zj...@gmail.com>.
Hi all,

If there are no more feedbacks, I will start a vote for the new interfaces
in the next day, thanks

Best,
Fang Yong

On Thu, Feb 8, 2024 at 1:30 PM Yong Fang <zj...@gmail.com> wrote:

> Hi devs,
>
> According to the online-discussion in FLINK-3127 [1] and
> offline-discussion with Maciej Obuchowski and Zhenqiu Huang, we would like
> to update the lineage vertex relevant interfaces in FLIP-314 [2] as follows:
>
> 1. Introduce `LineageDataset` which represents source and sink in
> `LineageVertex`. The fields in `LineageDataset` are as follows:
>     /* Name for this particular dataset. */
>     String name;
>     /* Unique name for this dataset's storage, for example, url for jdbc
> connector and location for lakehouse connector. */
>     String namespace;
>     /* Facets for the lineage vertex to describe the particular
> information of dataset, such as schema and config. */
>     Map<String, Facet> facets;
>
> 2. There may be multiple datasets in one `LineageVertex`, for example,
> kafka source or hybrid source. So users can get dataset list from
> `LineageVertex`:
>     /** Get datasets from the lineage vertex. */
>     List<LineageDataset> datasets();
>
> 3. There will be built in facets for config and schema. To describe
> columns in table/sql jobs and datastream jobs, we introduce
> `DatasetSchemaField`.
>     /** Builtin config facet for dataset. */
>     @PublicEvolving
>     public interface DatasetConfigFacet extends LineageDatasetFacet {
>         Map<String, String> config();
>     }
>
>     /** Field for schema in dataset. */
>     public interface DatasetSchemaField<T> {
>         /** The name of the field. */
>         String name();
>         /** The type of the field. */
>         T type();
>     }
>
> Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking forward
> to your feedback, thanks
>
> Best,
> Fang Yong
>
> On Mon, Sep 25, 2023 at 1:18 PM Shammon FY <zj...@gmail.com> wrote:
>
>> Hi David,
>>
>> Do you want the detailed topology for Flink job? You can get
>> `JobDetailsInfo` in `RestCusterClient` with the submitted job id, it has
>> `String jsonPlan`. You can parse the json plan to get all steps and
>> relations between them in a Flink job. Hope this can help you, thanks!
>>
>> Best,
>> Shammon FY
>>
>> On Tue, Sep 19, 2023 at 11:46 PM David Radley <da...@uk.ibm.com>
>> wrote:
>>
>>> Hi there,
>>> I am looking at the interfaces. If I am reading it correctly,there is
>>> one relationship between the source and sink and this relationship
>>> represents the operational lineage. Lineage is usually represented as asset
>>> -> process - > asset – see for example
>>> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
>>>
>>> Maybe I am missing it, but it seems to be that it would be useful to
>>> store the process in the lineage graph.
>>>
>>> It is useful to have the top level lineage as source -> Flink job ->
>>> sink. Where the Flink job is the process, but also to have this asset ->
>>> process -> asset pattern for each of the steps in the job. If this is
>>> present, please could you point me to it,
>>>
>>>       Kind regards, David.
>>>
>>>
>>>
>>>
>>>
>>> From: David Radley <da...@uk.ibm.com>
>>> Date: Tuesday, 19 September 2023 at 16:11
>>> To: dev@flink.apache.org <de...@flink.apache.org>
>>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
>>> Lineage Listener
>>> Hi,
>>> I notice that there is an experimental lineage integration for Flink
>>> with OpenLineage https://openlineage.io/docs/integrations/flink  . I
>>> think this feature would allow for a superior Flink OpenLineage integration,
>>>         Kind regards, David.
>>>
>>> From: XTransfer <ji...@xtransfer.cn.INVALID>
>>> Date: Tuesday, 19 September 2023 at 15:47
>>> To: dev@flink.apache.org <de...@flink.apache.org>
>>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job
>>> Lineage Listener
>>> Thanks Shammon for this proposal.
>>>
>>> That’s helpful for collecting the lineage of Flink tasks.
>>> Looking forward to its implementation.
>>>
>>> Best,
>>> Jiabao
>>>
>>>
>>> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
>>> >
>>> > Thanks Shammon for the informations, the comment makes the lifecycle
>>> clearer.
>>> > +1
>>> >
>>> >
>>> > Best,
>>> > Leonard
>>> >
>>> >
>>> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>> >>
>>> >> Hi devs,
>>> >>
>>> >> After discussing with @Qingsheng, I fixed a minor issue of the
>>> lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment
>>> to explain that the lineage information in `StreamExecutionEnvironment`
>>> will be consistent with that of transformations. When users clear the
>>> existing transformations, the added lineage information will also be
>>> deleted.
>>> >>
>>> >> Please help to review it again, and If there are no more concerns
>>> about FLIP-314[1], I would like to start voting later, thanks. cc @
>>> <>Leonard
>>> >>
>>> >> Best,
>>> >> Shammon FY
>>> >>
>>> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com
>>> <ma...@gmail.com>> wrote:
>>> >> Hi devs,
>>> >>
>>> >> Thanks for all the valuable feedback. If there are no more concerns
>>> about FLIP-314[1], I would like to start voting later, thanks.
>>> >>
>>> >>
>>> >> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>>  <
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>> >
>>> >>
>>> >> Best,
>>> >> Shammon FY
>>> >>
>>> >>
>>> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com
>>> <ma...@gmail.com>> wrote:
>>> >> Thanks for the valuable feedback, Leonard.
>>> >>
>>> >> I have discussed with Leonard off-line. We have reached some
>>> conclusions about these issues and I have updated the FLIP as follows:
>>> >>
>>> >> 1. Simplify the `LineageEdge` interface by creating an edge from one
>>> source vertex to sink vertex.
>>> >> 2. Remove the `TableColumnSourceLineageVertex` interface and update
>>> `TableColumnLineageEdge` to create an edge from columns in one source to
>>> each sink column.
>>> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>>> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
>>> `StreamExecutionEnviroment` for datastream job and remove previous methods
>>> in `DataStreamSource` and `DataStreamSink`.
>>> >>
>>> >> Looking forward to your feedback, thanks.
>>> >>
>>> >> Best,
>>> >> Shammon FY
>>> >
>>>
>>> Unless otherwise stated above:
>>>
>>> IBM United Kingdom Limited
>>> Registered in England and Wales with number 741598
>>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>>>
>>> Unless otherwise stated above:
>>>
>>> IBM United Kingdom Limited
>>> Registered in England and Wales with number 741598
>>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>>>
>>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Yong Fang <zj...@gmail.com>.
Hi devs,

According to the online-discussion in FLINK-3127 [1] and offline-discussion
with Maciej Obuchowski and Zhenqiu Huang, we would like to update the
lineage vertex relevant interfaces in FLIP-314 [2] as follows:

1. Introduce `LineageDataset` which represents source and sink in
`LineageVertex`. The fields in `LineageDataset` are as follows:
    /* Name for this particular dataset. */
    String name;
    /* Unique name for this dataset's storage, for example, url for jdbc
connector and location for lakehouse connector. */
    String namespace;
    /* Facets for the lineage vertex to describe the particular information
of dataset, such as schema and config. */
    Map<String, Facet> facets;

2. There may be multiple datasets in one `LineageVertex`, for example,
kafka source or hybrid source. So users can get dataset list from
`LineageVertex`:
    /** Get datasets from the lineage vertex. */
    List<LineageDataset> datasets();

3. There will be built in facets for config and schema. To describe columns
in table/sql jobs and datastream jobs, we introduce `DatasetSchemaField`.
    /** Builtin config facet for dataset. */
    @PublicEvolving
    public interface DatasetConfigFacet extends LineageDatasetFacet {
        Map<String, String> config();
    }

    /** Field for schema in dataset. */
    public interface DatasetSchemaField<T> {
        /** The name of the field. */
        String name();
        /** The type of the field. */
        T type();
    }

Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking forward
to your feedback, thanks

Best,
Fang Yong

On Mon, Sep 25, 2023 at 1:18 PM Shammon FY <zj...@gmail.com> wrote:

> Hi David,
>
> Do you want the detailed topology for Flink job? You can get
> `JobDetailsInfo` in `RestCusterClient` with the submitted job id, it has
> `String jsonPlan`. You can parse the json plan to get all steps and
> relations between them in a Flink job. Hope this can help you, thanks!
>
> Best,
> Shammon FY
>
> On Tue, Sep 19, 2023 at 11:46 PM David Radley <da...@uk.ibm.com>
> wrote:
>
>> Hi there,
>> I am looking at the interfaces. If I am reading it correctly,there is one
>> relationship between the source and sink and this relationship represents
>> the operational lineage. Lineage is usually represented as asset -> process
>> - > asset – see for example
>> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
>>
>> Maybe I am missing it, but it seems to be that it would be useful to
>> store the process in the lineage graph.
>>
>> It is useful to have the top level lineage as source -> Flink job ->
>> sink. Where the Flink job is the process, but also to have this asset ->
>> process -> asset pattern for each of the steps in the job. If this is
>> present, please could you point me to it,
>>
>>       Kind regards, David.
>>
>>
>>
>>
>>
>> From: David Radley <da...@uk.ibm.com>
>> Date: Tuesday, 19 September 2023 at 16:11
>> To: dev@flink.apache.org <de...@flink.apache.org>
>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
>> Lineage Listener
>> Hi,
>> I notice that there is an experimental lineage integration for Flink with
>> OpenLineage https://openlineage.io/docs/integrations/flink  . I think
>> this feature would allow for a superior Flink OpenLineage integration,
>>         Kind regards, David.
>>
>> From: XTransfer <ji...@xtransfer.cn.INVALID>
>> Date: Tuesday, 19 September 2023 at 15:47
>> To: dev@flink.apache.org <de...@flink.apache.org>
>> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job
>> Lineage Listener
>> Thanks Shammon for this proposal.
>>
>> That’s helpful for collecting the lineage of Flink tasks.
>> Looking forward to its implementation.
>>
>> Best,
>> Jiabao
>>
>>
>> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
>> >
>> > Thanks Shammon for the informations, the comment makes the lifecycle
>> clearer.
>> > +1
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> >
>> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
>> >>
>> >> Hi devs,
>> >>
>> >> After discussing with @Qingsheng, I fixed a minor issue of the lineage
>> lifecycle in `StreamExecutionEnvironment`. I have added the comment to
>> explain that the lineage information in `StreamExecutionEnvironment` will
>> be consistent with that of transformations. When users clear the existing
>> transformations, the added lineage information will also be deleted.
>> >>
>> >> Please help to review it again, and If there are no more concerns
>> about FLIP-314[1], I would like to start voting later, thanks. cc @
>> <>Leonard
>> >>
>> >> Best,
>> >> Shammon FY
>> >>
>> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <mailto:
>> zjureel@gmail.com>> wrote:
>> >> Hi devs,
>> >>
>> >> Thanks for all the valuable feedback. If there are no more concerns
>> about FLIP-314[1], I would like to start voting later, thanks.
>> >>
>> >>
>> >> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>  <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> >
>> >>
>> >> Best,
>> >> Shammon FY
>> >>
>> >>
>> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com
>> <ma...@gmail.com>> wrote:
>> >> Thanks for the valuable feedback, Leonard.
>> >>
>> >> I have discussed with Leonard off-line. We have reached some
>> conclusions about these issues and I have updated the FLIP as follows:
>> >>
>> >> 1. Simplify the `LineageEdge` interface by creating an edge from one
>> source vertex to sink vertex.
>> >> 2. Remove the `TableColumnSourceLineageVertex` interface and update
>> `TableColumnLineageEdge` to create an edge from columns in one source to
>> each sink column.
>> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
>> `StreamExecutionEnviroment` for datastream job and remove previous methods
>> in `DataStreamSource` and `DataStreamSink`.
>> >>
>> >> Looking forward to your feedback, thanks.
>> >>
>> >> Best,
>> >> Shammon FY
>> >
>>
>> Unless otherwise stated above:
>>
>> IBM United Kingdom Limited
>> Registered in England and Wales with number 741598
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>>
>> Unless otherwise stated above:
>>
>> IBM United Kingdom Limited
>> Registered in England and Wales with number 741598
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>>
>

Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi David,

Do you want the detailed topology for Flink job? You can get
`JobDetailsInfo` in `RestCusterClient` with the submitted job id, it has
`String jsonPlan`. You can parse the json plan to get all steps and
relations between them in a Flink job. Hope this can help you, thanks!

Best,
Shammon FY

On Tue, Sep 19, 2023 at 11:46 PM David Radley <da...@uk.ibm.com>
wrote:

> Hi there,
> I am looking at the interfaces. If I am reading it correctly,there is one
> relationship between the source and sink and this relationship represents
> the operational lineage. Lineage is usually represented as asset -> process
> - > asset – see for example
> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
>
> Maybe I am missing it, but it seems to be that it would be useful to store
> the process in the lineage graph.
>
> It is useful to have the top level lineage as source -> Flink job -> sink.
> Where the Flink job is the process, but also to have this asset -> process
> -> asset pattern for each of the steps in the job. If this is present,
> please could you point me to it,
>
>       Kind regards, David.
>
>
>
>
>
> From: David Radley <da...@uk.ibm.com>
> Date: Tuesday, 19 September 2023 at 16:11
> To: dev@flink.apache.org <de...@flink.apache.org>
> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job Lineage
> Listener
> Hi,
> I notice that there is an experimental lineage integration for Flink with
> OpenLineage https://openlineage.io/docs/integrations/flink  . I think
> this feature would allow for a superior Flink OpenLineage integration,
>         Kind regards, David.
>
> From: XTransfer <ji...@xtransfer.cn.INVALID>
> Date: Tuesday, 19 September 2023 at 15:47
> To: dev@flink.apache.org <de...@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> Listener
> Thanks Shammon for this proposal.
>
> That’s helpful for collecting the lineage of Flink tasks.
> Looking forward to its implementation.
>
> Best,
> Jiabao
>
>
> > 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
> >
> > Thanks Shammon for the informations, the comment makes the lifecycle
> clearer.
> > +1
> >
> >
> > Best,
> > Leonard
> >
> >
> >> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
> >>
> >> Hi devs,
> >>
> >> After discussing with @Qingsheng, I fixed a minor issue of the lineage
> lifecycle in `StreamExecutionEnvironment`. I have added the comment to
> explain that the lineage information in `StreamExecutionEnvironment` will
> be consistent with that of transformations. When users clear the existing
> transformations, the added lineage information will also be deleted.
> >>
> >> Please help to review it again, and If there are no more concerns about
> FLIP-314[1], I would like to start voting later, thanks. cc @ <>Leonard
> >>
> >> Best,
> >> Shammon FY
> >>
> >> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <mailto:
> zjureel@gmail.com>> wrote:
> >> Hi devs,
> >>
> >> Thanks for all the valuable feedback. If there are no more concerns
> about FLIP-314[1], I would like to start voting later, thanks.
> >>
> >>
> >> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>  <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> >
> >>
> >> Best,
> >> Shammon FY
> >>
> >>
> >> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com <mailto:
> zjureel@gmail.com>> wrote:
> >> Thanks for the valuable feedback, Leonard.
> >>
> >> I have discussed with Leonard off-line. We have reached some
> conclusions about these issues and I have updated the FLIP as follows:
> >>
> >> 1. Simplify the `LineageEdge` interface by creating an edge from one
> source vertex to sink vertex.
> >> 2. Remove the `TableColumnSourceLineageVertex` interface and update
> `TableColumnLineageEdge` to create an edge from columns in one source to
> each sink column.
> >> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
> >> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
> `StreamExecutionEnviroment` for datastream job and remove previous methods
> in `DataStreamSource` and `DataStreamSink`.
> >>
> >> Looking forward to your feedback, thanks.
> >>
> >> Best,
> >> Shammon FY
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by David Radley <da...@uk.ibm.com>.
Hi there,
I am looking at the interfaces. If I am reading it correctly,there is one relationship between the source and sink and this relationship represents the operational lineage. Lineage is usually represented as asset -> process - > asset – see for example https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph

Maybe I am missing it, but it seems to be that it would be useful to store the process in the lineage graph.

It is useful to have the top level lineage as source -> Flink job -> sink. Where the Flink job is the process, but also to have this asset -> process -> asset pattern for each of the steps in the job. If this is present, please could you point me to it,

      Kind regards, David.





From: David Radley <da...@uk.ibm.com>
Date: Tuesday, 19 September 2023 at 16:11
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
Hi,
I notice that there is an experimental lineage integration for Flink with OpenLineage https://openlineage.io/docs/integrations/flink  . I think this feature would allow for a superior Flink OpenLineage integration,
        Kind regards, David.

From: XTransfer <ji...@xtransfer.cn.INVALID>
Date: Tuesday, 19 September 2023 at 15:47
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
Thanks Shammon for this proposal.

That’s helpful for collecting the lineage of Flink tasks.
Looking forward to its implementation.

Best,
Jiabao


> 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
>
> Thanks Shammon for the informations, the comment makes the lifecycle clearer.
> +1
>
>
> Best,
> Leonard
>
>
>> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>
>> Hi devs,
>>
>> After discussing with @Qingsheng, I fixed a minor issue of the lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment to explain that the lineage information in `StreamExecutionEnvironment` will be consistent with that of transformations. When users clear the existing transformations, the added lineage information will also be deleted.
>>
>> Please help to review it again, and If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks. cc @ <>Leonard
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Hi devs,
>>
>> Thanks for all the valuable feedback. If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks.
>>
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener   <https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener  >
>>
>> Best,
>> Shammon FY
>>
>>
>> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Thanks for the valuable feedback, Leonard.
>>
>> I have discussed with Leonard off-line. We have reached some conclusions about these issues and I have updated the FLIP as follows:
>>
>> 1. Simplify the `LineageEdge` interface by creating an edge from one source vertex to sink vertex.
>> 2. Remove the `TableColumnSourceLineageVertex` interface and update `TableColumnLineageEdge` to create an edge from columns in one source to each sink column.
>> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> 4. Add method `addLineageEdges(LineageEdge ... edges)` in `StreamExecutionEnviroment` for datastream job and remove previous methods in `DataStreamSource` and `DataStreamSink`.
>>
>> Looking forward to your feedback, thanks.
>>
>> Best,
>> Shammon FY
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by David Radley <da...@uk.ibm.com>.
Hi,
I notice that there is an experimental lineage integration for Flink with OpenLineage https://openlineage.io/docs/integrations/flink . I think this feature would allow for a superior Flink OpenLineage integration,
        Kind regards, David.

From: XTransfer <ji...@xtransfer.cn.INVALID>
Date: Tuesday, 19 September 2023 at 15:47
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
Thanks Shammon for this proposal.

That’s helpful for collecting the lineage of Flink tasks.
Looking forward to its implementation.

Best,
Jiabao


> 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
>
> Thanks Shammon for the informations, the comment makes the lifecycle clearer.
> +1
>
>
> Best,
> Leonard
>
>
>> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>
>> Hi devs,
>>
>> After discussing with @Qingsheng, I fixed a minor issue of the lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment to explain that the lineage information in `StreamExecutionEnvironment` will be consistent with that of transformations. When users clear the existing transformations, the added lineage information will also be deleted.
>>
>> Please help to review it again, and If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks. cc @ <>Leonard
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Hi devs,
>>
>> Thanks for all the valuable feedback. If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks.
>>
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener  <https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener >
>>
>> Best,
>> Shammon FY
>>
>>
>> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Thanks for the valuable feedback, Leonard.
>>
>> I have discussed with Leonard off-line. We have reached some conclusions about these issues and I have updated the FLIP as follows:
>>
>> 1. Simplify the `LineageEdge` interface by creating an edge from one source vertex to sink vertex.
>> 2. Remove the `TableColumnSourceLineageVertex` interface and update `TableColumnLineageEdge` to create an edge from columns in one source to each sink column.
>> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> 4. Add method `addLineageEdges(LineageEdge ... edges)` in `StreamExecutionEnviroment` for datastream job and remove previous methods in `DataStreamSource` and `DataStreamSink`.
>>
>> Looking forward to your feedback, thanks.
>>
>> Best,
>> Shammon FY
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by XTransfer <ji...@xtransfer.cn.INVALID>.
Thanks Shammon for this proposal.

That’s helpful for collecting the lineage of Flink tasks.
Looking forward to its implementation.

Best,
Jiabao


> 2023年9月18日 20:56,Leonard Xu <xb...@gmail.com> 写道:
> 
> Thanks Shammon for the informations, the comment makes the lifecycle clearer. 
> +1 
> 
> 
> Best,
> Leonard
> 
> 
>> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
>> 
>> Hi devs,
>> 
>> After discussing with @Qingsheng, I fixed a minor issue of the lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment to explain that the lineage information in `StreamExecutionEnvironment` will be consistent with that of transformations. When users clear the existing transformations, the added lineage information will also be deleted.
>> 
>> Please help to review it again, and If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks. cc @ <>Leonard
>> 
>> Best,
>> Shammon FY
>> 
>> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Hi devs,
>> 
>> Thanks for all the valuable feedback. If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks.
>> 
>> 
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener <https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener>
>> 
>> Best,
>> Shammon FY
>> 
>> 
>> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
>> Thanks for the valuable feedback, Leonard.
>> 
>> I have discussed with Leonard off-line. We have reached some conclusions about these issues and I have updated the FLIP as follows:
>> 
>> 1. Simplify the `LineageEdge` interface by creating an edge from one source vertex to sink vertex.
>> 2. Remove the `TableColumnSourceLineageVertex` interface and update `TableColumnLineageEdge` to create an edge from columns in one source to each sink column.
>> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> 4. Add method `addLineageEdges(LineageEdge ... edges)` in `StreamExecutionEnviroment` for datastream job and remove previous methods in `DataStreamSource` and `DataStreamSink`.
>> 
>> Looking forward to your feedback, thanks.
>> 
>> Best,
>> Shammon FY
> 


Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Leonard Xu <xb...@gmail.com>.
Thanks Shammon for the informations, the comment makes the lifecycle clearer. 
+1 


Best,
Leonard


> On Sep 18, 2023, at 7:54 PM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi devs,
> 
> After discussing with @Qingsheng, I fixed a minor issue of the lineage lifecycle in `StreamExecutionEnvironment`. I have added the comment to explain that the lineage information in `StreamExecutionEnvironment` will be consistent with that of transformations. When users clear the existing transformations, the added lineage information will also be deleted.
> 
> Please help to review it again, and If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks. cc @ <>Leonard
> 
> Best,
> Shammon FY
> 
> On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
> Hi devs,
> 
> Thanks for all the valuable feedback. If there are no more concerns about FLIP-314[1], I would like to start voting later, thanks.
> 
> 
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener <https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener>
> 
> Best,
> Shammon FY
> 
> 
> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zjureel@gmail.com <ma...@gmail.com>> wrote:
> Thanks for the valuable feedback, Leonard.
> 
> I have discussed with Leonard off-line. We have reached some conclusions about these issues and I have updated the FLIP as follows:
> 
> 1. Simplify the `LineageEdge` interface by creating an edge from one source vertex to sink vertex.
> 2. Remove the `TableColumnSourceLineageVertex` interface and update `TableColumnLineageEdge` to create an edge from columns in one source to each sink column.
> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
> 4. Add method `addLineageEdges(LineageEdge ... edges)` in `StreamExecutionEnviroment` for datastream job and remove previous methods in `DataStreamSource` and `DataStreamSink`.
> 
> Looking forward to your feedback, thanks.
> 
> Best,
> Shammon FY


Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi devs,

After discussing with @Qingsheng, I fixed a minor issue of the lineage
lifecycle in `StreamExecutionEnvironment`. I have added the comment to
explain that the lineage information in `StreamExecutionEnvironment` will
be consistent with that of transformations. When users clear the existing
transformations, the added lineage information will also be deleted.

Please help to review it again, and If there are no more concerns about FLIP
-314[1], I would like to start voting later, thanks. cc @Leonard

Best,
Shammon FY

On Mon, Jul 17, 2023 at 3:43 PM Shammon FY <zj...@gmail.com> wrote:

> Hi devs,
>
> Thanks for all the valuable feedback. If there are no more concerns about
> FLIP-314[1], I would like to start voting later, thanks.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>
> Best,
> Shammon FY
>
>
> On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zj...@gmail.com> wrote:
>
>> Thanks for the valuable feedback, Leonard.
>>
>> I have discussed with Leonard off-line. We have reached some conclusions
>> about these issues and I have updated the FLIP as follows:
>>
>> 1. Simplify the `LineageEdge` interface by creating an edge from one
>> source vertex to sink vertex.
>> 2. Remove the `TableColumnSourceLineageVertex` interface and update
>> `TableColumnLineageEdge` to create an edge from columns in one source to
>> each sink column.
>> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
>> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
>> `StreamExecutionEnviroment` for datastream job and remove previous methods
>> in `DataStreamSource` and `DataStreamSink`.
>>
>> Looking forward to your feedback, thanks.
>>
>> Best,
>> Shammon FY
>>
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi devs,

Thanks for all the valuable feedback. If there are no more concerns about
FLIP-314[1], I would like to start voting later, thanks.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener

Best,
Shammon FY


On Wed, Jul 12, 2023 at 11:18 AM Shammon FY <zj...@gmail.com> wrote:

> Thanks for the valuable feedback, Leonard.
>
> I have discussed with Leonard off-line. We have reached some conclusions
> about these issues and I have updated the FLIP as follows:
>
> 1. Simplify the `LineageEdge` interface by creating an edge from one
> source vertex to sink vertex.
> 2. Remove the `TableColumnSourceLineageVertex` interface and update
> `TableColumnLineageEdge` to create an edge from columns in one source to
> each sink column.
> 3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
> 4. Add method `addLineageEdges(LineageEdge ... edges)` in
> `StreamExecutionEnviroment` for datastream job and remove previous methods
> in `DataStreamSource` and `DataStreamSink`.
>
> Looking forward to your feedback, thanks.
>
> Best,
> Shammon FY
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Thanks for the valuable feedback, Leonard.

I have discussed with Leonard off-line. We have reached some conclusions
about these issues and I have updated the FLIP as follows:

1. Simplify the `LineageEdge` interface by creating an edge from one source
vertex to sink vertex.
2. Remove the `TableColumnSourceLineageVertex` interface and update
`TableColumnLineageEdge` to create an edge from columns in one source to
each sink column.
3. Rename `SupportsLineageVertex` to `LineageVertexProvider`
4. Add method `addLineageEdges(LineageEdge ... edges)` in
`StreamExecutionEnviroment` for datastream job and remove previous methods
in `DataStreamSource` and `DataStreamSink`.

Looking forward to your feedback, thanks.

Best,
Shammon FY

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Leonard Xu <xb...@gmail.com>.
Thanks Shammon for driving this FLIP, I have some comments about the updated FLIP.


1. It confuses for me that LineageEdge interface contains multiple sources but one sink, the relation looks like a graph instead of an edge in topology.  

2. TableColumnSourceLineageVertex interface  is not clear for me as why it’s designed to return  multiple columns, is there the case 
ColumnAOfSrcT1 -> columnAofSinkT1, ColumnBOfSrcT1->  columnAofSinkT1 ? 
Minor: the table() method name could be improved as it returns a  TableLineageVertex instead of Table.

3. SupportsLineageVertex for Datastream source and sink are not natural to me, the underling logic is source/sink can provide a linage information optionally, 
how about change it to LineageVertexProvider { LineageVertex getLineageVertex();} ?

4.I’m hesitate to introducing method setLineageVertex for DataStreamSource and DataStreamSink as: 
   (a) Lineage is different to parallelism of source/sink, it should be unique and certain for a given source/sink, I prefer to only expose one interface to users. 
   (b) The way user to build SourceLineageVertex VS the way user to implement a LineageVertexProvider, which cost is higher? 
   minor: "SourceLineageEdge LineageEdge;” looks like a typo ?

5.DataStreamSink#setLineageEdge is not clear for me now, a sink can hold all LineageEdges even these edges’ target is not the sink itself, right? It’s incorrect here.
I have an intuition that the lineages of all pipelines in a datastream application should belong to the StreamExecutionEnviroment , introducing setLineageEdge method for 
StreamExecutionEnviroment should be better than current proposal.

Best,
Leonard  


> On Jul 5, 2023, at 5:26 PM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi Jing,
> 
> Thanks for your feedback.
> 
>> 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
> 
> The `sinkColumn()` will return `String` which is the column name in the
> sink connector. I found the name of `TableColumnLineageEntity` may
> cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind the `TableColumnLineageRelation` represents the lineage for each
> sink column, each column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to manage each source and its
> columns for the sink column, so `TableColumnLineageRelation` has a sink
> column name and `TableColumnSourceLineageEntity` list.
> 
>> 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
> 
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
> 
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge?
> 
> We referred to `Atlas` for the name of lineage, it uses `Entity` and
> `Relation` to represent the lineage relationship and another metadata
> service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are nicer for lineage, what do you think of it?
> 
> Best,
> Shammon FY
> 
> 
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid> wrote:
> 
>> Hi Shammon,
>> 
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>> 
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>> 
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the LineageEntity
>> in the source?
>> 
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
>> which contains multiple LineageEdge? E.g. multiple sources join into one
>> sink, or, edges of columns from one or different tables, etc.
>> 
>> Best regards,
>> Jing
>> 
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:
>> 
>>> Hi yuxia and Yun,
>>> 
>>> Thanks for your input.
>>> 
>>> For yuxia:
>>>> 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
>>> 
>>> At present, we only need to notify the listener when a job goes to
>>> termination, but I think it makes sense to add generic `oldStatus` and
>>> `newStatus` in the listener and users can update the job state in their
>>> service as needed.
>>> 
>>>> 2: I'm really confused about the `config()` included in
>> `LineageEntity`,
>>> where is it from and what is it for ?
>>> 
>>> The `config` in `LineageEntity` is used for users to get options for
>> source
>>> and sink connectors. As the examples in the FLIP, users can add
>>> server/group/topic information in the config for kafka and create lineage
>>> entities for `DataStream` jobs, then the listeners can get this
>> information
>>> to identify the same connector in different jobs. Otherwise, the `config`
>>> in `TableLineageEntity` will be the same as `getOptions` in
>>> `CatalogBaseTable`.
>>> 
>>>> 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity`
>> is
>>> needed or not, since `TableSinkLineageEntity` contains
>>> `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
>>> changelogmode?
>>> 
>>> At present, we do not actually use the changelog mode. It can be deleted,
>>> and I have updated FLIP.
>>> 
>>>> Btw, since there're a lot interfaces proposed, I think it'll be better
>> to
>>> give an example about how to implement a listener in this FLIP to make us
>>> know better about the interfaces.
>>> 
>>> I have added the example in the FLIP and the related interfaces and
>>> examples are in branch [1].
>>> 
>>> For Yun:
>>>> I have one more question on the lookup-join dim tables, it seems this
>>> FLIP does not touch them, and will them become part of the
>>> List<LineageEntity> sources() or adding another interface?
>>> 
>>> You're right, currently lookup join dim tables were not considered in the
>>> 'proposed changed' section of this FLIP. But the interface for lineage is
>>> universal and we can give `TableLookupSourceLineageEntity` which
>> implements
>>> `TableSourceLineageEntity` in the future without modifying the public
>>> interface.
>>> 
>>>> By the way, if you want to focus on job lineage instead of data column
>>> lineage in this FLIP, why we must introduce so many column-lineage
>> related
>>> interface here?
>>> 
>>> The lineage information in SQL jobs includes table lineage and column
>>> lineage. Although SQL jobs currently do not support column lineage, we
>>> would like to support this in the next step. So we have comprehensively
>>> considered the table lineage and column lineage interfaces here, and
>>> defined these two interfaces together clearly
>>> 
>>> 
>>> [1]
>>> 
>>> 
>> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>>> 
>>> Best,
>>> Shammon FY
>>> 
>>> 
>>> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
>>> 
>>>> Hi Shammon,
>>>> 
>>>> I like the idea in general and it will help to analysis the job
>> lineages
>>>> no matter FlinkSQL or Flink jar jobs in production environments.
>>>> 
>>>> For Qingsheng's concern, I'd like the name of JobType more than
>>>> RuntimeExecutionMode, as the latter one is not easy to understand for
>>> users.
>>>> 
>>>> I have one more question on the lookup-join dim tables, it seems this
>>> FLIP
>>>> does not touch them, and will them become part of the
>> List<LineageEntity>
>>>> sources()​ or adding another interface?
>>>> 
>>>> By the way, if you want to focus on job lineage instead of data column
>>>> lineage in this FLIP, why we must introduce so many column-lineage
>>> related
>>>> interface here?
>>>> 
>>>> 
>>>> Best
>>>> Yun Tang
>>>> ________________________________
>>>> From: Shammon FY <zj...@gmail.com>
>>>> Sent: Sunday, June 25, 2023 16:13
>>>> To: dev@flink.apache.org <de...@flink.apache.org>
>>>> Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
>> Listener
>>>> 
>>>> Hi Qingsheng,
>>>> 
>>>> Thanks for your valuable feedback.
>>>> 
>>>>> 1. Is there any specific use case to expose the batch / streaming
>> info
>>> to
>>>> listeners or meta services?
>>>> 
>>>> I agree with you that Flink is evolving towards batch-streaming
>>>> unification, but the lifecycle of them is different. If a job
>> processes a
>>>> bound dataset, it will end after completing the data processing,
>>> otherwise,
>>>> it will run for a long time. In our scenario, we will regularly
>> schedule
>>>> some Flink jobs to process bound dataset and update some job
>> information
>>> to
>>>> the lineage information for the "batch" jobs such as scheduled
>> timestamp,
>>>> execution duration when jobs are finished, which is different from
>>>> "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
>>>> `existsUnboundedSource` in `StreamingGraph` and
>> `StreamingGraphGenerator`
>>>> to determine `JobType` and disjoin jobs. We can mark `JobType` as
>>>> `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what
>>> do
>>>> you think of it?
>>>> 
>>>>> 2. it’s better to be more specific here to tell users what
>> information
>>>> they could expect to see here, instead of just a “job configuration” as
>>>> described in JavaDoc.
>>>> 
>>>> Thanks and I have updated the doc in FLIP.
>>>> 
>>>>> 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>>>> 
>>>> I have updated the docs for io executor  in
>>>> `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
>>> and
>>>> executes submitted tasks in parallel. Users can submit tasks to the
>>>> executor which ensures that the submitted task can be executed before
>> the
>>>> job exits.
>>>> 
>>>>> 4. I don’t quite get the LineageRelationEntity, which is just a list
>> of
>>>> LineageEntity.
>>>> 
>>>> In the initial idea, the `LineageRelationEntity` is used for
>> `DataStream`
>>>> to set additional lineage information besides source. For example,
>> there
>>>> are table and column lineages in SQL jobs. When we build a `DataStream`
>>> job
>>>> with table source and sink, we can add table lineage in the following
>>>> method.
>>>> ```
>>>> public class DataStreamSink {
>>>>    public DataStreamSink setLineageSources(LineageEntity ... sources);
>>>> }
>>>> ```
>>>> But we can not set column lineage for the above sink, and for the sake
>> of
>>>> universality, we do not want to add a method similar to
>> `addLineageColumn
>>>> (...)` in `DataStreamSink`. So I put this information into
>>>> LineageRelationEntity so that SQL and DataStream jobs can be
>> consistent.
>>>> But as you mentioned, this approach does indeed lead to ambiguity and
>>>> complexity. So my current idea is to add the `setLineageRelation`
>> method
>>> in
>>>> `DataStreamSink` directly without `LineageRelationEntity`, I have
>> updated
>>>> the FLIP and please help to review it again, thanks.
>>>> 
>>>>> 5. I can’t find the definition of CatalogContext in the current code
>>> base
>>>> and Flink, which appears in the TableLineageEntity.
>>>> 
>>>> CatalogContext is defined in FLIP-294 and I have updated the FLIP
>>>> 
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>>> boolean
>>>> (the “override” is quite confusing). I’m wondering if these are
>> necessary
>>>> for meta services, as they are actually concepts defined in the runtime
>>>> level of Flink Table / SQL.
>>>> 
>>>> The information in `TableSinkLineageEntity` such as `ModifyType`,
>>>> `ChangelogMode` and `override` are mainly used for verification and
>>>> display. For example, Flink currently supports `INSERT`/`DELETE` and
>>>> `UPDATE`, we only want to report and update lineage for `INSERT` jobs
>> in
>>>> our streaming & batch ETL, and display the `override` information on
>> the
>>>> UI.
>>>> 
>>>> 
>>>> Best,
>>>> Shammon FY
>>>> 
>>>> 
>>>> On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
>> wrote:
>>>> 
>>>>> Hi Shammon,
>>>>> 
>>>>> Thanks for starting this FLIP! Data lineage is a very important
>> topic,
>>>>> which has been missing for a long time in Flink. I have some
>> questions
>>>>> about the FLIP.
>>>>> 
>>>>> About events and listeners:
>>>>> 
>>>>> 1. I’m not sure if it is necessary to expose JobType to in
>>>> JobCreatedEvent.
>>>>> This is an internal class in flink-runtime, and I think the correct
>> API
>>>>> should be RuntimeExecutionMode. Furthermore, I think the boundary of
>>>> batch
>>>>> and streaming becomes much more vague as Flink is evolving towards
>>>>> batch-streaming unification, so I’m concerned about exposing JobType
>>> as a
>>>>> public API. Is there any specific use case to expose the batch /
>>>> streaming
>>>>> info to listeners or meta services?
>>>>> 
>>>>> 2. Currently JobCreatedEvent gives a Configuration, which is quite
>>>>> ambiguous. To be honest the configuration is quite a mess in Flink,
>> so
>>>>> maybe it’s better to be more specific here to tell users what
>>> information
>>>>> they could expect to see here, instead of just a “job configuration”
>> as
>>>>> described in JavaDoc.
>>>>> 
>>>>> 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
>>>> think
>>>>> more information should be provided here, such as which thread model
>>> this
>>>>> executor could promise, and whether the user should care about
>>>> concurrency
>>>>> issues. Otherwise I prefer not to give such an utility that no one
>>> dares
>>>> to
>>>>> use safely, and leave it to users to choose their implementation.
>>>>> 
>>>>> About lineage:
>>>>> 
>>>>> 4. I don’t quite get the LineageRelationEntity, which is just a list
>> of
>>>>> LineageEntity. Could you elaborate more on this class? From my naive
>>>>> imagination, the lineage is shaped as a DAG, where vertices are
>> sources
>>>> and
>>>>> sinks (LineageEntity) and edges are connections between them
>>>>> (LineageRelation), so it is a bit confusing for a name mixing these
>> two
>>>>> concepts.
>>>>> 
>>>>> 5. I can’t find the definition of CatalogContext in the current code
>>> base
>>>>> and Flink, which appears in the TableLineageEntity.
>>>>> 
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>>> boolean
>>>>> (the “override” is quite confusing). I’m wondering if these are
>>> necessary
>>>>> for meta services, as they are actually concepts defined in the
>> runtime
>>>>> level of Flink Table / SQL.
>>>>> 
>>>>> Best,
>>>>> Qingsheng
>>>>> 
>>>>> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
>> wrote:
>>>>> 
>>>>>> Hi devs,
>>>>>> 
>>>>>> Is there any comment or feedback for this FLIP? Hope to hear from
>>> you,
>>>>>> thanks
>>>>>> 
>>>>>> Best,
>>>>>> Shammon FY
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
>> wrote:
>>>>>> 
>>>>>>> Hi devs,
>>>>>>> 
>>>>>>> I would like to start a discussion on FLIP-314: Support
>> Customized
>>>> Job
>>>>>>> Lineage Listener[1] which is the next stage of FLIP-294 [2].
>> Flink
>>>>>>> streaming and batch jobs create lineage dependency between source
>>> and
>>>>>> sink,
>>>>>>> users can manage their data and jobs according to this lineage
>>>>>> information.
>>>>>>> For example, when there is a delay in Flink ETL or data, users
>> can
>>>>> easily
>>>>>>> trace the problematic jobs and affected data. On the other hand,
>>> when
>>>>>> users
>>>>>>> need to correct data or debug, they can perform operations based
>> on
>>>>>> lineage
>>>>>>> too.
>>>>>>> 
>>>>>>> In FLIP-314 we want to introduce lineage related interfaces for
>>> Flink
>>>>> and
>>>>>>> users can create customized job status listeners. When job status
>>>>>> changes,
>>>>>>> users can get job status and information to add, update or delete
>>>>>> lineage.
>>>>>>> 
>>>>>>> Looking forward to your feedback, thanks.
>>>>>>> 
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>>>>>> [2]
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>>>>>>> 
>>>>>>> Best,
>>>>>>> Shammon FY
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi devs,

Thanks for all the feedback.

I have discussed with @QingSheng Ren off-line to confirm some questionable
points in the FLIP. Thanks for his valuable inputs and I have updated the
FLIP according to our discussion.

Looking forward to your feedback, thanks,

Best,
Shammon FY


On Wed, Jul 5, 2023 at 5:26 PM Shammon FY <zj...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for your feedback.
>
> > 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> The `sinkColumn()` will return `String` which is the column name in the
> sink connector. I found the name of `TableColumnLineageEntity` may
> cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind the `TableColumnLineageRelation` represents the lineage for each
> sink column, each column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to manage each source and its
> columns for the sink column, so `TableColumnLineageRelation` has a sink
> column name and `TableColumnSourceLineageEntity` list.
>
> > 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
>
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge?
>
> We referred to `Atlas` for the name of lineage, it uses `Entity` and
> `Relation` to represent the lineage relationship and another metadata
> service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are nicer for lineage, what do you think of it?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid>
> wrote:
>
>> Hi Shammon,
>>
>> Thanks for your proposal. After reading the FLIP, I'd like to ask
>> some questions to make sure we are on the same page. Thanks!
>>
>> 1. TableColumnLineageRelation#sinkColumn() should return
>> TableColumnLineageEntity instead of String, right?
>>
>> 2. Since LineageRelation already contains all information to build the
>> lineage between sources and sink, do we still need to set the
>> LineageEntity
>> in the source?
>>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like
>> Qingsheng mentioned. How about LineageVertex, LineageEdge, and
>> LineageEdges
>> which contains multiple LineageEdge? E.g. multiple sources join into one
>> sink, or, edges of columns from one or different tables, etc.
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:
>>
>> > Hi yuxia and Yun,
>> >
>> > Thanks for your input.
>> >
>> > For yuxia:
>> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
>> including?
>> >
>> > At present, we only need to notify the listener when a job goes to
>> > termination, but I think it makes sense to add generic `oldStatus` and
>> > `newStatus` in the listener and users can update the job state in their
>> > service as needed.
>> >
>> > > 2: I'm really confused about the `config()` included in
>> `LineageEntity`,
>> > where is it from and what is it for ?
>> >
>> > The `config` in `LineageEntity` is used for users to get options for
>> source
>> > and sink connectors. As the examples in the FLIP, users can add
>> > server/group/topic information in the config for kafka and create
>> lineage
>> > entities for `DataStream` jobs, then the listeners can get this
>> information
>> > to identify the same connector in different jobs. Otherwise, the
>> `config`
>> > in `TableLineageEntity` will be the same as `getOptions` in
>> > `CatalogBaseTable`.
>> >
>> > > 3: Regardless whether `inputChangelogMode` in
>> `TableSinkLineageEntity` is
>> > needed or not, since `TableSinkLineageEntity` contains
>> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
>> > changelogmode?
>> >
>> > At present, we do not actually use the changelog mode. It can be
>> deleted,
>> > and I have updated FLIP.
>> >
>> > > Btw, since there're a lot interfaces proposed, I think it'll be
>> better to
>> > give an example about how to implement a listener in this FLIP to make
>> us
>> > know better about the interfaces.
>> >
>> > I have added the example in the FLIP and the related interfaces and
>> > examples are in branch [1].
>> >
>> > For Yun:
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > FLIP does not touch them, and will them become part of the
>> > List<LineageEntity> sources() or adding another interface?
>> >
>> > You're right, currently lookup join dim tables were not considered in
>> the
>> > 'proposed changed' section of this FLIP. But the interface for lineage
>> is
>> > universal and we can give `TableLookupSourceLineageEntity` which
>> implements
>> > `TableSourceLineageEntity` in the future without modifying the public
>> > interface.
>> >
>> > > By the way, if you want to focus on job lineage instead of data column
>> > lineage in this FLIP, why we must introduce so many column-lineage
>> related
>> > interface here?
>> >
>> > The lineage information in SQL jobs includes table lineage and column
>> > lineage. Although SQL jobs currently do not support column lineage, we
>> > would like to support this in the next step. So we have comprehensively
>> > considered the table lineage and column lineage interfaces here, and
>> > defined these two interfaces together clearly
>> >
>> >
>> > [1]
>> >
>> >
>> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>> >
>> > Best,
>> > Shammon FY
>> >
>> >
>> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
>> >
>> > > Hi Shammon,
>> > >
>> > > I like the idea in general and it will help to analysis the job
>> lineages
>> > > no matter FlinkSQL or Flink jar jobs in production environments.
>> > >
>> > > For Qingsheng's concern, I'd like the name of JobType more than
>> > > RuntimeExecutionMode, as the latter one is not easy to understand for
>> > users.
>> > >
>> > > I have one more question on the lookup-join dim tables, it seems this
>> > FLIP
>> > > does not touch them, and will them become part of the
>> List<LineageEntity>
>> > > sources()​ or adding another interface?
>> > >
>> > > By the way, if you want to focus on job lineage instead of data column
>> > > lineage in this FLIP, why we must introduce so many column-lineage
>> > related
>> > > interface here?
>> > >
>> > >
>> > > Best
>> > > Yun Tang
>> > > ________________________________
>> > > From: Shammon FY <zj...@gmail.com>
>> > > Sent: Sunday, June 25, 2023 16:13
>> > > To: dev@flink.apache.org <de...@flink.apache.org>
>> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
>> Listener
>> > >
>> > > Hi Qingsheng,
>> > >
>> > > Thanks for your valuable feedback.
>> > >
>> > > > 1. Is there any specific use case to expose the batch / streaming
>> info
>> > to
>> > > listeners or meta services?
>> > >
>> > > I agree with you that Flink is evolving towards batch-streaming
>> > > unification, but the lifecycle of them is different. If a job
>> processes a
>> > > bound dataset, it will end after completing the data processing,
>> > otherwise,
>> > > it will run for a long time. In our scenario, we will regularly
>> schedule
>> > > some Flink jobs to process bound dataset and update some job
>> information
>> > to
>> > > the lineage information for the "batch" jobs such as scheduled
>> timestamp,
>> > > execution duration when jobs are finished, which is different from
>> > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
>> > > `existsUnboundedSource` in `StreamingGraph` and
>> `StreamingGraphGenerator`
>> > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
>> > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag,
>> what
>> > do
>> > > you think of it?
>> > >
>> > > > 2. it’s better to be more specific here to tell users what
>> information
>> > > they could expect to see here, instead of just a “job configuration”
>> as
>> > > described in JavaDoc.
>> > >
>> > > Thanks and I have updated the doc in FLIP.
>> > >
>> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>> > >
>> > > I have updated the docs for io executor  in
>> > > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
>> > and
>> > > executes submitted tasks in parallel. Users can submit tasks to the
>> > > executor which ensures that the submitted task can be executed before
>> the
>> > > job exits.
>> > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> list of
>> > > LineageEntity.
>> > >
>> > > In the initial idea, the `LineageRelationEntity` is used for
>> `DataStream`
>> > > to set additional lineage information besides source. For example,
>> there
>> > > are table and column lineages in SQL jobs. When we build a
>> `DataStream`
>> > job
>> > > with table source and sink, we can add table lineage in the following
>> > > method.
>> > > ```
>> > > public class DataStreamSink {
>> > >     public DataStreamSink setLineageSources(LineageEntity ...
>> sources);
>> > > }
>> > > ```
>> > > But we can not set column lineage for the above sink, and for the
>> sake of
>> > > universality, we do not want to add a method similar to
>> `addLineageColumn
>> > > (...)` in `DataStreamSink`. So I put this information into
>> > > LineageRelationEntity so that SQL and DataStream jobs can be
>> consistent.
>> > > But as you mentioned, this approach does indeed lead to ambiguity and
>> > > complexity. So my current idea is to add the `setLineageRelation`
>> method
>> > in
>> > > `DataStreamSink` directly without `LineageRelationEntity`, I have
>> updated
>> > > the FLIP and please help to review it again, thanks.
>> > >
>> > > > 5. I can’t find the definition of CatalogContext in the current code
>> > base
>> > > and Flink, which appears in the TableLineageEntity.
>> > >
>> > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
>> > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > boolean
>> > > (the “override” is quite confusing). I’m wondering if these are
>> necessary
>> > > for meta services, as they are actually concepts defined in the
>> runtime
>> > > level of Flink Table / SQL.
>> > >
>> > > The information in `TableSinkLineageEntity` such as `ModifyType`,
>> > > `ChangelogMode` and `override` are mainly used for verification and
>> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
>> > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs
>> in
>> > > our streaming & batch ETL, and display the `override` information on
>> the
>> > > UI.
>> > >
>> > >
>> > > Best,
>> > > Shammon FY
>> > >
>> > >
>> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
>> wrote:
>> > >
>> > > > Hi Shammon,
>> > > >
>> > > > Thanks for starting this FLIP! Data lineage is a very important
>> topic,
>> > > > which has been missing for a long time in Flink. I have some
>> questions
>> > > > about the FLIP.
>> > > >
>> > > > About events and listeners:
>> > > >
>> > > > 1. I’m not sure if it is necessary to expose JobType to in
>> > > JobCreatedEvent.
>> > > > This is an internal class in flink-runtime, and I think the correct
>> API
>> > > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
>> > > batch
>> > > > and streaming becomes much more vague as Flink is evolving towards
>> > > > batch-streaming unification, so I’m concerned about exposing JobType
>> > as a
>> > > > public API. Is there any specific use case to expose the batch /
>> > > streaming
>> > > > info to listeners or meta services?
>> > > >
>> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
>> > > > ambiguous. To be honest the configuration is quite a mess in Flink,
>> so
>> > > > maybe it’s better to be more specific here to tell users what
>> > information
>> > > > they could expect to see here, instead of just a “job
>> configuration” as
>> > > > described in JavaDoc.
>> > > >
>> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor.
>> I
>> > > think
>> > > > more information should be provided here, such as which thread model
>> > this
>> > > > executor could promise, and whether the user should care about
>> > > concurrency
>> > > > issues. Otherwise I prefer not to give such an utility that no one
>> > dares
>> > > to
>> > > > use safely, and leave it to users to choose their implementation.
>> > > >
>> > > > About lineage:
>> > > >
>> > > > 4. I don’t quite get the LineageRelationEntity, which is just a
>> list of
>> > > > LineageEntity. Could you elaborate more on this class? From my naive
>> > > > imagination, the lineage is shaped as a DAG, where vertices are
>> sources
>> > > and
>> > > > sinks (LineageEntity) and edges are connections between them
>> > > > (LineageRelation), so it is a bit confusing for a name mixing these
>> two
>> > > > concepts.
>> > > >
>> > > > 5. I can’t find the definition of CatalogContext in the current code
>> > base
>> > > > and Flink, which appears in the TableLineageEntity.
>> > > >
>> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
>> > boolean
>> > > > (the “override” is quite confusing). I’m wondering if these are
>> > necessary
>> > > > for meta services, as they are actually concepts defined in the
>> runtime
>> > > > level of Flink Table / SQL.
>> > > >
>> > > > Best,
>> > > > Qingsheng
>> > > >
>> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi devs,
>> > > > >
>> > > > > Is there any comment or feedback for this FLIP? Hope to hear from
>> > you,
>> > > > > thanks
>> > > > >
>> > > > > Best,
>> > > > > Shammon FY
>> > > > >
>> > > > >
>> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
>> wrote:
>> > > > >
>> > > > > > Hi devs,
>> > > > > >
>> > > > > > I would like to start a discussion on FLIP-314: Support
>> Customized
>> > > Job
>> > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
>> Flink
>> > > > > > streaming and batch jobs create lineage dependency between
>> source
>> > and
>> > > > > sink,
>> > > > > > users can manage their data and jobs according to this lineage
>> > > > > information.
>> > > > > > For example, when there is a delay in Flink ETL or data, users
>> can
>> > > > easily
>> > > > > > trace the problematic jobs and affected data. On the other hand,
>> > when
>> > > > > users
>> > > > > > need to correct data or debug, they can perform operations
>> based on
>> > > > > lineage
>> > > > > > too.
>> > > > > >
>> > > > > > In FLIP-314 we want to introduce lineage related interfaces for
>> > Flink
>> > > > and
>> > > > > > users can create customized job status listeners. When job
>> status
>> > > > > changes,
>> > > > > > users can get job status and information to add, update or
>> delete
>> > > > > lineage.
>> > > > > >
>> > > > > > Looking forward to your feedback, thanks.
>> > > > > >
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>> > > > > > [2]
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>> > > > > >
>> > > > > > Best,
>> > > > > > Shammon FY
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Shammon,

Thanks for the update!

Best regards,
Jing

On Fri, Jul 7, 2023 at 4:46 AM Shammon FY <zj...@gmail.com> wrote:

> Thanks Jing, sounds good to me.
>
> I have updated the FLIP and renamed the lineage related classes to
> `LineageGraph`, `LineageVertex` and `LineageEdge` and keep it consistent
> with the job definition in Flink.
>
> Best,
> Shammon FY
>
> On Thu, Jul 6, 2023 at 8:25 PM Jing Ge <ji...@ververica.com.invalid> wrote:
>
> > Hi Shammon,
> >
> > Thanks for the clarification. Atlas might have his historical reason back
> > to the hadoop era or maybe even back to the hibernate where Entity and
> > Relation were commonly used. Flink already used Vertex and Edge to
> describe
> > DAG. Some popular tools like dbt are also using this convention[1] and,
> > afaik, most graph frameworks use vertex and edge too. It will be easier
> for
> > Flink devs and users to have a consistent naming convention for the same
> > concept, i.e. in this case, DAG.
> >
> > Best regards,
> > Jing
> >
> > [1]
> >
> >
> https://docs.getdbt.com/docs/dbt-cloud-apis/discovery-use-cases-and-examples#discovery
> >
> > On Wed, Jul 5, 2023 at 11:28 AM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for your feedback.
> > >
> > > > 1. TableColumnLineageRelation#sinkColumn() should return
> > > TableColumnLineageEntity instead of String, right?
> > >
> > > The `sinkColumn()` will return `String` which is the column name in the
> > > sink connector. I found the name of `TableColumnLineageEntity` may
> > > cause ambiguity and I have renamed it to
> > `TableColumnSourceLineageEntity`.
> > > In my mind the `TableColumnLineageRelation` represents the lineage for
> > each
> > > sink column, each column may be computed from multiple sources and
> > columns.
> > > I use `TableColumnSourceLineageEntity` to manage each source and its
> > > columns for the sink column, so `TableColumnLineageRelation` has a sink
> > > column name and `TableColumnSourceLineageEntity` list.
> > >
> > > > 2. Since LineageRelation already contains all information to build
> the
> > > lineage between sources and sink, do we still need to set the
> > LineageEntity
> > > in the source?
> > >
> > > The lineage interface of `DataStream` is very flexible. We have added
> > > `setLineageEntity` to the source to limit and verify user behavior,
> > > ensuring that users have not added non-existent sources as lineage.
> > >
> > > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> > LineageEdges
> > > which contains multiple LineageEdge?
> > >
> > > We referred to `Atlas` for the name of lineage, it uses `Entity` and
> > > `Relation` to represent the lineage relationship and another metadata
> > > service `Datahub` uses `DataSet` to represent the entity. I think
> > `Entity`
> > > and `Relation` are nicer for lineage, what do you think of it?
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid>
> > > wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for your proposal. After reading the FLIP, I'd like to ask
> > > > some questions to make sure we are on the same page. Thanks!
> > > >
> > > > 1. TableColumnLineageRelation#sinkColumn() should return
> > > > TableColumnLineageEntity instead of String, right?
> > > >
> > > > 2. Since LineageRelation already contains all information to build
> the
> > > > lineage between sources and sink, do we still need to set the
> > > LineageEntity
> > > > in the source?
> > > >
> > > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > > > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> > > LineageEdges
> > > > which contains multiple LineageEdge? E.g. multiple sources join into
> > one
> > > > sink, or, edges of columns from one or different tables, etc.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com>
> wrote:
> > > >
> > > > > Hi yuxia and Yun,
> > > > >
> > > > > Thanks for your input.
> > > > >
> > > > > For yuxia:
> > > > > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
> > > including?
> > > > >
> > > > > At present, we only need to notify the listener when a job goes to
> > > > > termination, but I think it makes sense to add generic `oldStatus`
> > and
> > > > > `newStatus` in the listener and users can update the job state in
> > their
> > > > > service as needed.
> > > > >
> > > > > > 2: I'm really confused about the `config()` included in
> > > > `LineageEntity`,
> > > > > where is it from and what is it for ?
> > > > >
> > > > > The `config` in `LineageEntity` is used for users to get options
> for
> > > > source
> > > > > and sink connectors. As the examples in the FLIP, users can add
> > > > > server/group/topic information in the config for kafka and create
> > > lineage
> > > > > entities for `DataStream` jobs, then the listeners can get this
> > > > information
> > > > > to identify the same connector in different jobs. Otherwise, the
> > > `config`
> > > > > in `TableLineageEntity` will be the same as `getOptions` in
> > > > > `CatalogBaseTable`.
> > > > >
> > > > > > 3: Regardless whether `inputChangelogMode` in
> > > `TableSinkLineageEntity`
> > > > is
> > > > > needed or not, since `TableSinkLineageEntity` contains
> > > > > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > > > > changelogmode?
> > > > >
> > > > > At present, we do not actually use the changelog mode. It can be
> > > deleted,
> > > > > and I have updated FLIP.
> > > > >
> > > > > > Btw, since there're a lot interfaces proposed, I think it'll be
> > > better
> > > > to
> > > > > give an example about how to implement a listener in this FLIP to
> > make
> > > us
> > > > > know better about the interfaces.
> > > > >
> > > > > I have added the example in the FLIP and the related interfaces and
> > > > > examples are in branch [1].
> > > > >
> > > > > For Yun:
> > > > > > I have one more question on the lookup-join dim tables, it seems
> > this
> > > > > FLIP does not touch them, and will them become part of the
> > > > > List<LineageEntity> sources() or adding another interface?
> > > > >
> > > > > You're right, currently lookup join dim tables were not considered
> in
> > > the
> > > > > 'proposed changed' section of this FLIP. But the interface for
> > lineage
> > > is
> > > > > universal and we can give `TableLookupSourceLineageEntity` which
> > > > implements
> > > > > `TableSourceLineageEntity` in the future without modifying the
> public
> > > > > interface.
> > > > >
> > > > > > By the way, if you want to focus on job lineage instead of data
> > > column
> > > > > lineage in this FLIP, why we must introduce so many column-lineage
> > > > related
> > > > > interface here?
> > > > >
> > > > > The lineage information in SQL jobs includes table lineage and
> column
> > > > > lineage. Although SQL jobs currently do not support column lineage,
> > we
> > > > > would like to support this in the next step. So we have
> > comprehensively
> > > > > considered the table lineage and column lineage interfaces here,
> and
> > > > > defined these two interfaces together clearly
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
> > > > >
> > > > > > Hi Shammon,
> > > > > >
> > > > > > I like the idea in general and it will help to analysis the job
> > > > lineages
> > > > > > no matter FlinkSQL or Flink jar jobs in production environments.
> > > > > >
> > > > > > For Qingsheng's concern, I'd like the name of JobType more than
> > > > > > RuntimeExecutionMode, as the latter one is not easy to understand
> > for
> > > > > users.
> > > > > >
> > > > > > I have one more question on the lookup-join dim tables, it seems
> > this
> > > > > FLIP
> > > > > > does not touch them, and will them become part of the
> > > > List<LineageEntity>
> > > > > > sources()​ or adding another interface?
> > > > > >
> > > > > > By the way, if you want to focus on job lineage instead of data
> > > column
> > > > > > lineage in this FLIP, why we must introduce so many
> column-lineage
> > > > > related
> > > > > > interface here?
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > ________________________________
> > > > > > From: Shammon FY <zj...@gmail.com>
> > > > > > Sent: Sunday, June 25, 2023 16:13
> > > > > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > > > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> > > > Listener
> > > > > >
> > > > > > Hi Qingsheng,
> > > > > >
> > > > > > Thanks for your valuable feedback.
> > > > > >
> > > > > > > 1. Is there any specific use case to expose the batch /
> streaming
> > > > info
> > > > > to
> > > > > > listeners or meta services?
> > > > > >
> > > > > > I agree with you that Flink is evolving towards batch-streaming
> > > > > > unification, but the lifecycle of them is different. If a job
> > > > processes a
> > > > > > bound dataset, it will end after completing the data processing,
> > > > > otherwise,
> > > > > > it will run for a long time. In our scenario, we will regularly
> > > > schedule
> > > > > > some Flink jobs to process bound dataset and update some job
> > > > information
> > > > > to
> > > > > > the lineage information for the "batch" jobs such as scheduled
> > > > timestamp,
> > > > > > execution duration when jobs are finished, which is different
> from
> > > > > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode`
> and
> > > > > > `existsUnboundedSource` in `StreamingGraph` and
> > > > `StreamingGraphGenerator`
> > > > > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > > > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean
> flag,
> > > what
> > > > > do
> > > > > > you think of it?
> > > > > >
> > > > > > > 2. it’s better to be more specific here to tell users what
> > > > information
> > > > > > they could expect to see here, instead of just a “job
> > configuration”
> > > as
> > > > > > described in JavaDoc.
> > > > > >
> > > > > > Thanks and I have updated the doc in FLIP.
> > > > > >
> > > > > > > 3. About the IO executor in
> > > JobStatusChangedListenerFactory.Context.
> > > > > >
> > > > > > I have updated the docs for io executor  in
> > > > > > `JobStatusChangedListenerFactory.Context`, it is a regular thread
> > > pool
> > > > > and
> > > > > > executes submitted tasks in parallel. Users can submit tasks to
> the
> > > > > > executor which ensures that the submitted task can be executed
> > before
> > > > the
> > > > > > job exits.
> > > > > >
> > > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> > > list
> > > > of
> > > > > > LineageEntity.
> > > > > >
> > > > > > In the initial idea, the `LineageRelationEntity` is used for
> > > > `DataStream`
> > > > > > to set additional lineage information besides source. For
> example,
> > > > there
> > > > > > are table and column lineages in SQL jobs. When we build a
> > > `DataStream`
> > > > > job
> > > > > > with table source and sink, we can add table lineage in the
> > following
> > > > > > method.
> > > > > > ```
> > > > > > public class DataStreamSink {
> > > > > >     public DataStreamSink setLineageSources(LineageEntity ...
> > > sources);
> > > > > > }
> > > > > > ```
> > > > > > But we can not set column lineage for the above sink, and for the
> > > sake
> > > > of
> > > > > > universality, we do not want to add a method similar to
> > > > `addLineageColumn
> > > > > > (...)` in `DataStreamSink`. So I put this information into
> > > > > > LineageRelationEntity so that SQL and DataStream jobs can be
> > > > consistent.
> > > > > > But as you mentioned, this approach does indeed lead to ambiguity
> > and
> > > > > > complexity. So my current idea is to add the `setLineageRelation`
> > > > method
> > > > > in
> > > > > > `DataStreamSink` directly without `LineageRelationEntity`, I have
> > > > updated
> > > > > > the FLIP and please help to review it again, thanks.
> > > > > >
> > > > > > > 5. I can’t find the definition of CatalogContext in the current
> > > code
> > > > > base
> > > > > > and Flink, which appears in the TableLineageEntity.
> > > > > >
> > > > > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > > > > >
> > > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode
> and a
> > > > > boolean
> > > > > > (the “override” is quite confusing). I’m wondering if these are
> > > > necessary
> > > > > > for meta services, as they are actually concepts defined in the
> > > runtime
> > > > > > level of Flink Table / SQL.
> > > > > >
> > > > > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > > > > `ChangelogMode` and `override` are mainly used for verification
> and
> > > > > > display. For example, Flink currently supports `INSERT`/`DELETE`
> > and
> > > > > > `UPDATE`, we only want to report and update lineage for `INSERT`
> > jobs
> > > > in
> > > > > > our streaming & batch ETL, and display the `override` information
> > on
> > > > the
> > > > > > UI.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
> > > > wrote:
> > > > > >
> > > > > > > Hi Shammon,
> > > > > > >
> > > > > > > Thanks for starting this FLIP! Data lineage is a very important
> > > > topic,
> > > > > > > which has been missing for a long time in Flink. I have some
> > > > questions
> > > > > > > about the FLIP.
> > > > > > >
> > > > > > > About events and listeners:
> > > > > > >
> > > > > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > > > > JobCreatedEvent.
> > > > > > > This is an internal class in flink-runtime, and I think the
> > correct
> > > > API
> > > > > > > should be RuntimeExecutionMode. Furthermore, I think the
> boundary
> > > of
> > > > > > batch
> > > > > > > and streaming becomes much more vague as Flink is evolving
> > towards
> > > > > > > batch-streaming unification, so I’m concerned about exposing
> > > JobType
> > > > > as a
> > > > > > > public API. Is there any specific use case to expose the batch
> /
> > > > > > streaming
> > > > > > > info to listeners or meta services?
> > > > > > >
> > > > > > > 2. Currently JobCreatedEvent gives a Configuration, which is
> > quite
> > > > > > > ambiguous. To be honest the configuration is quite a mess in
> > Flink,
> > > > so
> > > > > > > maybe it’s better to be more specific here to tell users what
> > > > > information
> > > > > > > they could expect to see here, instead of just a “job
> > > configuration”
> > > > as
> > > > > > > described in JavaDoc.
> > > > > > >
> > > > > > > 3. JobStatusChangedListenerFactory.Context provides an IO
> > > executor. I
> > > > > > think
> > > > > > > more information should be provided here, such as which thread
> > > model
> > > > > this
> > > > > > > executor could promise, and whether the user should care about
> > > > > > concurrency
> > > > > > > issues. Otherwise I prefer not to give such an utility that no
> > one
> > > > > dares
> > > > > > to
> > > > > > > use safely, and leave it to users to choose their
> implementation.
> > > > > > >
> > > > > > > About lineage:
> > > > > > >
> > > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> > > list
> > > > of
> > > > > > > LineageEntity. Could you elaborate more on this class? From my
> > > naive
> > > > > > > imagination, the lineage is shaped as a DAG, where vertices are
> > > > sources
> > > > > > and
> > > > > > > sinks (LineageEntity) and edges are connections between them
> > > > > > > (LineageRelation), so it is a bit confusing for a name mixing
> > these
> > > > two
> > > > > > > concepts.
> > > > > > >
> > > > > > > 5. I can’t find the definition of CatalogContext in the current
> > > code
> > > > > base
> > > > > > > and Flink, which appears in the TableLineageEntity.
> > > > > > >
> > > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode
> and a
> > > > > boolean
> > > > > > > (the “override” is quite confusing). I’m wondering if these are
> > > > > necessary
> > > > > > > for meta services, as they are actually concepts defined in the
> > > > runtime
> > > > > > > level of Flink Table / SQL.
> > > > > > >
> > > > > > > Best,
> > > > > > > Qingsheng
> > > > > > >
> > > > > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > Is there any comment or feedback for this FLIP? Hope to hear
> > from
> > > > > you,
> > > > > > > > thanks
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon FY
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjureel@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi devs,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-314: Support
> > > > Customized
> > > > > > Job
> > > > > > > > > Lineage Listener[1] which is the next stage of FLIP-294
> [2].
> > > > Flink
> > > > > > > > > streaming and batch jobs create lineage dependency between
> > > source
> > > > > and
> > > > > > > > sink,
> > > > > > > > > users can manage their data and jobs according to this
> > lineage
> > > > > > > > information.
> > > > > > > > > For example, when there is a delay in Flink ETL or data,
> > users
> > > > can
> > > > > > > easily
> > > > > > > > > trace the problematic jobs and affected data. On the other
> > > hand,
> > > > > when
> > > > > > > > users
> > > > > > > > > need to correct data or debug, they can perform operations
> > > based
> > > > on
> > > > > > > > lineage
> > > > > > > > > too.
> > > > > > > > >
> > > > > > > > > In FLIP-314 we want to introduce lineage related interfaces
> > for
> > > > > Flink
> > > > > > > and
> > > > > > > > > users can create customized job status listeners. When job
> > > status
> > > > > > > > changes,
> > > > > > > > > users can get job status and information to add, update or
> > > delete
> > > > > > > > lineage.
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback, thanks.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > > > > [2]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Shammon FY
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Thanks Jing, sounds good to me.

I have updated the FLIP and renamed the lineage related classes to
`LineageGraph`, `LineageVertex` and `LineageEdge` and keep it consistent
with the job definition in Flink.

Best,
Shammon FY

On Thu, Jul 6, 2023 at 8:25 PM Jing Ge <ji...@ververica.com.invalid> wrote:

> Hi Shammon,
>
> Thanks for the clarification. Atlas might have his historical reason back
> to the hadoop era or maybe even back to the hibernate where Entity and
> Relation were commonly used. Flink already used Vertex and Edge to describe
> DAG. Some popular tools like dbt are also using this convention[1] and,
> afaik, most graph frameworks use vertex and edge too. It will be easier for
> Flink devs and users to have a consistent naming convention for the same
> concept, i.e. in this case, DAG.
>
> Best regards,
> Jing
>
> [1]
>
> https://docs.getdbt.com/docs/dbt-cloud-apis/discovery-use-cases-and-examples#discovery
>
> On Wed, Jul 5, 2023 at 11:28 AM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for your feedback.
> >
> > > 1. TableColumnLineageRelation#sinkColumn() should return
> > TableColumnLineageEntity instead of String, right?
> >
> > The `sinkColumn()` will return `String` which is the column name in the
> > sink connector. I found the name of `TableColumnLineageEntity` may
> > cause ambiguity and I have renamed it to
> `TableColumnSourceLineageEntity`.
> > In my mind the `TableColumnLineageRelation` represents the lineage for
> each
> > sink column, each column may be computed from multiple sources and
> columns.
> > I use `TableColumnSourceLineageEntity` to manage each source and its
> > columns for the sink column, so `TableColumnLineageRelation` has a sink
> > column name and `TableColumnSourceLineageEntity` list.
> >
> > > 2. Since LineageRelation already contains all information to build the
> > lineage between sources and sink, do we still need to set the
> LineageEntity
> > in the source?
> >
> > The lineage interface of `DataStream` is very flexible. We have added
> > `setLineageEntity` to the source to limit and verify user behavior,
> > ensuring that users have not added non-existent sources as lineage.
> >
> > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> LineageEdges
> > which contains multiple LineageEdge?
> >
> > We referred to `Atlas` for the name of lineage, it uses `Entity` and
> > `Relation` to represent the lineage relationship and another metadata
> > service `Datahub` uses `DataSet` to represent the entity. I think
> `Entity`
> > and `Relation` are nicer for lineage, what do you think of it?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid>
> > wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for your proposal. After reading the FLIP, I'd like to ask
> > > some questions to make sure we are on the same page. Thanks!
> > >
> > > 1. TableColumnLineageRelation#sinkColumn() should return
> > > TableColumnLineageEntity instead of String, right?
> > >
> > > 2. Since LineageRelation already contains all information to build the
> > > lineage between sources and sink, do we still need to set the
> > LineageEntity
> > > in the source?
> > >
> > > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> > LineageEdges
> > > which contains multiple LineageEdge? E.g. multiple sources join into
> one
> > > sink, or, edges of columns from one or different tables, etc.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:
> > >
> > > > Hi yuxia and Yun,
> > > >
> > > > Thanks for your input.
> > > >
> > > > For yuxia:
> > > > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
> > including?
> > > >
> > > > At present, we only need to notify the listener when a job goes to
> > > > termination, but I think it makes sense to add generic `oldStatus`
> and
> > > > `newStatus` in the listener and users can update the job state in
> their
> > > > service as needed.
> > > >
> > > > > 2: I'm really confused about the `config()` included in
> > > `LineageEntity`,
> > > > where is it from and what is it for ?
> > > >
> > > > The `config` in `LineageEntity` is used for users to get options for
> > > source
> > > > and sink connectors. As the examples in the FLIP, users can add
> > > > server/group/topic information in the config for kafka and create
> > lineage
> > > > entities for `DataStream` jobs, then the listeners can get this
> > > information
> > > > to identify the same connector in different jobs. Otherwise, the
> > `config`
> > > > in `TableLineageEntity` will be the same as `getOptions` in
> > > > `CatalogBaseTable`.
> > > >
> > > > > 3: Regardless whether `inputChangelogMode` in
> > `TableSinkLineageEntity`
> > > is
> > > > needed or not, since `TableSinkLineageEntity` contains
> > > > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > > > changelogmode?
> > > >
> > > > At present, we do not actually use the changelog mode. It can be
> > deleted,
> > > > and I have updated FLIP.
> > > >
> > > > > Btw, since there're a lot interfaces proposed, I think it'll be
> > better
> > > to
> > > > give an example about how to implement a listener in this FLIP to
> make
> > us
> > > > know better about the interfaces.
> > > >
> > > > I have added the example in the FLIP and the related interfaces and
> > > > examples are in branch [1].
> > > >
> > > > For Yun:
> > > > > I have one more question on the lookup-join dim tables, it seems
> this
> > > > FLIP does not touch them, and will them become part of the
> > > > List<LineageEntity> sources() or adding another interface?
> > > >
> > > > You're right, currently lookup join dim tables were not considered in
> > the
> > > > 'proposed changed' section of this FLIP. But the interface for
> lineage
> > is
> > > > universal and we can give `TableLookupSourceLineageEntity` which
> > > implements
> > > > `TableSourceLineageEntity` in the future without modifying the public
> > > > interface.
> > > >
> > > > > By the way, if you want to focus on job lineage instead of data
> > column
> > > > lineage in this FLIP, why we must introduce so many column-lineage
> > > related
> > > > interface here?
> > > >
> > > > The lineage information in SQL jobs includes table lineage and column
> > > > lineage. Although SQL jobs currently do not support column lineage,
> we
> > > > would like to support this in the next step. So we have
> comprehensively
> > > > considered the table lineage and column lineage interfaces here, and
> > > > defined these two interfaces together clearly
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > > I like the idea in general and it will help to analysis the job
> > > lineages
> > > > > no matter FlinkSQL or Flink jar jobs in production environments.
> > > > >
> > > > > For Qingsheng's concern, I'd like the name of JobType more than
> > > > > RuntimeExecutionMode, as the latter one is not easy to understand
> for
> > > > users.
> > > > >
> > > > > I have one more question on the lookup-join dim tables, it seems
> this
> > > > FLIP
> > > > > does not touch them, and will them become part of the
> > > List<LineageEntity>
> > > > > sources()​ or adding another interface?
> > > > >
> > > > > By the way, if you want to focus on job lineage instead of data
> > column
> > > > > lineage in this FLIP, why we must introduce so many column-lineage
> > > > related
> > > > > interface here?
> > > > >
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > > ________________________________
> > > > > From: Shammon FY <zj...@gmail.com>
> > > > > Sent: Sunday, June 25, 2023 16:13
> > > > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> > > Listener
> > > > >
> > > > > Hi Qingsheng,
> > > > >
> > > > > Thanks for your valuable feedback.
> > > > >
> > > > > > 1. Is there any specific use case to expose the batch / streaming
> > > info
> > > > to
> > > > > listeners or meta services?
> > > > >
> > > > > I agree with you that Flink is evolving towards batch-streaming
> > > > > unification, but the lifecycle of them is different. If a job
> > > processes a
> > > > > bound dataset, it will end after completing the data processing,
> > > > otherwise,
> > > > > it will run for a long time. In our scenario, we will regularly
> > > schedule
> > > > > some Flink jobs to process bound dataset and update some job
> > > information
> > > > to
> > > > > the lineage information for the "batch" jobs such as scheduled
> > > timestamp,
> > > > > execution duration when jobs are finished, which is different from
> > > > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > > > > `existsUnboundedSource` in `StreamingGraph` and
> > > `StreamingGraphGenerator`
> > > > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag,
> > what
> > > > do
> > > > > you think of it?
> > > > >
> > > > > > 2. it’s better to be more specific here to tell users what
> > > information
> > > > > they could expect to see here, instead of just a “job
> configuration”
> > as
> > > > > described in JavaDoc.
> > > > >
> > > > > Thanks and I have updated the doc in FLIP.
> > > > >
> > > > > > 3. About the IO executor in
> > JobStatusChangedListenerFactory.Context.
> > > > >
> > > > > I have updated the docs for io executor  in
> > > > > `JobStatusChangedListenerFactory.Context`, it is a regular thread
> > pool
> > > > and
> > > > > executes submitted tasks in parallel. Users can submit tasks to the
> > > > > executor which ensures that the submitted task can be executed
> before
> > > the
> > > > > job exits.
> > > > >
> > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> > list
> > > of
> > > > > LineageEntity.
> > > > >
> > > > > In the initial idea, the `LineageRelationEntity` is used for
> > > `DataStream`
> > > > > to set additional lineage information besides source. For example,
> > > there
> > > > > are table and column lineages in SQL jobs. When we build a
> > `DataStream`
> > > > job
> > > > > with table source and sink, we can add table lineage in the
> following
> > > > > method.
> > > > > ```
> > > > > public class DataStreamSink {
> > > > >     public DataStreamSink setLineageSources(LineageEntity ...
> > sources);
> > > > > }
> > > > > ```
> > > > > But we can not set column lineage for the above sink, and for the
> > sake
> > > of
> > > > > universality, we do not want to add a method similar to
> > > `addLineageColumn
> > > > > (...)` in `DataStreamSink`. So I put this information into
> > > > > LineageRelationEntity so that SQL and DataStream jobs can be
> > > consistent.
> > > > > But as you mentioned, this approach does indeed lead to ambiguity
> and
> > > > > complexity. So my current idea is to add the `setLineageRelation`
> > > method
> > > > in
> > > > > `DataStreamSink` directly without `LineageRelationEntity`, I have
> > > updated
> > > > > the FLIP and please help to review it again, thanks.
> > > > >
> > > > > > 5. I can’t find the definition of CatalogContext in the current
> > code
> > > > base
> > > > > and Flink, which appears in the TableLineageEntity.
> > > > >
> > > > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > > > >
> > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > > > boolean
> > > > > (the “override” is quite confusing). I’m wondering if these are
> > > necessary
> > > > > for meta services, as they are actually concepts defined in the
> > runtime
> > > > > level of Flink Table / SQL.
> > > > >
> > > > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > > > `ChangelogMode` and `override` are mainly used for verification and
> > > > > display. For example, Flink currently supports `INSERT`/`DELETE`
> and
> > > > > `UPDATE`, we only want to report and update lineage for `INSERT`
> jobs
> > > in
> > > > > our streaming & batch ETL, and display the `override` information
> on
> > > the
> > > > > UI.
> > > > >
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
> > > wrote:
> > > > >
> > > > > > Hi Shammon,
> > > > > >
> > > > > > Thanks for starting this FLIP! Data lineage is a very important
> > > topic,
> > > > > > which has been missing for a long time in Flink. I have some
> > > questions
> > > > > > about the FLIP.
> > > > > >
> > > > > > About events and listeners:
> > > > > >
> > > > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > > > JobCreatedEvent.
> > > > > > This is an internal class in flink-runtime, and I think the
> correct
> > > API
> > > > > > should be RuntimeExecutionMode. Furthermore, I think the boundary
> > of
> > > > > batch
> > > > > > and streaming becomes much more vague as Flink is evolving
> towards
> > > > > > batch-streaming unification, so I’m concerned about exposing
> > JobType
> > > > as a
> > > > > > public API. Is there any specific use case to expose the batch /
> > > > > streaming
> > > > > > info to listeners or meta services?
> > > > > >
> > > > > > 2. Currently JobCreatedEvent gives a Configuration, which is
> quite
> > > > > > ambiguous. To be honest the configuration is quite a mess in
> Flink,
> > > so
> > > > > > maybe it’s better to be more specific here to tell users what
> > > > information
> > > > > > they could expect to see here, instead of just a “job
> > configuration”
> > > as
> > > > > > described in JavaDoc.
> > > > > >
> > > > > > 3. JobStatusChangedListenerFactory.Context provides an IO
> > executor. I
> > > > > think
> > > > > > more information should be provided here, such as which thread
> > model
> > > > this
> > > > > > executor could promise, and whether the user should care about
> > > > > concurrency
> > > > > > issues. Otherwise I prefer not to give such an utility that no
> one
> > > > dares
> > > > > to
> > > > > > use safely, and leave it to users to choose their implementation.
> > > > > >
> > > > > > About lineage:
> > > > > >
> > > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> > list
> > > of
> > > > > > LineageEntity. Could you elaborate more on this class? From my
> > naive
> > > > > > imagination, the lineage is shaped as a DAG, where vertices are
> > > sources
> > > > > and
> > > > > > sinks (LineageEntity) and edges are connections between them
> > > > > > (LineageRelation), so it is a bit confusing for a name mixing
> these
> > > two
> > > > > > concepts.
> > > > > >
> > > > > > 5. I can’t find the definition of CatalogContext in the current
> > code
> > > > base
> > > > > > and Flink, which appears in the TableLineageEntity.
> > > > > >
> > > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > > > boolean
> > > > > > (the “override” is quite confusing). I’m wondering if these are
> > > > necessary
> > > > > > for meta services, as they are actually concepts defined in the
> > > runtime
> > > > > > level of Flink Table / SQL.
> > > > > >
> > > > > > Best,
> > > > > > Qingsheng
> > > > > >
> > > > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > Is there any comment or feedback for this FLIP? Hope to hear
> from
> > > > you,
> > > > > > > thanks
> > > > > > >
> > > > > > > Best,
> > > > > > > Shammon FY
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > I would like to start a discussion on FLIP-314: Support
> > > Customized
> > > > > Job
> > > > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
> > > Flink
> > > > > > > > streaming and batch jobs create lineage dependency between
> > source
> > > > and
> > > > > > > sink,
> > > > > > > > users can manage their data and jobs according to this
> lineage
> > > > > > > information.
> > > > > > > > For example, when there is a delay in Flink ETL or data,
> users
> > > can
> > > > > > easily
> > > > > > > > trace the problematic jobs and affected data. On the other
> > hand,
> > > > when
> > > > > > > users
> > > > > > > > need to correct data or debug, they can perform operations
> > based
> > > on
> > > > > > > lineage
> > > > > > > > too.
> > > > > > > >
> > > > > > > > In FLIP-314 we want to introduce lineage related interfaces
> for
> > > > Flink
> > > > > > and
> > > > > > > > users can create customized job status listeners. When job
> > status
> > > > > > > changes,
> > > > > > > > users can get job status and information to add, update or
> > delete
> > > > > > > lineage.
> > > > > > > >
> > > > > > > > Looking forward to your feedback, thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > > > [2]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon FY
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Shammon,

Thanks for the clarification. Atlas might have his historical reason back
to the hadoop era or maybe even back to the hibernate where Entity and
Relation were commonly used. Flink already used Vertex and Edge to describe
DAG. Some popular tools like dbt are also using this convention[1] and,
afaik, most graph frameworks use vertex and edge too. It will be easier for
Flink devs and users to have a consistent naming convention for the same
concept, i.e. in this case, DAG.

Best regards,
Jing

[1]
https://docs.getdbt.com/docs/dbt-cloud-apis/discovery-use-cases-and-examples#discovery

On Wed, Jul 5, 2023 at 11:28 AM Shammon FY <zj...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for your feedback.
>
> > 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> The `sinkColumn()` will return `String` which is the column name in the
> sink connector. I found the name of `TableColumnLineageEntity` may
> cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
> In my mind the `TableColumnLineageRelation` represents the lineage for each
> sink column, each column may be computed from multiple sources and columns.
> I use `TableColumnSourceLineageEntity` to manage each source and its
> columns for the sink column, so `TableColumnLineageRelation` has a sink
> column name and `TableColumnSourceLineageEntity` list.
>
> > 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added
> `setLineageEntity` to the source to limit and verify user behavior,
> ensuring that users have not added non-existent sources as lineage.
>
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge?
>
> We referred to `Atlas` for the name of lineage, it uses `Entity` and
> `Relation` to represent the lineage relationship and another metadata
> service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
> and `Relation` are nicer for lineage, what do you think of it?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid>
> wrote:
>
> > Hi Shammon,
> >
> > Thanks for your proposal. After reading the FLIP, I'd like to ask
> > some questions to make sure we are on the same page. Thanks!
> >
> > 1. TableColumnLineageRelation#sinkColumn() should return
> > TableColumnLineageEntity instead of String, right?
> >
> > 2. Since LineageRelation already contains all information to build the
> > lineage between sources and sink, do we still need to set the
> LineageEntity
> > in the source?
> >
> > 3. About the "Entity" and "Relation" naming, I was confused too, like
> > Qingsheng mentioned. How about LineageVertex, LineageEdge, and
> LineageEdges
> > which contains multiple LineageEdge? E.g. multiple sources join into one
> > sink, or, edges of columns from one or different tables, etc.
> >
> > Best regards,
> > Jing
> >
> > On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi yuxia and Yun,
> > >
> > > Thanks for your input.
> > >
> > > For yuxia:
> > > > 1: What kinds of JobStatus will the `JobExecutionStatusEven`
> including?
> > >
> > > At present, we only need to notify the listener when a job goes to
> > > termination, but I think it makes sense to add generic `oldStatus` and
> > > `newStatus` in the listener and users can update the job state in their
> > > service as needed.
> > >
> > > > 2: I'm really confused about the `config()` included in
> > `LineageEntity`,
> > > where is it from and what is it for ?
> > >
> > > The `config` in `LineageEntity` is used for users to get options for
> > source
> > > and sink connectors. As the examples in the FLIP, users can add
> > > server/group/topic information in the config for kafka and create
> lineage
> > > entities for `DataStream` jobs, then the listeners can get this
> > information
> > > to identify the same connector in different jobs. Otherwise, the
> `config`
> > > in `TableLineageEntity` will be the same as `getOptions` in
> > > `CatalogBaseTable`.
> > >
> > > > 3: Regardless whether `inputChangelogMode` in
> `TableSinkLineageEntity`
> > is
> > > needed or not, since `TableSinkLineageEntity` contains
> > > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > > changelogmode?
> > >
> > > At present, we do not actually use the changelog mode. It can be
> deleted,
> > > and I have updated FLIP.
> > >
> > > > Btw, since there're a lot interfaces proposed, I think it'll be
> better
> > to
> > > give an example about how to implement a listener in this FLIP to make
> us
> > > know better about the interfaces.
> > >
> > > I have added the example in the FLIP and the related interfaces and
> > > examples are in branch [1].
> > >
> > > For Yun:
> > > > I have one more question on the lookup-join dim tables, it seems this
> > > FLIP does not touch them, and will them become part of the
> > > List<LineageEntity> sources() or adding another interface?
> > >
> > > You're right, currently lookup join dim tables were not considered in
> the
> > > 'proposed changed' section of this FLIP. But the interface for lineage
> is
> > > universal and we can give `TableLookupSourceLineageEntity` which
> > implements
> > > `TableSourceLineageEntity` in the future without modifying the public
> > > interface.
> > >
> > > > By the way, if you want to focus on job lineage instead of data
> column
> > > lineage in this FLIP, why we must introduce so many column-lineage
> > related
> > > interface here?
> > >
> > > The lineage information in SQL jobs includes table lineage and column
> > > lineage. Although SQL jobs currently do not support column lineage, we
> > > would like to support this in the next step. So we have comprehensively
> > > considered the table lineage and column lineage interfaces here, and
> > > defined these two interfaces together clearly
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > I like the idea in general and it will help to analysis the job
> > lineages
> > > > no matter FlinkSQL or Flink jar jobs in production environments.
> > > >
> > > > For Qingsheng's concern, I'd like the name of JobType more than
> > > > RuntimeExecutionMode, as the latter one is not easy to understand for
> > > users.
> > > >
> > > > I have one more question on the lookup-join dim tables, it seems this
> > > FLIP
> > > > does not touch them, and will them become part of the
> > List<LineageEntity>
> > > > sources()​ or adding another interface?
> > > >
> > > > By the way, if you want to focus on job lineage instead of data
> column
> > > > lineage in this FLIP, why we must introduce so many column-lineage
> > > related
> > > > interface here?
> > > >
> > > >
> > > > Best
> > > > Yun Tang
> > > > ________________________________
> > > > From: Shammon FY <zj...@gmail.com>
> > > > Sent: Sunday, June 25, 2023 16:13
> > > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> > Listener
> > > >
> > > > Hi Qingsheng,
> > > >
> > > > Thanks for your valuable feedback.
> > > >
> > > > > 1. Is there any specific use case to expose the batch / streaming
> > info
> > > to
> > > > listeners or meta services?
> > > >
> > > > I agree with you that Flink is evolving towards batch-streaming
> > > > unification, but the lifecycle of them is different. If a job
> > processes a
> > > > bound dataset, it will end after completing the data processing,
> > > otherwise,
> > > > it will run for a long time. In our scenario, we will regularly
> > schedule
> > > > some Flink jobs to process bound dataset and update some job
> > information
> > > to
> > > > the lineage information for the "batch" jobs such as scheduled
> > timestamp,
> > > > execution duration when jobs are finished, which is different from
> > > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > > > `existsUnboundedSource` in `StreamingGraph` and
> > `StreamingGraphGenerator`
> > > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag,
> what
> > > do
> > > > you think of it?
> > > >
> > > > > 2. it’s better to be more specific here to tell users what
> > information
> > > > they could expect to see here, instead of just a “job configuration”
> as
> > > > described in JavaDoc.
> > > >
> > > > Thanks and I have updated the doc in FLIP.
> > > >
> > > > > 3. About the IO executor in
> JobStatusChangedListenerFactory.Context.
> > > >
> > > > I have updated the docs for io executor  in
> > > > `JobStatusChangedListenerFactory.Context`, it is a regular thread
> pool
> > > and
> > > > executes submitted tasks in parallel. Users can submit tasks to the
> > > > executor which ensures that the submitted task can be executed before
> > the
> > > > job exits.
> > > >
> > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> list
> > of
> > > > LineageEntity.
> > > >
> > > > In the initial idea, the `LineageRelationEntity` is used for
> > `DataStream`
> > > > to set additional lineage information besides source. For example,
> > there
> > > > are table and column lineages in SQL jobs. When we build a
> `DataStream`
> > > job
> > > > with table source and sink, we can add table lineage in the following
> > > > method.
> > > > ```
> > > > public class DataStreamSink {
> > > >     public DataStreamSink setLineageSources(LineageEntity ...
> sources);
> > > > }
> > > > ```
> > > > But we can not set column lineage for the above sink, and for the
> sake
> > of
> > > > universality, we do not want to add a method similar to
> > `addLineageColumn
> > > > (...)` in `DataStreamSink`. So I put this information into
> > > > LineageRelationEntity so that SQL and DataStream jobs can be
> > consistent.
> > > > But as you mentioned, this approach does indeed lead to ambiguity and
> > > > complexity. So my current idea is to add the `setLineageRelation`
> > method
> > > in
> > > > `DataStreamSink` directly without `LineageRelationEntity`, I have
> > updated
> > > > the FLIP and please help to review it again, thanks.
> > > >
> > > > > 5. I can’t find the definition of CatalogContext in the current
> code
> > > base
> > > > and Flink, which appears in the TableLineageEntity.
> > > >
> > > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > > >
> > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > > boolean
> > > > (the “override” is quite confusing). I’m wondering if these are
> > necessary
> > > > for meta services, as they are actually concepts defined in the
> runtime
> > > > level of Flink Table / SQL.
> > > >
> > > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > > `ChangelogMode` and `override` are mainly used for verification and
> > > > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs
> > in
> > > > our streaming & batch ETL, and display the `override` information on
> > the
> > > > UI.
> > > >
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
> > wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > > Thanks for starting this FLIP! Data lineage is a very important
> > topic,
> > > > > which has been missing for a long time in Flink. I have some
> > questions
> > > > > about the FLIP.
> > > > >
> > > > > About events and listeners:
> > > > >
> > > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > > JobCreatedEvent.
> > > > > This is an internal class in flink-runtime, and I think the correct
> > API
> > > > > should be RuntimeExecutionMode. Furthermore, I think the boundary
> of
> > > > batch
> > > > > and streaming becomes much more vague as Flink is evolving towards
> > > > > batch-streaming unification, so I’m concerned about exposing
> JobType
> > > as a
> > > > > public API. Is there any specific use case to expose the batch /
> > > > streaming
> > > > > info to listeners or meta services?
> > > > >
> > > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > > ambiguous. To be honest the configuration is quite a mess in Flink,
> > so
> > > > > maybe it’s better to be more specific here to tell users what
> > > information
> > > > > they could expect to see here, instead of just a “job
> configuration”
> > as
> > > > > described in JavaDoc.
> > > > >
> > > > > 3. JobStatusChangedListenerFactory.Context provides an IO
> executor. I
> > > > think
> > > > > more information should be provided here, such as which thread
> model
> > > this
> > > > > executor could promise, and whether the user should care about
> > > > concurrency
> > > > > issues. Otherwise I prefer not to give such an utility that no one
> > > dares
> > > > to
> > > > > use safely, and leave it to users to choose their implementation.
> > > > >
> > > > > About lineage:
> > > > >
> > > > > 4. I don’t quite get the LineageRelationEntity, which is just a
> list
> > of
> > > > > LineageEntity. Could you elaborate more on this class? From my
> naive
> > > > > imagination, the lineage is shaped as a DAG, where vertices are
> > sources
> > > > and
> > > > > sinks (LineageEntity) and edges are connections between them
> > > > > (LineageRelation), so it is a bit confusing for a name mixing these
> > two
> > > > > concepts.
> > > > >
> > > > > 5. I can’t find the definition of CatalogContext in the current
> code
> > > base
> > > > > and Flink, which appears in the TableLineageEntity.
> > > > >
> > > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > > boolean
> > > > > (the “override” is quite confusing). I’m wondering if these are
> > > necessary
> > > > > for meta services, as they are actually concepts defined in the
> > runtime
> > > > > level of Flink Table / SQL.
> > > > >
> > > > > Best,
> > > > > Qingsheng
> > > > >
> > > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > Is there any comment or feedback for this FLIP? Hope to hear from
> > > you,
> > > > > > thanks
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I would like to start a discussion on FLIP-314: Support
> > Customized
> > > > Job
> > > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
> > Flink
> > > > > > > streaming and batch jobs create lineage dependency between
> source
> > > and
> > > > > > sink,
> > > > > > > users can manage their data and jobs according to this lineage
> > > > > > information.
> > > > > > > For example, when there is a delay in Flink ETL or data, users
> > can
> > > > > easily
> > > > > > > trace the problematic jobs and affected data. On the other
> hand,
> > > when
> > > > > > users
> > > > > > > need to correct data or debug, they can perform operations
> based
> > on
> > > > > > lineage
> > > > > > > too.
> > > > > > >
> > > > > > > In FLIP-314 we want to introduce lineage related interfaces for
> > > Flink
> > > > > and
> > > > > > > users can create customized job status listeners. When job
> status
> > > > > > changes,
> > > > > > > users can get job status and information to add, update or
> delete
> > > > > > lineage.
> > > > > > >
> > > > > > > Looking forward to your feedback, thanks.
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > > [2]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > > >
> > > > > > > Best,
> > > > > > > Shammon FY
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi Jing,

Thanks for your feedback.

> 1. TableColumnLineageRelation#sinkColumn() should return
TableColumnLineageEntity instead of String, right?

The `sinkColumn()` will return `String` which is the column name in the
sink connector. I found the name of `TableColumnLineageEntity` may
cause ambiguity and I have renamed it to `TableColumnSourceLineageEntity`.
In my mind the `TableColumnLineageRelation` represents the lineage for each
sink column, each column may be computed from multiple sources and columns.
I use `TableColumnSourceLineageEntity` to manage each source and its
columns for the sink column, so `TableColumnLineageRelation` has a sink
column name and `TableColumnSourceLineageEntity` list.

> 2. Since LineageRelation already contains all information to build the
lineage between sources and sink, do we still need to set the LineageEntity
in the source?

The lineage interface of `DataStream` is very flexible. We have added
`setLineageEntity` to the source to limit and verify user behavior,
ensuring that users have not added non-existent sources as lineage.

> 3. About the "Entity" and "Relation" naming, I was confused too, like
Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
which contains multiple LineageEdge?

We referred to `Atlas` for the name of lineage, it uses `Entity` and
`Relation` to represent the lineage relationship and another metadata
service `Datahub` uses `DataSet` to represent the entity. I think `Entity`
and `Relation` are nicer for lineage, what do you think of it?

Best,
Shammon FY


On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <ji...@ververica.com.invalid> wrote:

> Hi Shammon,
>
> Thanks for your proposal. After reading the FLIP, I'd like to ask
> some questions to make sure we are on the same page. Thanks!
>
> 1. TableColumnLineageRelation#sinkColumn() should return
> TableColumnLineageEntity instead of String, right?
>
> 2. Since LineageRelation already contains all information to build the
> lineage between sources and sink, do we still need to set the LineageEntity
> in the source?
>
> 3. About the "Entity" and "Relation" naming, I was confused too, like
> Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
> which contains multiple LineageEdge? E.g. multiple sources join into one
> sink, or, edges of columns from one or different tables, etc.
>
> Best regards,
> Jing
>
> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
> >
> > At present, we only need to notify the listener when a job goes to
> > termination, but I think it makes sense to add generic `oldStatus` and
> > `newStatus` in the listener and users can update the job state in their
> > service as needed.
> >
> > > 2: I'm really confused about the `config()` included in
> `LineageEntity`,
> > where is it from and what is it for ?
> >
> > The `config` in `LineageEntity` is used for users to get options for
> source
> > and sink connectors. As the examples in the FLIP, users can add
> > server/group/topic information in the config for kafka and create lineage
> > entities for `DataStream` jobs, then the listeners can get this
> information
> > to identify the same connector in different jobs. Otherwise, the `config`
> > in `TableLineageEntity` will be the same as `getOptions` in
> > `CatalogBaseTable`.
> >
> > > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity`
> is
> > needed or not, since `TableSinkLineageEntity` contains
> > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > changelogmode?
> >
> > At present, we do not actually use the changelog mode. It can be deleted,
> > and I have updated FLIP.
> >
> > > Btw, since there're a lot interfaces proposed, I think it'll be better
> to
> > give an example about how to implement a listener in this FLIP to make us
> > know better about the interfaces.
> >
> > I have added the example in the FLIP and the related interfaces and
> > examples are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP does not touch them, and will them become part of the
> > List<LineageEntity> sources() or adding another interface?
> >
> > You're right, currently lookup join dim tables were not considered in the
> > 'proposed changed' section of this FLIP. But the interface for lineage is
> > universal and we can give `TableLookupSourceLineageEntity` which
> implements
> > `TableSourceLineageEntity` in the future without modifying the public
> > interface.
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs currently do not support column lineage, we
> > would like to support this in the next step. So we have comprehensively
> > considered the table lineage and column lineage interfaces here, and
> > defined these two interfaces together clearly
> >
> >
> > [1]
> >
> >
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> >
> > Best,
> > Shammon FY
> >
> >
> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
> >
> > > Hi Shammon,
> > >
> > > I like the idea in general and it will help to analysis the job
> lineages
> > > no matter FlinkSQL or Flink jar jobs in production environments.
> > >
> > > For Qingsheng's concern, I'd like the name of JobType more than
> > > RuntimeExecutionMode, as the latter one is not easy to understand for
> > users.
> > >
> > > I have one more question on the lookup-join dim tables, it seems this
> > FLIP
> > > does not touch them, and will them become part of the
> List<LineageEntity>
> > > sources()​ or adding another interface?
> > >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why we must introduce so many column-lineage
> > related
> > > interface here?
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Shammon FY <zj...@gmail.com>
> > > Sent: Sunday, June 25, 2023 16:13
> > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage
> Listener
> > >
> > > Hi Qingsheng,
> > >
> > > Thanks for your valuable feedback.
> > >
> > > > 1. Is there any specific use case to expose the batch / streaming
> info
> > to
> > > listeners or meta services?
> > >
> > > I agree with you that Flink is evolving towards batch-streaming
> > > unification, but the lifecycle of them is different. If a job
> processes a
> > > bound dataset, it will end after completing the data processing,
> > otherwise,
> > > it will run for a long time. In our scenario, we will regularly
> schedule
> > > some Flink jobs to process bound dataset and update some job
> information
> > to
> > > the lineage information for the "batch" jobs such as scheduled
> timestamp,
> > > execution duration when jobs are finished, which is different from
> > > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > > `existsUnboundedSource` in `StreamingGraph` and
> `StreamingGraphGenerator`
> > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what
> > do
> > > you think of it?
> > >
> > > > 2. it’s better to be more specific here to tell users what
> information
> > > they could expect to see here, instead of just a “job configuration” as
> > > described in JavaDoc.
> > >
> > > Thanks and I have updated the doc in FLIP.
> > >
> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> > >
> > > I have updated the docs for io executor  in
> > > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
> > and
> > > executes submitted tasks in parallel. Users can submit tasks to the
> > > executor which ensures that the submitted task can be executed before
> the
> > > job exits.
> > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list
> of
> > > LineageEntity.
> > >
> > > In the initial idea, the `LineageRelationEntity` is used for
> `DataStream`
> > > to set additional lineage information besides source. For example,
> there
> > > are table and column lineages in SQL jobs. When we build a `DataStream`
> > job
> > > with table source and sink, we can add table lineage in the following
> > > method.
> > > ```
> > > public class DataStreamSink {
> > >     public DataStreamSink setLineageSources(LineageEntity ... sources);
> > > }
> > > ```
> > > But we can not set column lineage for the above sink, and for the sake
> of
> > > universality, we do not want to add a method similar to
> `addLineageColumn
> > > (...)` in `DataStreamSink`. So I put this information into
> > > LineageRelationEntity so that SQL and DataStream jobs can be
> consistent.
> > > But as you mentioned, this approach does indeed lead to ambiguity and
> > > complexity. So my current idea is to add the `setLineageRelation`
> method
> > in
> > > `DataStreamSink` directly without `LineageRelationEntity`, I have
> updated
> > > the FLIP and please help to review it again, thanks.
> > >
> > > > 5. I can’t find the definition of CatalogContext in the current code
> > base
> > > and Flink, which appears in the TableLineageEntity.
> > >
> > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > boolean
> > > (the “override” is quite confusing). I’m wondering if these are
> necessary
> > > for meta services, as they are actually concepts defined in the runtime
> > > level of Flink Table / SQL.
> > >
> > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > `ChangelogMode` and `override` are mainly used for verification and
> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs
> in
> > > our streaming & batch ETL, and display the `override` information on
> the
> > > UI.
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org>
> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for starting this FLIP! Data lineage is a very important
> topic,
> > > > which has been missing for a long time in Flink. I have some
> questions
> > > > about the FLIP.
> > > >
> > > > About events and listeners:
> > > >
> > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > JobCreatedEvent.
> > > > This is an internal class in flink-runtime, and I think the correct
> API
> > > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
> > > batch
> > > > and streaming becomes much more vague as Flink is evolving towards
> > > > batch-streaming unification, so I’m concerned about exposing JobType
> > as a
> > > > public API. Is there any specific use case to expose the batch /
> > > streaming
> > > > info to listeners or meta services?
> > > >
> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > ambiguous. To be honest the configuration is quite a mess in Flink,
> so
> > > > maybe it’s better to be more specific here to tell users what
> > information
> > > > they could expect to see here, instead of just a “job configuration”
> as
> > > > described in JavaDoc.
> > > >
> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
> > > think
> > > > more information should be provided here, such as which thread model
> > this
> > > > executor could promise, and whether the user should care about
> > > concurrency
> > > > issues. Otherwise I prefer not to give such an utility that no one
> > dares
> > > to
> > > > use safely, and leave it to users to choose their implementation.
> > > >
> > > > About lineage:
> > > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list
> of
> > > > LineageEntity. Could you elaborate more on this class? From my naive
> > > > imagination, the lineage is shaped as a DAG, where vertices are
> sources
> > > and
> > > > sinks (LineageEntity) and edges are connections between them
> > > > (LineageRelation), so it is a bit confusing for a name mixing these
> two
> > > > concepts.
> > > >
> > > > 5. I can’t find the definition of CatalogContext in the current code
> > base
> > > > and Flink, which appears in the TableLineageEntity.
> > > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> > boolean
> > > > (the “override” is quite confusing). I’m wondering if these are
> > necessary
> > > > for meta services, as they are actually concepts defined in the
> runtime
> > > > level of Flink Table / SQL.
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com>
> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Is there any comment or feedback for this FLIP? Hope to hear from
> > you,
> > > > > thanks
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com>
> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-314: Support
> Customized
> > > Job
> > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
> Flink
> > > > > > streaming and batch jobs create lineage dependency between source
> > and
> > > > > sink,
> > > > > > users can manage their data and jobs according to this lineage
> > > > > information.
> > > > > > For example, when there is a delay in Flink ETL or data, users
> can
> > > > easily
> > > > > > trace the problematic jobs and affected data. On the other hand,
> > when
> > > > > users
> > > > > > need to correct data or debug, they can perform operations based
> on
> > > > > lineage
> > > > > > too.
> > > > > >
> > > > > > In FLIP-314 we want to introduce lineage related interfaces for
> > Flink
> > > > and
> > > > > > users can create customized job status listeners. When job status
> > > > > changes,
> > > > > > users can get job status and information to add, update or delete
> > > > > lineage.
> > > > > >
> > > > > > Looking forward to your feedback, thanks.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > [2]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Shammon,

Thanks for your proposal. After reading the FLIP, I'd like to ask
some questions to make sure we are on the same page. Thanks!

1. TableColumnLineageRelation#sinkColumn() should return
TableColumnLineageEntity instead of String, right?

2. Since LineageRelation already contains all information to build the
lineage between sources and sink, do we still need to set the LineageEntity
in the source?

3. About the "Entity" and "Relation" naming, I was confused too, like
Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges
which contains multiple LineageEdge? E.g. multiple sources join into one
sink, or, edges of columns from one or different tables, etc.

Best regards,
Jing

On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zj...@gmail.com> wrote:

> Hi yuxia and Yun,
>
> Thanks for your input.
>
> For yuxia:
> > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
>
> At present, we only need to notify the listener when a job goes to
> termination, but I think it makes sense to add generic `oldStatus` and
> `newStatus` in the listener and users can update the job state in their
> service as needed.
>
> > 2: I'm really confused about the `config()` included in `LineageEntity`,
> where is it from and what is it for ?
>
> The `config` in `LineageEntity` is used for users to get options for source
> and sink connectors. As the examples in the FLIP, users can add
> server/group/topic information in the config for kafka and create lineage
> entities for `DataStream` jobs, then the listeners can get this information
> to identify the same connector in different jobs. Otherwise, the `config`
> in `TableLineageEntity` will be the same as `getOptions` in
> `CatalogBaseTable`.
>
> > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is
> needed or not, since `TableSinkLineageEntity` contains
> `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> changelogmode?
>
> At present, we do not actually use the changelog mode. It can be deleted,
> and I have updated FLIP.
>
> > Btw, since there're a lot interfaces proposed, I think it'll be better to
> give an example about how to implement a listener in this FLIP to make us
> know better about the interfaces.
>
> I have added the example in the FLIP and the related interfaces and
> examples are in branch [1].
>
> For Yun:
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP does not touch them, and will them become part of the
> List<LineageEntity> sources() or adding another interface?
>
> You're right, currently lookup join dim tables were not considered in the
> 'proposed changed' section of this FLIP. But the interface for lineage is
> universal and we can give `TableLookupSourceLineageEntity` which implements
> `TableSourceLineageEntity` in the future without modifying the public
> interface.
>
> > By the way, if you want to focus on job lineage instead of data column
> lineage in this FLIP, why we must introduce so many column-lineage related
> interface here?
>
> The lineage information in SQL jobs includes table lineage and column
> lineage. Although SQL jobs currently do not support column lineage, we
> would like to support this in the next step. So we have comprehensively
> considered the table lineage and column lineage interfaces here, and
> defined these two interfaces together clearly
>
>
> [1]
>
> https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>
> Best,
> Shammon FY
>
>
> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:
>
> > Hi Shammon,
> >
> > I like the idea in general and it will help to analysis the job lineages
> > no matter FlinkSQL or Flink jar jobs in production environments.
> >
> > For Qingsheng's concern, I'd like the name of JobType more than
> > RuntimeExecutionMode, as the latter one is not easy to understand for
> users.
> >
> > I have one more question on the lookup-join dim tables, it seems this
> FLIP
> > does not touch them, and will them become part of the List<LineageEntity>
> > sources()​ or adding another interface?
> >
> > By the way, if you want to focus on job lineage instead of data column
> > lineage in this FLIP, why we must introduce so many column-lineage
> related
> > interface here?
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Shammon FY <zj...@gmail.com>
> > Sent: Sunday, June 25, 2023 16:13
> > To: dev@flink.apache.org <de...@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> >
> > Hi Qingsheng,
> >
> > Thanks for your valuable feedback.
> >
> > > 1. Is there any specific use case to expose the batch / streaming info
> to
> > listeners or meta services?
> >
> > I agree with you that Flink is evolving towards batch-streaming
> > unification, but the lifecycle of them is different. If a job processes a
> > bound dataset, it will end after completing the data processing,
> otherwise,
> > it will run for a long time. In our scenario, we will regularly schedule
> > some Flink jobs to process bound dataset and update some job information
> to
> > the lineage information for the "batch" jobs such as scheduled timestamp,
> > execution duration when jobs are finished, which is different from
> > "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> > `existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
> > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what
> do
> > you think of it?
> >
> > > 2. it’s better to be more specific here to tell users what information
> > they could expect to see here, instead of just a “job configuration” as
> > described in JavaDoc.
> >
> > Thanks and I have updated the doc in FLIP.
> >
> > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> >
> > I have updated the docs for io executor  in
> > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool
> and
> > executes submitted tasks in parallel. Users can submit tasks to the
> > executor which ensures that the submitted task can be executed before the
> > job exits.
> >
> > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > LineageEntity.
> >
> > In the initial idea, the `LineageRelationEntity` is used for `DataStream`
> > to set additional lineage information besides source. For example, there
> > are table and column lineages in SQL jobs. When we build a `DataStream`
> job
> > with table source and sink, we can add table lineage in the following
> > method.
> > ```
> > public class DataStreamSink {
> >     public DataStreamSink setLineageSources(LineageEntity ... sources);
> > }
> > ```
> > But we can not set column lineage for the above sink, and for the sake of
> > universality, we do not want to add a method similar to `addLineageColumn
> > (...)` in `DataStreamSink`. So I put this information into
> > LineageRelationEntity so that SQL and DataStream jobs can be consistent.
> > But as you mentioned, this approach does indeed lead to ambiguity and
> > complexity. So my current idea is to add the `setLineageRelation` method
> in
> > `DataStreamSink` directly without `LineageRelationEntity`, I have updated
> > the FLIP and please help to review it again, thanks.
> >
> > > 5. I can’t find the definition of CatalogContext in the current code
> base
> > and Flink, which appears in the TableLineageEntity.
> >
> > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> >
> > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> boolean
> > (the “override” is quite confusing). I’m wondering if these are necessary
> > for meta services, as they are actually concepts defined in the runtime
> > level of Flink Table / SQL.
> >
> > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > `ChangelogMode` and `override` are mainly used for verification and
> > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > `UPDATE`, we only want to report and update lineage for `INSERT` jobs in
> > our streaming & batch ETL, and display the `override` information on the
> > UI.
> >
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for starting this FLIP! Data lineage is a very important topic,
> > > which has been missing for a long time in Flink. I have some questions
> > > about the FLIP.
> > >
> > > About events and listeners:
> > >
> > > 1. I’m not sure if it is necessary to expose JobType to in
> > JobCreatedEvent.
> > > This is an internal class in flink-runtime, and I think the correct API
> > > should be RuntimeExecutionMode. Furthermore, I think the boundary of
> > batch
> > > and streaming becomes much more vague as Flink is evolving towards
> > > batch-streaming unification, so I’m concerned about exposing JobType
> as a
> > > public API. Is there any specific use case to expose the batch /
> > streaming
> > > info to listeners or meta services?
> > >
> > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > > maybe it’s better to be more specific here to tell users what
> information
> > > they could expect to see here, instead of just a “job configuration” as
> > > described in JavaDoc.
> > >
> > > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
> > think
> > > more information should be provided here, such as which thread model
> this
> > > executor could promise, and whether the user should care about
> > concurrency
> > > issues. Otherwise I prefer not to give such an utility that no one
> dares
> > to
> > > use safely, and leave it to users to choose their implementation.
> > >
> > > About lineage:
> > >
> > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > LineageEntity. Could you elaborate more on this class? From my naive
> > > imagination, the lineage is shaped as a DAG, where vertices are sources
> > and
> > > sinks (LineageEntity) and edges are connections between them
> > > (LineageRelation), so it is a bit confusing for a name mixing these two
> > > concepts.
> > >
> > > 5. I can’t find the definition of CatalogContext in the current code
> base
> > > and Flink, which appears in the TableLineageEntity.
> > >
> > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a
> boolean
> > > (the “override” is quite confusing). I’m wondering if these are
> necessary
> > > for meta services, as they are actually concepts defined in the runtime
> > > level of Flink Table / SQL.
> > >
> > > Best,
> > > Qingsheng
> > >
> > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Is there any comment or feedback for this FLIP? Hope to hear from
> you,
> > > > thanks
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I would like to start a discussion on FLIP-314: Support Customized
> > Job
> > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > > > streaming and batch jobs create lineage dependency between source
> and
> > > > sink,
> > > > > users can manage their data and jobs according to this lineage
> > > > information.
> > > > > For example, when there is a delay in Flink ETL or data, users can
> > > easily
> > > > > trace the problematic jobs and affected data. On the other hand,
> when
> > > > users
> > > > > need to correct data or debug, they can perform operations based on
> > > > lineage
> > > > > too.
> > > > >
> > > > > In FLIP-314 we want to introduce lineage related interfaces for
> Flink
> > > and
> > > > > users can create customized job status listeners. When job status
> > > > changes,
> > > > > users can get job status and information to add, update or delete
> > > > lineage.
> > > > >
> > > > > Looking forward to your feedback, thanks.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi yuxia and Yun,

Thanks for your input.

For yuxia:
> 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?

At present, we only need to notify the listener when a job goes to
termination, but I think it makes sense to add generic `oldStatus` and
`newStatus` in the listener and users can update the job state in their
service as needed.

> 2: I'm really confused about the `config()` included in `LineageEntity`,
where is it from and what is it for ?

The `config` in `LineageEntity` is used for users to get options for source
and sink connectors. As the examples in the FLIP, users can add
server/group/topic information in the config for kafka and create lineage
entities for `DataStream` jobs, then the listeners can get this information
to identify the same connector in different jobs. Otherwise, the `config`
in `TableLineageEntity` will be the same as `getOptions` in
`CatalogBaseTable`.

> 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is
needed or not, since `TableSinkLineageEntity` contains
`inputChangelogMode`, why `TableSourceLineageEntity` don't contain
changelogmode?

At present, we do not actually use the changelog mode. It can be deleted,
and I have updated FLIP.

> Btw, since there're a lot interfaces proposed, I think it'll be better to
give an example about how to implement a listener in this FLIP to make us
know better about the interfaces.

I have added the example in the FLIP and the related interfaces and
examples are in branch [1].

For Yun:
> I have one more question on the lookup-join dim tables, it seems this
FLIP does not touch them, and will them become part of the
List<LineageEntity> sources() or adding another interface?

You're right, currently lookup join dim tables were not considered in the
'proposed changed' section of this FLIP. But the interface for lineage is
universal and we can give `TableLookupSourceLineageEntity` which implements
`TableSourceLineageEntity` in the future without modifying the public
interface.

> By the way, if you want to focus on job lineage instead of data column
lineage in this FLIP, why we must introduce so many column-lineage related
interface here?

The lineage information in SQL jobs includes table lineage and column
lineage. Although SQL jobs currently do not support column lineage, we
would like to support this in the next step. So we have comprehensively
considered the table lineage and column lineage interfaces here, and
defined these two interfaces together clearly


[1]
https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c

Best,
Shammon FY


On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <my...@live.com> wrote:

> Hi Shammon,
>
> I like the idea in general and it will help to analysis the job lineages
> no matter FlinkSQL or Flink jar jobs in production environments.
>
> For Qingsheng's concern, I'd like the name of JobType more than
> RuntimeExecutionMode, as the latter one is not easy to understand for users.
>
> I have one more question on the lookup-join dim tables, it seems this FLIP
> does not touch them, and will them become part of the List<LineageEntity>
> sources()​ or adding another interface?
>
> By the way, if you want to focus on job lineage instead of data column
> lineage in this FLIP, why we must introduce so many column-lineage related
> interface here?
>
>
> Best
> Yun Tang
> ________________________________
> From: Shammon FY <zj...@gmail.com>
> Sent: Sunday, June 25, 2023 16:13
> To: dev@flink.apache.org <de...@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
>
> Hi Qingsheng,
>
> Thanks for your valuable feedback.
>
> > 1. Is there any specific use case to expose the batch / streaming info to
> listeners or meta services?
>
> I agree with you that Flink is evolving towards batch-streaming
> unification, but the lifecycle of them is different. If a job processes a
> bound dataset, it will end after completing the data processing, otherwise,
> it will run for a long time. In our scenario, we will regularly schedule
> some Flink jobs to process bound dataset and update some job information to
> the lineage information for the "batch" jobs such as scheduled timestamp,
> execution duration when jobs are finished, which is different from
> "streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
> `existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
> to determine `JobType` and disjoin jobs. We can mark `JobType` as
> `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what do
> you think of it?
>
> > 2. it’s better to be more specific here to tell users what information
> they could expect to see here, instead of just a “job configuration” as
> described in JavaDoc.
>
> Thanks and I have updated the doc in FLIP.
>
> > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>
> I have updated the docs for io executor  in
> `JobStatusChangedListenerFactory.Context`, it is a regular thread pool and
> executes submitted tasks in parallel. Users can submit tasks to the
> executor which ensures that the submitted task can be executed before the
> job exits.
>
> > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> LineageEntity.
>
> In the initial idea, the `LineageRelationEntity` is used for `DataStream`
> to set additional lineage information besides source. For example, there
> are table and column lineages in SQL jobs. When we build a `DataStream` job
> with table source and sink, we can add table lineage in the following
> method.
> ```
> public class DataStreamSink {
>     public DataStreamSink setLineageSources(LineageEntity ... sources);
> }
> ```
> But we can not set column lineage for the above sink, and for the sake of
> universality, we do not want to add a method similar to `addLineageColumn
> (...)` in `DataStreamSink`. So I put this information into
> LineageRelationEntity so that SQL and DataStream jobs can be consistent.
> But as you mentioned, this approach does indeed lead to ambiguity and
> complexity. So my current idea is to add the `setLineageRelation` method in
> `DataStreamSink` directly without `LineageRelationEntity`, I have updated
> the FLIP and please help to review it again, thanks.
>
> > 5. I can’t find the definition of CatalogContext in the current code base
> and Flink, which appears in the TableLineageEntity.
>
> CatalogContext is defined in FLIP-294 and I have updated the FLIP
>
> > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> (the “override” is quite confusing). I’m wondering if these are necessary
> for meta services, as they are actually concepts defined in the runtime
> level of Flink Table / SQL.
>
> The information in `TableSinkLineageEntity` such as `ModifyType`,
> `ChangelogMode` and `override` are mainly used for verification and
> display. For example, Flink currently supports `INSERT`/`DELETE` and
> `UPDATE`, we only want to report and update lineage for `INSERT` jobs in
> our streaming & batch ETL, and display the `override` information on the
> UI.
>
>
> Best,
> Shammon FY
>
>
> On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
>
> > Hi Shammon,
> >
> > Thanks for starting this FLIP! Data lineage is a very important topic,
> > which has been missing for a long time in Flink. I have some questions
> > about the FLIP.
> >
> > About events and listeners:
> >
> > 1. I’m not sure if it is necessary to expose JobType to in
> JobCreatedEvent.
> > This is an internal class in flink-runtime, and I think the correct API
> > should be RuntimeExecutionMode. Furthermore, I think the boundary of
> batch
> > and streaming becomes much more vague as Flink is evolving towards
> > batch-streaming unification, so I’m concerned about exposing JobType as a
> > public API. Is there any specific use case to expose the batch /
> streaming
> > info to listeners or meta services?
> >
> > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > maybe it’s better to be more specific here to tell users what information
> > they could expect to see here, instead of just a “job configuration” as
> > described in JavaDoc.
> >
> > 3. JobStatusChangedListenerFactory.Context provides an IO executor. I
> think
> > more information should be provided here, such as which thread model this
> > executor could promise, and whether the user should care about
> concurrency
> > issues. Otherwise I prefer not to give such an utility that no one dares
> to
> > use safely, and leave it to users to choose their implementation.
> >
> > About lineage:
> >
> > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > LineageEntity. Could you elaborate more on this class? From my naive
> > imagination, the lineage is shaped as a DAG, where vertices are sources
> and
> > sinks (LineageEntity) and edges are connections between them
> > (LineageRelation), so it is a bit confusing for a name mixing these two
> > concepts.
> >
> > 5. I can’t find the definition of CatalogContext in the current code base
> > and Flink, which appears in the TableLineageEntity.
> >
> > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > (the “override” is quite confusing). I’m wondering if these are necessary
> > for meta services, as they are actually concepts defined in the runtime
> > level of Flink Table / SQL.
> >
> > Best,
> > Qingsheng
> >
> > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > Is there any comment or feedback for this FLIP? Hope to hear from you,
> > > thanks
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I would like to start a discussion on FLIP-314: Support Customized
> Job
> > > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > > streaming and batch jobs create lineage dependency between source and
> > > sink,
> > > > users can manage their data and jobs according to this lineage
> > > information.
> > > > For example, when there is a delay in Flink ETL or data, users can
> > easily
> > > > trace the problematic jobs and affected data. On the other hand, when
> > > users
> > > > need to correct data or debug, they can perform operations based on
> > > lineage
> > > > too.
> > > >
> > > > In FLIP-314 we want to introduce lineage related interfaces for Flink
> > and
> > > > users can create customized job status listeners. When job status
> > > changes,
> > > > users can get job status and information to add, update or delete
> > > lineage.
> > > >
> > > > Looking forward to your feedback, thanks.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Yun Tang <my...@live.com>.
Hi Shammon,

I like the idea in general and it will help to analysis the job lineages no matter FlinkSQL or Flink jar jobs in production environments.

For Qingsheng's concern, I'd like the name of JobType more than RuntimeExecutionMode, as the latter one is not easy to understand for users.

I have one more question on the lookup-join dim tables, it seems this FLIP does not touch them, and will them become part of the List<LineageEntity> sources()​ or adding another interface?

By the way, if you want to focus on job lineage instead of data column lineage in this FLIP, why we must introduce so many column-lineage related interface here?


Best
Yun Tang
________________________________
From: Shammon FY <zj...@gmail.com>
Sent: Sunday, June 25, 2023 16:13
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Hi Qingsheng,

Thanks for your valuable feedback.

> 1. Is there any specific use case to expose the batch / streaming info to
listeners or meta services?

I agree with you that Flink is evolving towards batch-streaming
unification, but the lifecycle of them is different. If a job processes a
bound dataset, it will end after completing the data processing, otherwise,
it will run for a long time. In our scenario, we will regularly schedule
some Flink jobs to process bound dataset and update some job information to
the lineage information for the "batch" jobs such as scheduled timestamp,
execution duration when jobs are finished, which is different from
"streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
`existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
to determine `JobType` and disjoin jobs. We can mark `JobType` as
`PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what do
you think of it?

> 2. it’s better to be more specific here to tell users what information
they could expect to see here, instead of just a “job configuration” as
described in JavaDoc.

Thanks and I have updated the doc in FLIP.

> 3. About the IO executor in JobStatusChangedListenerFactory.Context.

I have updated the docs for io executor  in
`JobStatusChangedListenerFactory.Context`, it is a regular thread pool and
executes submitted tasks in parallel. Users can submit tasks to the
executor which ensures that the submitted task can be executed before the
job exits.

> 4. I don’t quite get the LineageRelationEntity, which is just a list of
LineageEntity.

In the initial idea, the `LineageRelationEntity` is used for `DataStream`
to set additional lineage information besides source. For example, there
are table and column lineages in SQL jobs. When we build a `DataStream` job
with table source and sink, we can add table lineage in the following
method.
```
public class DataStreamSink {
    public DataStreamSink setLineageSources(LineageEntity ... sources);
}
```
But we can not set column lineage for the above sink, and for the sake of
universality, we do not want to add a method similar to `addLineageColumn
(...)` in `DataStreamSink`. So I put this information into
LineageRelationEntity so that SQL and DataStream jobs can be consistent.
But as you mentioned, this approach does indeed lead to ambiguity and
complexity. So my current idea is to add the `setLineageRelation` method in
`DataStreamSink` directly without `LineageRelationEntity`, I have updated
the FLIP and please help to review it again, thanks.

> 5. I can’t find the definition of CatalogContext in the current code base
and Flink, which appears in the TableLineageEntity.

CatalogContext is defined in FLIP-294 and I have updated the FLIP

> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
(the “override” is quite confusing). I’m wondering if these are necessary
for meta services, as they are actually concepts defined in the runtime
level of Flink Table / SQL.

The information in `TableSinkLineageEntity` such as `ModifyType`,
`ChangelogMode` and `override` are mainly used for verification and
display. For example, Flink currently supports `INSERT`/`DELETE` and
`UPDATE`, we only want to report and update lineage for `INSERT` jobs in
our streaming & batch ETL, and display the `override` information on the UI.


Best,
Shammon FY


On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:

> Hi Shammon,
>
> Thanks for starting this FLIP! Data lineage is a very important topic,
> which has been missing for a long time in Flink. I have some questions
> about the FLIP.
>
> About events and listeners:
>
> 1. I’m not sure if it is necessary to expose JobType to in JobCreatedEvent.
> This is an internal class in flink-runtime, and I think the correct API
> should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
> and streaming becomes much more vague as Flink is evolving towards
> batch-streaming unification, so I’m concerned about exposing JobType as a
> public API. Is there any specific use case to expose the batch / streaming
> info to listeners or meta services?
>
> 2. Currently JobCreatedEvent gives a Configuration, which is quite
> ambiguous. To be honest the configuration is quite a mess in Flink, so
> maybe it’s better to be more specific here to tell users what information
> they could expect to see here, instead of just a “job configuration” as
> described in JavaDoc.
>
> 3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
> more information should be provided here, such as which thread model this
> executor could promise, and whether the user should care about concurrency
> issues. Otherwise I prefer not to give such an utility that no one dares to
> use safely, and leave it to users to choose their implementation.
>
> About lineage:
>
> 4. I don’t quite get the LineageRelationEntity, which is just a list of
> LineageEntity. Could you elaborate more on this class? From my naive
> imagination, the lineage is shaped as a DAG, where vertices are sources and
> sinks (LineageEntity) and edges are connections between them
> (LineageRelation), so it is a bit confusing for a name mixing these two
> concepts.
>
> 5. I can’t find the definition of CatalogContext in the current code base
> and Flink, which appears in the TableLineageEntity.
>
> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> (the “override” is quite confusing). I’m wondering if these are necessary
> for meta services, as they are actually concepts defined in the runtime
> level of Flink Table / SQL.
>
> Best,
> Qingsheng
>
> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi devs,
> >
> > Is there any comment or feedback for this FLIP? Hope to hear from you,
> > thanks
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > streaming and batch jobs create lineage dependency between source and
> > sink,
> > > users can manage their data and jobs according to this lineage
> > information.
> > > For example, when there is a delay in Flink ETL or data, users can
> easily
> > > trace the problematic jobs and affected data. On the other hand, when
> > users
> > > need to correct data or debug, they can perform operations based on
> > lineage
> > > too.
> > >
> > > In FLIP-314 we want to introduce lineage related interfaces for Flink
> and
> > > users can create customized job status listeners. When job status
> > changes,
> > > users can get job status and information to add, update or delete
> > lineage.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > >
> > > Best,
> > > Shammon FY
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi Qingsheng,

Thanks for your valuable feedback.

> 1. Is there any specific use case to expose the batch / streaming info to
listeners or meta services?

I agree with you that Flink is evolving towards batch-streaming
unification, but the lifecycle of them is different. If a job processes a
bound dataset, it will end after completing the data processing, otherwise,
it will run for a long time. In our scenario, we will regularly schedule
some Flink jobs to process bound dataset and update some job information to
the lineage information for the "batch" jobs such as scheduled timestamp,
execution duration when jobs are finished, which is different from
"streaming" jobs. Currently Flink uses  `RuntimeExecutionMode` and
`existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
to determine `JobType` and disjoin jobs. We can mark `JobType` as
`PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what do
you think of it?

> 2. it’s better to be more specific here to tell users what information
they could expect to see here, instead of just a “job configuration” as
described in JavaDoc.

Thanks and I have updated the doc in FLIP.

> 3. About the IO executor in JobStatusChangedListenerFactory.Context.

I have updated the docs for io executor  in
`JobStatusChangedListenerFactory.Context`, it is a regular thread pool and
executes submitted tasks in parallel. Users can submit tasks to the
executor which ensures that the submitted task can be executed before the
job exits.

> 4. I don’t quite get the LineageRelationEntity, which is just a list of
LineageEntity.

In the initial idea, the `LineageRelationEntity` is used for `DataStream`
to set additional lineage information besides source. For example, there
are table and column lineages in SQL jobs. When we build a `DataStream` job
with table source and sink, we can add table lineage in the following
method.
```
public class DataStreamSink {
    public DataStreamSink setLineageSources(LineageEntity ... sources);
}
```
But we can not set column lineage for the above sink, and for the sake of
universality, we do not want to add a method similar to `addLineageColumn
(...)` in `DataStreamSink`. So I put this information into
LineageRelationEntity so that SQL and DataStream jobs can be consistent.
But as you mentioned, this approach does indeed lead to ambiguity and
complexity. So my current idea is to add the `setLineageRelation` method in
`DataStreamSink` directly without `LineageRelationEntity`, I have updated
the FLIP and please help to review it again, thanks.

> 5. I can’t find the definition of CatalogContext in the current code base
and Flink, which appears in the TableLineageEntity.

CatalogContext is defined in FLIP-294 and I have updated the FLIP

> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
(the “override” is quite confusing). I’m wondering if these are necessary
for meta services, as they are actually concepts defined in the runtime
level of Flink Table / SQL.

The information in `TableSinkLineageEntity` such as `ModifyType`,
`ChangelogMode` and `override` are mainly used for verification and
display. For example, Flink currently supports `INSERT`/`DELETE` and
`UPDATE`, we only want to report and update lineage for `INSERT` jobs in
our streaming & batch ETL, and display the `override` information on the UI.


Best,
Shammon FY


On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:

> Hi Shammon,
>
> Thanks for starting this FLIP! Data lineage is a very important topic,
> which has been missing for a long time in Flink. I have some questions
> about the FLIP.
>
> About events and listeners:
>
> 1. I’m not sure if it is necessary to expose JobType to in JobCreatedEvent.
> This is an internal class in flink-runtime, and I think the correct API
> should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
> and streaming becomes much more vague as Flink is evolving towards
> batch-streaming unification, so I’m concerned about exposing JobType as a
> public API. Is there any specific use case to expose the batch / streaming
> info to listeners or meta services?
>
> 2. Currently JobCreatedEvent gives a Configuration, which is quite
> ambiguous. To be honest the configuration is quite a mess in Flink, so
> maybe it’s better to be more specific here to tell users what information
> they could expect to see here, instead of just a “job configuration” as
> described in JavaDoc.
>
> 3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
> more information should be provided here, such as which thread model this
> executor could promise, and whether the user should care about concurrency
> issues. Otherwise I prefer not to give such an utility that no one dares to
> use safely, and leave it to users to choose their implementation.
>
> About lineage:
>
> 4. I don’t quite get the LineageRelationEntity, which is just a list of
> LineageEntity. Could you elaborate more on this class? From my naive
> imagination, the lineage is shaped as a DAG, where vertices are sources and
> sinks (LineageEntity) and edges are connections between them
> (LineageRelation), so it is a bit confusing for a name mixing these two
> concepts.
>
> 5. I can’t find the definition of CatalogContext in the current code base
> and Flink, which appears in the TableLineageEntity.
>
> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> (the “override” is quite confusing). I’m wondering if these are necessary
> for meta services, as they are actually concepts defined in the runtime
> level of Flink Table / SQL.
>
> Best,
> Qingsheng
>
> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi devs,
> >
> > Is there any comment or feedback for this FLIP? Hope to hear from you,
> > thanks
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > > streaming and batch jobs create lineage dependency between source and
> > sink,
> > > users can manage their data and jobs according to this lineage
> > information.
> > > For example, when there is a delay in Flink ETL or data, users can
> easily
> > > trace the problematic jobs and affected data. On the other hand, when
> > users
> > > need to correct data or debug, they can perform operations based on
> > lineage
> > > too.
> > >
> > > In FLIP-314 we want to introduce lineage related interfaces for Flink
> and
> > > users can create customized job status listeners. When job status
> > changes,
> > > users can get job status and information to add, update or delete
> > lineage.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > >
> > > Best,
> > > Shammon FY
> > >
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Shammon. Thanks for bring this FLIP. 
I share all the concerns with Qingsheng.
Beside that, I have few question:

1: What kinds of JobStatus will the `JobExecutionStatusEven` including? From the jave doc "/** Job status changed event for runtime. */"  for the `JobExecutionStatusEvent`, I believe it won't contain all 
JobStatus defined in org.apache.flink.api.common.JobStatus since it also define some JobStatus that not for runtime such as INITIALIZING/CREATED, etc. I think we do need have a clear defination for it.
 
2: I'm really confused about the `config()` included in `LineageEntity`, where is it from and what is it for ?

3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is needed or not, since `TableSinkLineageEntity` contains `inputChangelogMode`, why `TableSourceLineageEntity` don't contain changelogmode?

4: About `Job lineage for DataStream job`, why DataStreamSink need to set setLineageRelationEntity, shouldn't it be collection by Flink framework automatically?

Btw, since there're a lot interfaces proposed, I think it'll be better to give an example about how to implement a listener in this FLIP to make us know better about the interfaces.


Best regards,
Yuxia

----- 原始邮件 -----
发件人: "Qingsheng Ren" <re...@apache.org>
收件人: "dev" <de...@flink.apache.org>
抄送: "Shammon FY" <zj...@gmail.com>
发送时间: 星期二, 2023年 6 月 20日 下午 6:19:10
主题: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Hi Shammon,

Thanks for starting this FLIP! Data lineage is a very important topic,
which has been missing for a long time in Flink. I have some questions
about the FLIP.

About events and listeners:

1. I’m not sure if it is necessary to expose JobType to in JobCreatedEvent.
This is an internal class in flink-runtime, and I think the correct API
should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
and streaming becomes much more vague as Flink is evolving towards
batch-streaming unification, so I’m concerned about exposing JobType as a
public API. Is there any specific use case to expose the batch / streaming
info to listeners or meta services?

2. Currently JobCreatedEvent gives a Configuration, which is quite
ambiguous. To be honest the configuration is quite a mess in Flink, so
maybe it’s better to be more specific here to tell users what information
they could expect to see here, instead of just a “job configuration” as
described in JavaDoc.

3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
more information should be provided here, such as which thread model this
executor could promise, and whether the user should care about concurrency
issues. Otherwise I prefer not to give such an utility that no one dares to
use safely, and leave it to users to choose their implementation.

About lineage:

4. I don’t quite get the LineageRelationEntity, which is just a list of
LineageEntity. Could you elaborate more on this class? From my naive
imagination, the lineage is shaped as a DAG, where vertices are sources and
sinks (LineageEntity) and edges are connections between them
(LineageRelation), so it is a bit confusing for a name mixing these two
concepts.

5. I can’t find the definition of CatalogContext in the current code base
and Flink, which appears in the TableLineageEntity.

6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
(the “override” is quite confusing). I’m wondering if these are necessary
for meta services, as they are actually concepts defined in the runtime
level of Flink Table / SQL.

Best,
Qingsheng

On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:

> Hi devs,
>
> Is there any comment or feedback for this FLIP? Hope to hear from you,
> thanks
>
> Best,
> Shammon FY
>
>
> On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I would like to start a discussion on FLIP-314: Support Customized Job
> > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > streaming and batch jobs create lineage dependency between source and
> sink,
> > users can manage their data and jobs according to this lineage
> information.
> > For example, when there is a delay in Flink ETL or data, users can easily
> > trace the problematic jobs and affected data. On the other hand, when
> users
> > need to correct data or debug, they can perform operations based on
> lineage
> > too.
> >
> > In FLIP-314 we want to introduce lineage related interfaces for Flink and
> > users can create customized job status listeners. When job status
> changes,
> > users can get job status and information to add, update or delete
> lineage.
> >
> > Looking forward to your feedback, thanks.
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> >
> > Best,
> > Shammon FY
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Qingsheng Ren <re...@apache.org>.
Hi Shammon,

Thanks for starting this FLIP! Data lineage is a very important topic,
which has been missing for a long time in Flink. I have some questions
about the FLIP.

About events and listeners:

1. I’m not sure if it is necessary to expose JobType to in JobCreatedEvent.
This is an internal class in flink-runtime, and I think the correct API
should be RuntimeExecutionMode. Furthermore, I think the boundary of batch
and streaming becomes much more vague as Flink is evolving towards
batch-streaming unification, so I’m concerned about exposing JobType as a
public API. Is there any specific use case to expose the batch / streaming
info to listeners or meta services?

2. Currently JobCreatedEvent gives a Configuration, which is quite
ambiguous. To be honest the configuration is quite a mess in Flink, so
maybe it’s better to be more specific here to tell users what information
they could expect to see here, instead of just a “job configuration” as
described in JavaDoc.

3. JobStatusChangedListenerFactory.Context provides an IO executor. I think
more information should be provided here, such as which thread model this
executor could promise, and whether the user should care about concurrency
issues. Otherwise I prefer not to give such an utility that no one dares to
use safely, and leave it to users to choose their implementation.

About lineage:

4. I don’t quite get the LineageRelationEntity, which is just a list of
LineageEntity. Could you elaborate more on this class? From my naive
imagination, the lineage is shaped as a DAG, where vertices are sources and
sinks (LineageEntity) and edges are connections between them
(LineageRelation), so it is a bit confusing for a name mixing these two
concepts.

5. I can’t find the definition of CatalogContext in the current code base
and Flink, which appears in the TableLineageEntity.

6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
(the “override” is quite confusing). I’m wondering if these are necessary
for meta services, as they are actually concepts defined in the runtime
level of Flink Table / SQL.

Best,
Qingsheng

On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zj...@gmail.com> wrote:

> Hi devs,
>
> Is there any comment or feedback for this FLIP? Hope to hear from you,
> thanks
>
> Best,
> Shammon FY
>
>
> On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I would like to start a discussion on FLIP-314: Support Customized Job
> > Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> > streaming and batch jobs create lineage dependency between source and
> sink,
> > users can manage their data and jobs according to this lineage
> information.
> > For example, when there is a delay in Flink ETL or data, users can easily
> > trace the problematic jobs and affected data. On the other hand, when
> users
> > need to correct data or debug, they can perform operations based on
> lineage
> > too.
> >
> > In FLIP-314 we want to introduce lineage related interfaces for Flink and
> > users can create customized job status listeners. When job status
> changes,
> > users can get job status and information to add, update or delete
> lineage.
> >
> > Looking forward to your feedback, thanks.
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> >
> > Best,
> > Shammon FY
> >
>

Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

Posted by Shammon FY <zj...@gmail.com>.
Hi devs,

Is there any comment or feedback for this FLIP? Hope to hear from you,
thanks

Best,
Shammon FY


On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zj...@gmail.com> wrote:

> Hi devs,
>
> I would like to start a discussion on FLIP-314: Support Customized Job
> Lineage Listener[1] which is the next stage of FLIP-294 [2]. Flink
> streaming and batch jobs create lineage dependency between source and sink,
> users can manage their data and jobs according to this lineage information.
> For example, when there is a delay in Flink ETL or data, users can easily
> trace the problematic jobs and affected data. On the other hand, when users
> need to correct data or debug, they can perform operations based on lineage
> too.
>
> In FLIP-314 we want to introduce lineage related interfaces for Flink and
> users can create customized job status listeners. When job status changes,
> users can get job status and information to add, update or delete lineage.
>
> Looking forward to your feedback, thanks.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>
> Best,
> Shammon FY
>