Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2019/08/15 15:42:05 UTC

Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

Sorry for the late response. So many FLIPs these days.

I am a bit unsure about the motivation here, and whether this needs to be a
part of Flink. It sounds like this can perfectly well be built around Flink
as a minimal library on top of it, without any change in the core APIs or
runtime.

The proposal to handle "caching intermediate results" (to make them
reusable across jobs in a session), and "writing them in different formats
/ indexing them" doesn't sound like it should be the same mechanism.

  - The caching part is a transparent low-level primitive. It avoids
re-executing a part of the job graph, but otherwise is completely
transparent to the consumer job.

  - Writing data out in a sink, compressing/indexing it and then reading it
in another job is also a way of reusing a previous result, but on a
completely different abstraction level. It is not the same intermediate
result any more. When the consumer reads from it and applies predicate
pushdown, etc. then the consumer job looks completely different from a job
that consumed the original result. It hence needs to be solved on the API
level via a sink and a source.
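
To illustrate the difference between the two bullets above, here is a toy
Python sketch. It does not use any Flink API; `cached_result`,
`write_indexed` and `read_indexed` are hypothetical names made up for this
example, and the in-memory dictionaries only stand in for real storage.

```python
from typing import Callable, Dict, List

Record = Dict[str, int]

# (1) Transparent caching: the consumer requests the same logical result and
# cannot tell whether it was recomputed or served from the cache.
_cache: Dict[str, List[Record]] = {}


def cached_result(result_id: str, compute: Callable[[], List[Record]]) -> List[Record]:
    if result_id not in _cache:
        _cache[result_id] = compute()  # executed at most once per result_id
    return _cache[result_id]


# (2) Sink and source: the data is rewritten into another form (here, a
# simple index by key), and the consumer changes because it pushes a
# predicate down into the source instead of scanning every record.
def write_indexed(records: List[Record], key: str) -> Dict[int, List[Record]]:
    index: Dict[int, List[Record]] = {}
    for record in records:
        index.setdefault(record[key], []).append(record)
    return index


def read_indexed(index: Dict[int, List[Record]], key_value: int) -> List[Record]:
    return index.get(key_value, [])


def expensive_job() -> List[Record]:
    # Placeholder for the part of the job graph that should not be re-run.
    return [{"user": 1, "clicks": 3}, {"user": 2, "clicks": 5}]


same_result = cached_result("job-1", expensive_job)  # consumer is unchanged
only_user_2 = read_indexed(write_indexed(same_result, "user"), 2)  # consumer differs
print(same_result)
print(only_user_2)
```

In path (1) the consumer sees exactly the records the producer emitted. In
path (2) the consumer is written against the indexed form, which is why it
looks like a different job.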

I would suggest keeping these concepts separate: caching (possibly
automatically) for jobs in a session, and long-term writing/sharing of data
sets.

Solving the "long term writing/sharing" in a library rather than in the
runtime also has the advantage of not pushing yet more stuff into Flink's
core, which I believe is also an important criterion.

Best,
Stephan


On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <su...@gmail.com> wrote:

> Hi folks,
>
> I would like to start the FLIP discussion thread about the pluggable
> intermediate result storage.
>
> This is phase 2 of FLIP-36: Support Interactive Programming in Flink.
> While FLIP-36 provides a default implementation of
> the intermediate result storage using the shuffle service, we would like to
> make the intermediate result storage pluggable so that the user can easily
> swap the storage.
>
> We are looking forward to your thoughts!
>
> The FLIP link is the following:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage
>
> Best,
> Xuannan
>

Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

Posted by Stephan Ewen <se...@apache.org>.
I understand your argument about performance, though I am unsure it will be
that noticeable or that it warrants the added complexity.

Besides performance, I believe the other two arguments in the previous
mails are quite important to consider:

  - Keeping complexity out of the core is a good goal. It is easy to start
with a library and make a deeper integration later when we see the need and
that there is really something missing in the user experience.

  - The mechanism you describe is closer to "view matching" and only
applicable in the Table API with an optimizer that understands the
semantics, whereas result caching is more low-level and works in the
DataSet / DataStream API as well. That distinction suggests it is not
exactly the same mechanism anyway (though view matching can be partially
built on top of intermediate results).

Best,
Stephan


On Tue, Sep 24, 2019 at 2:01 AM Becket Qin <be...@gmail.com> wrote:

> Hi Stephan,
>
> In terms of the performance concern, please see my understanding below.
>
> ## Breaking the pipeline vs. adding a sink
> If two operators are initially chained, they will belong to the same stage
> in the DAG and the same task, so the main processing path will have just
> one task without serde in the middle. I was trying to see the overhead
> of adding a new sink or breaking the pipeline.
>
>   - Adding a new sink introduces serialization cost, and potentially
> network IO if the sink writes to remote storage instead of the local file
> system.
>   - Breaking the pipeline introduces a new stage, a new task, additional
> serialization / deserialization cost and potential network IO.
>
> Therefore I thought that adding a new sink will have better performance
> than breaking the pipeline because it has lower cost in general.
> Please let me know if I missed something.
>
> The above scenarios assume that users want to cache the result in the
> middle of an operator chain, but not at the shuffle boundary. If the cache
> is at the shuffle boundary, it would duplicate the records unless the
> pluggable shuffle service is also the pluggable intermediate result storage
> at the same time. In that case, there will be just one copy of the records,
> but it could be read by either the pluggable shuffle service or the
> pluggable intermediate result storage.
>
> ## Reading a subset of records
> You are right. Any additional indexing / compression / columnizing of the
> raw intermediate result introduces overhead. So it only makes sense if the
> saving is greater than the overhead. One such example is iteration. In that
> case, the cached intermediate results may be read an undefined number of times
> and the initial overhead of columnizing would be worthwhile.
>
>
> In general, I am with you that this could be put in an external library. It
> is achievable if we only address the cross-session intermediate result
> sharing. However, an external library is not sufficient to provide
> optimized in-session intermediate result sharing. This is mainly because
> when the job exits, RM needs to clean up the intermediate results. So
> basically we are choosing between the following two options.
>
> Option 1: In-session sharing is served only by the shuffle service, with
> no special performance optimization.
> Option 2: In-session sharing is served by shuffle service by default,
> performance optimization can be provided by pluggable intermediate result
> storage.
>
> It would be helpful for us to first agree on whether we want to have
> performance optimization for in-session intermediate result sharing. If
> not, option 1 is good enough. Otherwise, we would need something pluggable
> for the in-session intermediate result.
>
> Thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin

Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

Posted by Becket Qin <be...@gmail.com>.
Hi Stephan,

In terms of the performance concern, please see my understanding below.

## Breaking the pipeline vs. adding a sink
If two operators are initially chained, they will belong to the same stage
in the DAG and the same task, so the main processing path will have just
one task without serde in the middle. I was trying to see the overhead
of adding a new sink or breaking the pipeline.

  - Adding a new sink introduces serialization cost, and potentially
network IO if the sink writes to remote storage instead of the local file
system.
  - Breaking the pipeline introduces a new stage, a new task, additional
serialization / deserialization cost and potential network IO.

Therefore I thought that adding a new sink will have better performance
than breaking the pipeline because it has lower cost in general.
Please let me know if I missed something.
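
A rough sketch of this comparison as a per-record cost model. The cost
categories come from the two bullets above; the `Costs` fields, the function
names and all numbers are illustrative assumptions, not measurements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Costs:
    """Hypothetical per-record costs in arbitrary units (not measured)."""
    serialize: float
    deserialize: float
    network: float        # paid only when data leaves the local machine
    task_overhead: float  # cost of the extra stage/task, amortized per record


def extra_sink_cost(c: Costs, remote_storage: bool) -> float:
    # The chained main path stays intact; only the side output is serialized.
    return c.serialize + (c.network if remote_storage else 0.0)


def pipeline_break_cost(c: Costs, remote_read: bool) -> float:
    # Every record on the main path is serialized, handed to a new task,
    # and deserialized again.
    return (c.serialize + c.deserialize + c.task_overhead
            + (c.network if remote_read else 0.0))


costs = Costs(serialize=1.0, deserialize=1.0, network=3.0, task_overhead=0.2)
print(extra_sink_cost(costs, remote_storage=False))   # local sink
print(extra_sink_cost(costs, remote_storage=True))    # remote sink
print(pipeline_break_cost(costs, remote_read=False))  # local blocking result
```

With these placeholder numbers a local sink is cheaper than a local pipeline
break, but a remote sink is not, so the outcome depends on where the sink
writes and on the real cost ratios.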

The above scenarios assume that users want to cache the result in the
middle of an operator chain, but not at the shuffle boundary. If the cache
is at the shuffle boundary, it would duplicate the records unless the
pluggable shuffle service is also the pluggable intermediate result storage
at the same time. In that case, there will be just one copy of the records,
but it could be read by either the pluggable shuffle service or the
pluggable intermediate result storage.

## Reading a subset of records
You are right. Any additional indexing / compression / columnizing of the
raw intermediate result introduces overhead. So it only makes sense if the
saving is greater than the overhead. One such example is iteration. In that
case, the cached intermediate results may be read an undefined number of times
and the initial overhead of columnizing would be worthwhile.
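
The break-even condition can be written down as a small calculation: a
one-time overhead pays off once the accumulated per-read saving exceeds it.
This is only a sketch; `break_even_reads` and the numbers are hypothetical.

```python
import math


def break_even_reads(build_overhead: float, saving_per_read: float) -> float:
    """Smallest number of reads whose total saving exceeds the overhead.

    Both arguments are hypothetical costs in the same arbitrary unit.
    """
    if saving_per_read <= 0:
        return math.inf  # the optimized format never pays off
    return math.floor(build_overhead / saving_per_read) + 1


# Placeholder numbers: columnizing costs 5 units once, and each read of the
# columnized result is 2 units cheaper than a read of the raw result.
print(break_even_reads(build_overhead=5.0, saving_per_read=2.0))  # prints 3
```

A result read once or twice stays below that threshold in this example,
while an iteration that reads it many times would exceed it.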


In general, I am with you that this could be put in an external library. It
is achievable if we only address the cross-session intermediate result
sharing. However, an external library is not sufficient to provide
optimized in-session intermediate result sharing. This is mainly because
when the job exits, RM needs to clean up the intermediate results. So
basically we are choosing between the following two options.

Option 1: In-session sharing is served only by the shuffle service, with
no special performance optimization.
Option 2: In-session sharing is served by shuffle service by default,
performance optimization can be provided by pluggable intermediate result
storage.

It would be helpful for us to first agree on whether we want to have
performance optimization for in-session intermediate result sharing. If
not, option 1 is good enough. Otherwise, we would need something pluggable
for the in-session intermediate result.

Thoughts?

Thanks,

Jiangjie (Becket) Qin




On Sun, Sep 22, 2019 at 8:44 PM Stephan Ewen <se...@apache.org> wrote:

> ## About the improvements you mentioned in (1)
>
>   - I am not sure that this helps to improve performance by not breaking
> the pipeline.
>     Attaching an additional sink would in virtually any case add even more
> overhead than breaking the pipeline.
>     What is your reasoning why it would be faster, all in all?
>
>   - About reading only a subset of the records:
>      - If this is about reading the data once or twice, then
> columnarizing/indexing/compressing the data is more expensive than just
> reading it twice more.
>      - This means turning the mechanism into something like materialized
> view matching, rather than result caching. That should happen in different
> parts of the stack (view matching needs schema, semantics, etc.). I am not
> sure mixing both is even a good idea.
>
>
> ## The way I see the trade-offs:
>
> Pro in core Flink:
>   - Small improvement to API experience, compared to a library
>
> Contra in core Flink:
>   - added API complexity, maintenance and evolution overhead
>   - not clear what impacts mixing materialized view matching and result
> caching has on the system architecture
>   - Not yet a frequent use case, possibly a frequent use case in the
> future.
>   - Starting as a library allows for merging into the core later when this
> use case becomes major and experience improvement proves big.
>
> Unclear
>   - is breaking the pipeline by introducing a blocking intermediate result
> really worse than duplicating the data into an additional sink?
>
>
> ==> Especially because we can still make it part of Flink later once the
> use case and approach are a bit more fleshed out, this looks like a strong
> case for starting with a library approach here.
>
> Best,
> Stephan

Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

Posted by Stephan Ewen <se...@apache.org>.
## About the improvements you mentioned in (1)

  - I am not sure that this helps to improve performance by not breaking
the pipeline.
    Attaching an additional sink would in virtually any case add even more
overhead than breaking the pipeline.
    What is your reasoning why it would be faster, all in all?

  - About reading only a subset of the records:
     - If this is about reading the data once or twice, then
columnarizing/indexing/compressing the data is more expensive than just
reading it twice more.
     - This means turning the mechanism into something like materialized
view matching, rather than result caching. That should happen in different
parts of the stack (view matching needs schema, semantics, etc.). I am not
sure mixing both is even a good idea.


## The way I see the trade-offs:

Pro in core Flink:
  - Small improvement to API experience, compared to a library

Contra in core Flink:
  - added API complexity, maintenance and evolution overhead
  - not clear what impacts mixing materialized view matching and result
caching has on the system architecture
  - Not yet a frequent use case, possibly a frequent use case in the future.
  - Starting as a library allows for merging into the core later when this
use case becomes major and experience improvement proves big.

Unclear
  - is breaking the pipeline by introducing a blocking intermediate result
really worse than duplicating the data into an additional sink?


==> Especially because we can still make it part of Flink later once the
use case and approach are a bit more fleshed out, this looks like a strong
case for starting with a library approach here.

Best,
Stephan



On Thu, Sep 19, 2019 at 2:41 AM Becket Qin <be...@gmail.com> wrote:

> Hi Stephan,
>
> Sorry for the belated reply. You are right that the functionality proposed
> in this FLIP can be implemented outside the Flink core as an ecosystem
> project.
>
> The main motivation of this FLIP is twofold:
>
> 1. Improve the performance of intermediate result sharing in the same
> session.
> Using the internal shuffle service to store cached results has two potential
> performance problems.
>   a) the cached intermediate results may break the operator chaining due to
> the addition of a BLOCKING_PERSISTENT edge.
>   b) the downstream processor must read all the records in intermediate
> results to process.
>
> A pluggable intermediate result storage will help address both problems.
> Adding a sink will not break chaining; it just ensures that the cached
> logical node is not optimized away. The pluggable storage can help
> improve the performance by making the intermediate results filterable /
> projectable, etc. Alternatively we can make the shuffle service more
> sophisticated, but it may complicate things and is not necessary for the
> normal shuffles.
>
> This motivation seems difficult to address with an external library on
> top of Flink core, mainly because the in-session intermediate result
> cleanup may need participation of RM to achieve fault tolerance. Also,
> having an external library essentially introduces another way to cache the
> in-session intermediate results.
>
> 2. Cross-session intermediate result sharing.
> As you said, this can be implemented as an external library. The only
> difference is that users may need to deal with another set of APIs, but that
> seems OK.
>
>
> So for this FLIP, it would be good to see whether we think motivation 1 is
> worth addressing or not.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin

Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

Posted by Becket Qin <be...@gmail.com>.
Hi Stephan,

Sorry for the belated reply. You are right that the functionality proposed
in this FLIP can be implemented outside the Flink core as an ecosystem
project.

The main motivation of this FLIP is twofold:

1. Improve the performance of intermediate result sharing in the same
session.
Using the internal shuffle service to store cached results has two potential
performance problems.
  a) the cached intermediate results may break the operator chaining due to
the addition of a BLOCKING_PERSISTENT edge.
  b) the downstream processor must read all the records in intermediate
results to process.

A pluggable intermediate result storage will help address both problems.
Adding a sink will not break chaining; it just ensures that the cached
logical node is not optimized away. The pluggable storage can help
improve the performance by making the intermediate results filterable /
projectable, etc. Alternatively we can make the shuffle service more
sophisticated, but it may complicate things and is not necessary for the
normal shuffles.
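
The chaining argument can be pictured with a toy model. This is not Flink's
actual chaining logic: `build_tasks`, the edge kinds and the operator names
are assumptions for illustration, with "blocking" standing in for a
BLOCKING_PERSISTENT edge.

```python
from typing import List, Tuple

# Edge kinds in this toy model. "forward" edges may be chained; "blocking"
# stands in for a BLOCKING_PERSISTENT edge and always ends the chain.
Edge = Tuple[str, str, str]  # (upstream, downstream, kind)


def build_tasks(operators: List[str], edges: List[Edge]) -> List[List[str]]:
    """Group a linear main path of operators into tasks (chains)."""
    kind = {(u, d): k for u, d, k in edges}
    tasks: List[List[str]] = [[operators[0]]]
    for prev, op in zip(operators, operators[1:]):
        if kind.get((prev, op)) == "forward":
            tasks[-1].append(op)  # same task, no serde in between
        else:
            tasks.append([op])    # new task, records are materialized
    return tasks


ops = ["source", "map", "filter"]

# Caching the output of "map" through a blocking edge splits the chain.
blocking = [("source", "map", "forward"), ("map", "filter", "blocking")]
print(build_tasks(ops, blocking))

# Caching through a side sink leaves the main path chained; the sink is an
# extra consumer of "map" and is not part of the linear main path here.
with_sink = [("source", "map", "forward"), ("map", "filter", "forward"),
             ("map", "cache_sink", "forward")]
print(build_tasks(ops, with_sink))
```

In the first case the main path is split into two tasks, in the second it
stays a single task. The model says nothing about the cost of the extra sink
itself.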

This motivation seems difficult to address with an external library on
top of Flink core, mainly because the in-session intermediate result
cleanup may need participation of RM to achieve fault tolerance. Also,
having an external library essentially introduces another way to cache the
in-session intermediate results.

2. Cross-session intermediate result sharing.
As you said, this can be implemented as an external library. The only
difference is that users may need to deal with another set of APIs, but that
seems OK.


So for this FLIP, it would be good to see whether we think motivation 1 is
worth addressing or not.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <se...@apache.org> wrote:

> Sorry for the late response. So many FLIPs these days.
>
> I am a bit unsure about the motivation here, and whether this needs to be a
> part of Flink. It sounds like this can perfectly well be built around Flink
> as a minimal library on top of it, without any change in the core APIs or
> runtime.
>
> The proposal to handle "caching intermediate results" (to make them
> reusable across jobs in a session), and "writing them in different formats
> / indexing them" doesn't sound like it should be the same mechanism.
>
>   - The caching part is a transparent low-level primitive. It avoids
> re-executing a part of the job graph, but otherwise is completely
> transparent to the consumer job.
>
>   - Writing data out in a sink, compressing/indexing it and then reading it
> in another job is also a way of reusing a previous result, but on a
> completely different abstraction level. It is not the same intermediate
> result any more. When the consumer reads from it and applies predicate
> pushdown, etc. then the consumer job looks completely different from a job
> that consumed the original result. It hence needs to be solved on the API
> level via a sink and a source.
>
> I would suggest keeping these concepts separate: caching (possibly
> automatically) for jobs in a session, and long-term writing/sharing of data
> sets.
>
> Solving the "long term writing/sharing" in a library rather than in the
> runtime also has the advantage of not pushing yet more stuff into Flink's
> core, which I believe is also an important criterion.
>
> Best,
> Stephan