Posted to dev@flink.apache.org by Xuannan Su <su...@gmail.com> on 2021/12/29 09:36:40 UTC

[DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Hi devs,

I’d like to start a discussion about adding support for caching
intermediate results in the DataStream API for batch processing.

As the DataStream API now supports batch execution mode, we see users
using the DataStream API to run batch jobs. Interactive programming is
an important use case of Flink batch processing. And the ability to
cache intermediate results of a DataStream is crucial to the
interactive programming experience.

Therefore, we propose to support caching a DataStream in batch
execution mode. We believe that users will benefit a lot from the change,
and that it will encourage them to use the DataStream API for their
interactive batch processing work.

Please check out the FLIP-205 [1] and feel free to reply to this email
thread. Looking forward to your feedback!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing

Best,
Xuannan

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by David Morávek <dm...@apache.org>.
+1 Thanks Xuannan, no more objections from my side

On Mon, Jan 17, 2022 at 7:14 AM Yun Gao <yu...@aliyun.com.invalid>
wrote:

> Hi Xuannan,
>
> Many thanks for the detailed explanation, and sorry for the very
> late response.
>
> For cached result partition vs. managed table, I also agree with
> the current conclusion that they can be differentiated for now:
> a cached result partition can be viewed as an internal, lightweight data
> cache whose lifecycle is bound to the current application, and a managed
> table can be viewed as an external service whose lifecycle can span
> multiple applications.
>
> For the other issues, after more thought I have two remaining
> questions:
>
> 1. Regarding the API, I think it should work if we could execute multiple
> caches as a whole, but in the FLIP's example it seems we are calling
> execute_and_collect() on top of a single CachedDataStream?
> Also, in the given API, CachedDataStream does not seem to have a method
> execute_and_collect()?
> 2. For re-submitting the job when the cached result partition is missing,
> would this happen on the client side or in the scheduler? If it happens on
> the client side, do we need to bypass the given failover strategy (such as
> retrying N times) when we find that the cached result partition is missing?
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <su...@gmail.com>
> Send Time:2022 Jan. 17 (Mon.) 13:00
> To:dev <de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch
> Processing
>
> Hi David,
>
> Thanks for pointing out the FLIP-187. After reading the FLIP, I think it
> can solve the problem of choosing the proper parallelism, and thus it
> should be fine to not provide the method to set the parallelism of the
> cache.
>
> And your understanding of the outcome of this FLIP is correct.
>
> If there is no more feedback or objections, I would like to start a vote
> thread tomorrow.
>
> Best,
> Xuannan
>
> On Fri, Jan 14, 2022 at 5:34 PM David Morávek <dm...@apache.org> wrote:
>
> > Hi Xuannan,
> >
> > I think this already looks really good. The whole discussion is pretty
> > long, so I'll just summarize my current understanding of the outcome:
> >
> > - This only targets the DataStream API for now, but can be used as a
> > building block for the higher level abstractions (Table API).
> > - We're pushing caching down to the shuffle service (works with all
> > implementations), storing the intermediate results. This should also
> > naturally work with current fail-over mechanisms for batch (backtrack +
> > recompute missing intermediate results [1]).
> >
> >
> > > Regarding setting the parallelism of the CacheTransformation: with the
> > > current design, the parallelism of the cached intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical, as the
> > > cache-producing job suffers from the same overhead anyway. For a
> > > CacheTransformation with large parallelism but a relatively small
> > > result dataset, I think we should reduce the parallelism of the
> > > transformation to be cached.
> >
> >
> > Is the whole "choosing the right parallelism for caching" problem solved by
> > the Adaptive Batch Job Scheduler [2]?
> >
> > [1] https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> > D.
> >
> > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <su...@gmail.com> wrote:
> >
> > > Hi Gen,
> > >
> > > Thanks for your feedback.
> > >
> > > I think you are talking about how we are going to store the cached
> > > data. The first option is to write the data with a sink to an external
> > > file system, much like the file store of the Dynamic Table. If I
> > > understand correctly, it requires a distributed file system, e.g. HDFS,
> > > S3, etc. In my opinion, it is too heavyweight to use a distributed
> > > file system for caching.
> > >
> > > As you said, using the shuffle service for caching is quite natural as
> > > we need to produce the intermediate result anyway. For Table/SQL API,
> > > the table operations are translated to transformations, where we can
> > > reuse the CacheTransformation. It should not be unfriendly for
> > > Table/SQL API.
> > >
> > > Regarding setting the parallelism of the CacheTransformation: with the
> > > current design, the parallelism of the cached intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical, as the
> > > cache-producing job suffers from the same overhead anyway. For a
> > > CacheTransformation with large parallelism but a relatively small
> > > result dataset, I think we should reduce the parallelism of the
> > > transformation to be cached.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > >
> > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <lu...@gmail.com> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > I do agree that dynamic tables and cached partitions are similar features
> > > > aimed at different cases. In my opinion, the main difference between the
> > > > implementations is whether to cache only the data or the whole result
> > > > partition.
> > > >
> > > > To cache only the data, we can translate the CacheTransformation to a Sink
> > > > node for writing and a Source node for consuming. Most things are
> > > > just the same as in this FLIP, except for the storage, which is an external
> > > > one (or a built-in one if we can use the dynamic table storage) instead of
> > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This
> > > > can make caching independent of a specific shuffle service, and make it
> > > > possible to share data between different jobs / Per-Job mode jobs.
> > > >
> > > > Caching the whole partition is natural in the DataStream API, since the
> > > > partition is a low-level concept, and data storage is already provided by
> > > > the default shuffle service. So if we want a solution that only
> > > > supports caching in the DataStream API, caching the whole partition can be a
> > > > good choice. But this may not be as friendly to the Table/SQL API as to
> > > > DataStream, since users are asking to cache a logical Table (view)
> > > > rather than a physical partition. If we want a unified solution for both
> > > > APIs, this may need to be considered.
> > > >
> > > >
> > > > And here's another suggestion for this FLIP. Maybe we should support
> > > > "setParallelism" in CacheTransformation, for both caching and consuming.
> > > >
> > > > In some cases, the input parallelism of the CacheTransformation is large
> > > > but the result dataset is relatively small. We may need too many resources
> > > > to consume the result partition if the source parallelism has to be the
> > > > same as the producer's.
> > > >
> > > > On the other hand, serving a large number of partitions may also have more
> > > > overhead. Though maybe it's not a big burden, we can try to reduce the
> > > > actual cached partition count if necessary, for example by adding a
> > > > pass-through vertex with the specific parallelism between the producer and
> > > > the cache vertices.
> > > >
> > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipengw@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Regarding whether and how to support caching side outputs, I agree that the
> > > > > second option might be better if there does exist a use case where users need
> > > > > to cache only certain side outputs.
> > > > >
> > > > >
> > > > > On Tue, Jan 4, 2022 at 3:50 PM Xuannan Su <su...@gmail.com> wrote:
> > > > >
> > > > > > Hi Zhipeng and Gen,
> > > > > >
> > > > > > Thanks for joining the discussion.
> > > > > >
> > > > > > For Zhipeng:
> > > > > >
> > > > > > - Can we support side output
> > > > > > Caching the side output is indeed a valid use case. However, with the
> > > > > > current API, it is not straightforward to cache the side output. You
> > > > > > can apply an identity map function to the DataStream returned by the
> > > > > > getSideOutput method and then cache the result of the map
> > > > > > transformation. In my opinion, it is not user-friendly. Therefore, we
> > > > > > should think of a way to better support the use case.
> > > > > > As you say, we can introduce a new class
> > > > > > `CachedSingleOutputStreamOperator`, and override the `getSideOutput`
> > > > > > method to return a `CachedDataStream`. With this approach, the cache
> > > > > > method implies that both the output and the side output of the
> > > > > > `SingleOutputStreamOperator` are cached. The problem with this
> > > > > > approach is that the user has no control over which side output should
> > > > > > be cached.
> > > > > > Another option would be to let the `getSideOutput` method return a
> > > > > > `SingleOutputStreamOperator`. This way, users can decide which side
> > > > > > output to cache. As the `getSideOutput` method returns a
> > > > > > `SingleOutputStreamOperator`, users can set properties of the
> > > > > > transformation that produces the side output, e.g. parallelism, buffer
> > > > > > timeout, etc. If users try to set different values for the same
> > > > > > property of a transformation, an exception will be thrown. What do you
> > > > > > think?
> > > > > >
> > > > > > - Can we support Stream Mode
> > > > > > Running a job in stream mode doesn't guarantee the job will finish,
> > > > > > while in batch mode, it does. This is the main reason that prevents
> > > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > > unless the job can finish.
> > > > > > If I understand correctly, by "run batch jobs in Stream Mode", you
> > > > > > mean that you have a job with all bounded sources, but you want the
> > > > > > intermediate data to shuffle in pipelined mode instead of blocking
> > > > > > mode. If that is the case, the job can run in batch mode with
> > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].
> > > > > > And we can support caching in this case.
> > > > > >
> > > > > > - Change parallelism of CachedDataStream
> > > > > > CachedDataStream extends DataStream, which doesn't have the
> > > > > > `setParallelism` method that `SingleOutputStreamOperator` has. Thus,
> > > > > > it should not be a problem with CachedDataStream.
> > > > > >
> > > > > > For Gen:
> > > > > >
> > > > > > - Relation between FLIP-205 and FLIP-188
> > > > > > Although dynamic tables and caching feel similar in the
> > > > > > sense that they let users reuse some intermediate result, they target
> > > > > > different use cases. The dynamic table targets the use case where
> > > > > > users want to share a dynamically updating intermediate result across
> > > > > > multiple applications. It is meaningful data that can be consumed
> > > > > > by different Flink applications and Flink jobs. Caching, in contrast,
> > > > > > targets the use case where users know that all the sources are
> > > > > > bounded and static, and caching is only used to avoid re-computing the
> > > > > > intermediate result. The cached intermediate result is only
> > > > > > meaningful across jobs in the same application.
> > > > > >
> > > > > > Dynamic tables and caching can be used together. For example, in a
> > > > > > machine learning scenario, we can have a stream job that
> > > > > > generates training samples, and create a dynamic table for the
> > > > > > training samples. We then run a Flink application every hour to do some
> > > > > > data analysis on the training samples generated in the last hour. The
> > > > > > Flink application consists of multiple batch jobs that
> > > > > > share some intermediate results, so users can use the cache to avoid
> > > > > > re-computation. The intermediate result is not meaningful outside of
> > > > > > the application, and the cache will be discarded after the application
> > > > > > is finished.
> > > > > >
> > > > > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <lu...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > I found FLIP-188 [1], which aims to introduce a built-in dynamic table
> > > > > > > storage that provides a unified changelog & table representation. Tables
> > > > > > > stored there can be used in further ad-hoc queries. To my understanding,
> > > > > > > it's quite like an implementation of caching in the Table API, and the
> > > > > > > ad-hoc queries are somewhat like further steps in an interactive program.
> > > > > > >
> > > > > > > As you replied, caching at the Table/SQL API is the next step, as a part of
> > > > > > > interactive programming in the Table API, which we all agree is the major
> > > > > > > scenario. What do you think about the relation between it and FLIP-188?
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuannan95@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > Thanks for sharing your thoughts.
> > > > > > > >
> > > > > > > > You are right that most people tend to use high-level APIs for
> > > > > > > > interactive data exploration. Actually, there is
> > > > > > > > FLIP-36 [1], covering the cache API at the Table/SQL API. As far as I
> > > > > > > > know, it has been accepted but hasn’t been implemented. At the time
> > > > > > > > it was drafted, DataStream did not support batch mode but the Table
> > > > > > > > API did.
> > > > > > > >
> > > > > > > > Now that the DataStream API does support batch processing, I think we
> > > > > > > > can focus on supporting cache at DataStream first. It is still
> > > > > > > > valuable for DataStream users, and most of the work we do in this FLIP
> > > > > > > > can be reused. So I want to limit the scope of this FLIP.
> > > > > > > >
> > > > > > > > After caching is supported at DataStream, we can continue from where
> > > > > > > > FLIP-36 left off to support caching at the Table/SQL API. We might have to
> > > > > > > > re-vote on FLIP-36 or draft a new FLIP. What do you think?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dmvk@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > Hi Xuannan,
> > > > > > > > >
> > > > > > > > > thanks for drafting this FLIP.
> > > > > > > > >
> > > > > > > > > One immediate thought: from what I've seen of interactive data exploration
> > > > > > > > > with Spark, most people tend to use the higher-level APIs that allow for
> > > > > > > > > faster prototyping (Table API in Flink's case). Should the Table API also
> > > > > > > > > be covered by this FLIP?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuannan95@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I’d like to start a discussion about adding support for caching
> > > > > > > > > > intermediate results in the DataStream API for batch processing.
> > > > > > > > > >
> > > > > > > > > > As the DataStream API now supports batch execution mode, we see users
> > > > > > > > > > using the DataStream API to run batch jobs. Interactive programming is
> > > > > > > > > > an important use case of Flink batch processing. And the ability to
> > > > > > > > > > cache intermediate results of a DataStream is crucial to the
> > > > > > > > > > interactive programming experience.
> > > > > > > > > >
> > > > > > > > > > Therefore, we propose to support caching a DataStream in batch
> > > > > > > > > > execution mode. We believe that users will benefit a lot from the change,
> > > > > > > > > > and that it will encourage them to use the DataStream API for their
> > > > > > > > > > interactive batch processing work.
> > > > > > > > > >
> > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > > > > > > > > thread. Looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > best,
> > > > > Zhipeng
> > > > >
> > >
> >
>
>

Re: Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi, 

Thanks Xuannan for the clarification, I also have no other issues~

Best,
Yun



 ------------------Original Mail ------------------
Sender:Xuannan Su <su...@gmail.com>
Send Date:Wed Jan 19 11:35:13 2022
Recipients:Flink Dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing
Hi devs,

Thank you all for the discussion.

If there are no objections or further feedback, I would like to start the vote
thread tomorrow.

Best,
Xuannan

On Tue, Jan 18, 2022 at 8:12 PM Xuannan Su wrote:
>
> Hi Yun,
>
> Thanks for your questions.
>
> 1. I think execute_and_collect is an API on the DataStream, which
> adds a collect sink to the DataStream and invokes
> StreamExecutionEnvironment#execute. It is a convenience method to
> execute a job and get an iterator over the result.
>
> 2. As per our offline discussion, there are two ways to re-compute the
> missing cached intermediate result.
>
> In the current design, the re-submission of the job happens on the
> client side. We can throw a non-recoverable exception, annotated by
> `ThrowableType#NonRecoverableError`, to bypass the failover strategy
> when we find that the cache is missing. When the client catches the
> error, it can submit the original job to re-compute the intermediate
> result.
>
> Alternatively, the re-submission of the job can happen at the scheduler. This
> way, the cache-consuming job has to contain the vertices that create
> the cache. If the scheduler finds that the cached intermediate result
> exists, it skips the cache-creating vertices. If the cache-consuming
> vertex finds out that the cached intermediate result is missing, the
> scheduler restarts the cache-creating vertices.
>
> Handling the missing cache at the scheduler requires a lot more work
> on the scheduler, compared to re-submitting the job on the client side.
> Thus, for this FLIP, we will choose the first method. When the
> scheduler is ready, we can make it work with the scheduler. And the
> process should be transparent to the user.
>
> Best,
> Xuannan
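
The client-side re-submission path described in the reply above can be sketched in plain Java. This is only a minimal illustration under stated assumptions, not Flink code: `CacheStore`, `CacheMissingException`, and `Client` are hypothetical stand-ins (the exception plays the role of the non-recoverable error that bypasses the failover strategy), and the "jobs" are ordinary method calls rather than submitted job graphs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model of client-side re-submission on a missing cache. Not Flink code:
// every class here is a hypothetical stand-in used only for illustration.
class CacheResubmitSketch {

    // Stand-in for the non-recoverable error that bypasses the failover strategy.
    static class CacheMissingException extends RuntimeException {
        CacheMissingException(String cacheId) {
            super("cached intermediate result is missing: " + cacheId);
        }
    }

    // Stand-in for wherever the cached intermediate results are kept.
    static class CacheStore {
        private final Map<String, int[]> results = new HashMap<>();

        void put(String cacheId, int[] data) {
            results.put(cacheId, data);
        }

        void drop(String cacheId) {
            results.remove(cacheId);
        }

        int[] read(String cacheId) {
            int[] data = results.get(cacheId);
            if (data == null) {
                // Fail fast instead of retrying: retries cannot bring the cache back.
                throw new CacheMissingException(cacheId);
            }
            return data;
        }
    }

    // Stand-in for the client that submits jobs.
    static class Client {
        private final CacheStore store;
        int originalJobRuns = 0;

        Client(CacheStore store) {
            this.store = store;
        }

        // "Cache-consuming job": sums the cached values. If the cache is missing,
        // the client re-runs the original job and stores its result again.
        int sumCached(String cacheId, Supplier<int[]> originalJob) {
            try {
                return sum(store.read(cacheId));
            } catch (CacheMissingException e) {
                originalJobRuns++;
                int[] recomputed = originalJob.get();
                store.put(cacheId, recomputed);
                return sum(recomputed);
            }
        }

        private static int sum(int[] values) {
            int total = 0;
            for (int v : values) {
                total += v;
            }
            return total;
        }
    }

    public static void main(String[] args) {
        CacheStore store = new CacheStore();
        Client client = new Client(store);
        Supplier<int[]> originalJob = () -> new int[] {1, 2, 3};

        System.out.println(client.sumCached("c1", originalJob)); // cache missing: original job runs
        System.out.println(client.sumCached("c1", originalJob)); // cache present: no re-computation
        store.drop("c1");                                        // simulate a lost cached partition
        System.out.println(client.sumCached("c1", originalJob)); // original job runs again
        System.out.println("original job runs: " + client.originalJobRuns);
    }
}
```

In this toy model the caller never sees the missing cache, which mirrors the stated goal that the process should be transparent to the user. The scheduler-side alternative would move the same decision out of the client.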


> > > > > > > targeting the use case where users know that all the sources are

> > > > > > > bounded and static, and caching is only used to avoid re-computing

> > > > the

> > > > > > > intermediate result. And the cached intermediate result is only

> > > > > > > meaningful crossing jobs in the same application.

> > > > > > >

> > > > > > > Dynamic table and caching can be used together. For example, in a

> > > > > > > machine learning scenario, we can have a Stream job that is

> > > > generating

> > > > > > > some training samples. And we can create a dynamic table for the

> > > > > > > training sample. And we run a Flink application every hour to do

> > > some

> > > > > > > data analysis on the training sample generated in the last hour.

> > > The

> > > > > > > Flink application consists of multiple batch jobs and the batch

> > > jobs

> > > > > > > share some intermediate results, so users can use cache to avoid

> > > > > > > re-computation. The intermediate result is not meaningful outside

> > > of

> > > > > > > the application. And the cache will be discarded after the

> > > > application

> > > > > > > is finished.

> > > > > > >

> > > > > > > [1]

> > > > > > >

> > > > > >

> > > >

> > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode

> > > > > > >

> > > > > > >

> > > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo 

> > > wrote:

> > > > > > > >

> > > > > > > > Hi Xuannan,

> > > > > > > >

> > > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in

> > > dynamic

> > > > > > table

> > > > > > > > storage, which provides a unified changelog & table

> > > representation.

> > > > > > > Tables

> > > > > > > > stored there can be used in further ad-hoc queries. To my

> > > > > > understanding,

> > > > > > > > it's quite like an implementation of caching in Table API, and

> > > the

> > > > > > ad-hoc

> > > > > > > > queries are somehow like further steps in an interactive program.

> > > > > > > >

> > > > > > > > As you replied, caching at Table/SQL API is the next step, as a

> > > > part of

> > > > > > > > interactive programming in Table API, which we all agree is the

> > > > major

> > > > > > > > scenario. What do you think about the relation between it and

> > > > FLIP-188?

> > > > > > > >

> > > > > > > > [1]

> > > > > > > >

> > > > > > >

> > > > > >

> > > >

> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage

> > > > > > > >

> > > > > > > >

> > > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <

> > > suxuannan95@gmail.com>

> > > > > > > wrote:

> > > > > > > >

> > > > > > > > > Hi David,

> > > > > > > > >

> > > > > > > > > Thanks for sharing your thoughts.

> > > > > > > > >

> > > > > > > > > You are right that most people tend to use high-level API for

> > > > > > > > > interactive data exploration. Actually, there is

> > > > > > > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far

> > > > as I

> > > > > > > > > know, it has been accepted but hasn’t been implemented. At the

> > > > time

> > > > > > > > > when it is drafted, DataStream did not support Batch mode but

> > > > Table

> > > > > > > > > API does.

> > > > > > > > >

> > > > > > > > > Now that the DataStream API does support batch processing, I

> > > > think we

> > > > > > > > > can focus on supporting cache at DataStream first. It is still

> > > > > > > > > valuable for DataStream users and most of the work we do in

> > > this

> > > > FLIP

> > > > > > > > > can be reused. So I want to limit the scope of this FLIP.

> > > > > > > > >

> > > > > > > > > After caching is supported at DataStream, we can continue from

> > > > where

> > > > > > > > > FLIP-36 left off to support caching at Table/SQL API. We might

> > > > have

> > > > > > to

> > > > > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think?

> > > > > > > > >

> > > > > > > > > Best,

> > > > > > > > > Xuannan

> > > > > > > > >

> > > > > > > > > [1]

> > > > > > > > >

> > > > > > >

> > > > > >

> > > >

> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

> > > > > > > > >

> > > > > > > > >

> > > > > > > > >

> > > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek 
> > > >

> > > > > > wrote:

> > > > > > > > > >

> > > > > > > > > > Hi Xuannan,

> > > > > > > > > >

> > > > > > > > > > thanks for drafting this FLIP.

> > > > > > > > > >

> > > > > > > > > > One immediate thought, from what I've seen for interactive

> > > data

> > > > > > > > > exploration

> > > > > > > > > > with Spark, most people tend to use the higher level APIs,

> > > that

> > > > > > > allow for

> > > > > > > > > > faster prototyping (Table API in Flink's case). Should the

> > > > Table

> > > > > > API

> > > > > > > also

> > > > > > > > > > be covered by this FLIP?

> > > > > > > > > >

> > > > > > > > > > Best,

> > > > > > > > > > D.

> > > > > > > > > >

> > > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <

> > > > suxuannan95@gmail.com

> > > > > > >

> > > > > > > > > wrote:

> > > > > > > > > >

> > > > > > > > > > > Hi devs,

> > > > > > > > > > >

> > > > > > > > > > > I’d like to start a discussion about adding support to

> > > cache

> > > > the

> > > > > > > > > > > intermediate result at DataStream API for batch processing.

> > > > > > > > > > >

> > > > > > > > > > > As the DataStream API now supports batch execution mode, we

> > > > see

> > > > > > > users

> > > > > > > > > > > using the DataStream API to run batch jobs. Interactive

> > > > > > > programming is

> > > > > > > > > > > an important use case of Flink batch processing. And the

> > > > ability

> > > > > > to

> > > > > > > > > > > cache intermediate results of a DataStream is crucial to

> > > the

> > > > > > > > > > > interactive programming experience.

> > > > > > > > > > >

> > > > > > > > > > > Therefore, we propose to support caching a DataStream in

> > > > Batch

> > > > > > > > > > > execution. We believe that users can benefit a lot from the

> > > > > > change

> > > > > > > and

> > > > > > > > > > > encourage them to use DataStream API for their interactive

> > > > batch

> > > > > > > > > > > processing work.

> > > > > > > > > > >

> > > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to

> > > > this

> > > > > > > email

> > > > > > > > > > > thread. Looking forward to your feedback!

> > > > > > > > > > >

> > > > > > > > > > > [1]

> > > > > > > > > > >

> > > > > > > > >

> > > > > > >

> > > > > >

> > > >

> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing

> > > > > > > > > > >

> > > > > > > > > > > Best,

> > > > > > > > > > > Xuannan

> > > > > > > > > > >

> > > > > > > > >

> > > > > > >

> > > > > >

> > > > > >

> > > > > > --

> > > > > > best,

> > > > > > Zhipeng

> > > > > >

> > > >

> > >

> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi devs,

Thank you all for the discussion.
If there are no objections or feedback, I would like to start the vote
thread tomorrow.

Best,
Xuannan

On Tue, Jan 18, 2022 at 8:12 PM Xuannan Su <su...@gmail.com> wrote:
>
> Hi Yun,
>
> Thanks for your questions.
>
> 1. I think execute_and_collect is an API on the DataStream, which
> adds a collect sink to the DataStream and invokes
> StreamExecutionEnvironment#execute. It is a convenience method to
> execute a job and get an iterator over the result.
>
> 2. As per our offline discussion, there are two ways to re-compute a
> missing cached intermediate result.
>
> In the current design, the re-submission of the job happens on the
> client side. We can throw a non-recoverable exception, annotated by
> `ThrowableType#NonRecoverableError`, to bypass the failover strategy
> when we find that the cache is missing. When the client catches the
> error, it can submit the original job to re-compute the intermediate
> result.
>
> Alternatively, the re-submission can happen at the scheduler. In that
> case, the cache-consuming job has to contain the vertices that create
> the cache. If the scheduler finds that the cached intermediate result
> exists, it skips the cache-creating vertices. If a cache-consuming
> vertex finds that the cached intermediate result is missing, the
> scheduler restarts the cache-creating vertices.
>
> Handling the missing cache at the scheduler requires a lot more work
> on the scheduler than re-submitting the job on the client side.
> Thus, for this FLIP, we will choose the first method. When the
> scheduler is ready, we can make it work with the scheduler. The
> process should be transparent to the user.
>
> Best,
> Xuannan
>
>
> On Mon, Jan 17, 2022 at 2:07 PM Yun Gao <yu...@aliyun.com.invalid> wrote:
> >
> > Hi Xuannan,
> >
> > Many thanks for the detailed explanation, and sorry for the very
> > late response.
> >
> > For cached result partition vs. managed table, I also agree with
> > the current conclusion that they can be differentiated at the moment:
> > a cached result partition can be viewed as an internal, lightweight data
> > cache whose lifecycle is bound to the current application, and a managed
> > table can be viewed as an external service whose lifecycle can
> > span multiple applications.
> >
> > For the other issues, after more thought I currently have two remaining
> > questions:
> >
> > 1. Regarding the API, I think it should work if we could execute multiple
> > caches as a whole, but in the FLIP's example it currently seems that
> > we are calling execute_and_collect() on top of a single CachedDataStream?
> > Also, in the given API, CachedDataStream does not seem to have an
> > execute_and_collect() method?
> > 2. For re-submitting the job when the cached result partition is missing, would
> > this happen on the client side or in the scheduler? If it happens on the client
> > side, do we need to bypass the given failover strategy (like attempting N times)
> > when we find that the cached result partition is missing?
> >
> > Best,
> > Yun
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Xuannan Su <su...@gmail.com>
> > Send Time:2022 Jan. 17 (Mon.) 13:00
> > To:dev <de...@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing
> >
> > Hi David,
> >
> > Thanks for pointing out FLIP-187. After reading the FLIP, I think it
> > can solve the problem of choosing the proper parallelism, and thus it
> > should be fine not to provide a method to set the parallelism of the
> > cache.
> >
> > And your understanding of the outcome of this FLIP is correct.
> >
> > If there is no more feedback or objections, I would like to start a vote
> > thread tomorrow.
> >
> > Best,
> > Xuannan
> >
> > On Fri, Jan 14, 2022 at 5:34 PM David Morávek <dm...@apache.org> wrote:
> >
> > > Hi Xuannan,
> > >
> > > I think this already looks really good. The whole discussion is pretty
> > > long, so I'll just summarize my current understanding of the outcome:
> > >
> > > - This only targets the DataStream API for now, but can be used as a
> > > building block for the higher level abstractions (Table API).
> > > - We're pushing caching down to the shuffle service (works with all
> > > implementations), storing the intermediate results. This should also
> > > naturally work with current fail-over mechanisms for batch (backtrack +
> > > recompute missing intermediate results [1]).
> > >
> > >
> > > > As for setting the parallelism of the CacheTransformation: with the
> > > > current design, the parallelism of the cached intermediate result is
> > > > determined by the parallelism of the transformation that produces the
> > > > intermediate result to cache. Thus, the parallelism of the caching
> > > > transformation is set by the parallelism of the transformation to be
> > > > cached. I think the overhead should not be critical, as the
> > > > cache-producing job suffers from the same overhead anyway. For a
> > > > CacheTransformation with large parallelism but a relatively small
> > > > result dataset, I think we should reduce the parallelism of the
> > > > transformation to cache.
> > >
> > >
> > > Is the whole "choosing the right parallelism for caching" problem solved by
> > > the Adaptive Batch Job Scheduler [2]?
> > >
> > > [1]
> > >
> > > https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> > > [2]
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > >
> > > Best,
> > > D.
> > >
> > > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <su...@gmail.com> wrote:
> > >
> > > > Hi Gen,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I think you are talking about how we are going to store the cached
> > > > data. The first option is to write the data with a sink to an external
> > > > file system, much like the file store of the Dynamic Table. If I
> > > > understand correctly, it requires a distributed file system, e.g. HDFS,
> > > > S3, etc. In my opinion, it is too heavyweight to use a distributed
> > > > file system for caching.
> > > >
> > > > As you said, using the shuffle service for caching is quite natural as
> > > > we need to produce the intermediate result anyway. For Table/SQL API,
> > > > the table operations are translated to transformations, where we can
> > > > reuse the CacheTransformation. It should not be unfriendly for
> > > > Table/SQL API.
> > > >
> > > > As for setting the parallelism of the CacheTransformation: with the
> > > > current design, the parallelism of the cached intermediate result is
> > > > determined by the parallelism of the transformation that produces the
> > > > intermediate result to cache. Thus, the parallelism of the caching
> > > > transformation is set by the parallelism of the transformation to be
> > > > cached. I think the overhead should not be critical, as the
> > > > cache-producing job suffers from the same overhead anyway. For a
> > > > CacheTransformation with large parallelism but a relatively small
> > > > result dataset, I think we should reduce the parallelism of the
> > > > transformation to cache.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > >
> > > >
> > > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <lu...@gmail.com> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > I do agree that dynamic tables and cached partitions are similar
> > > > > features aimed at different cases. In my opinion, the main difference
> > > > > between the implementations is whether to cache only the data or the
> > > > > whole result partition.
> > > > >
> > > > > To cache only the data, we can translate the CacheTransformation to a
> > > > Sink
> > > > > node for writing, and Source node for consuming. Most of the things are
> > > > > just the same as this FLIP, except for the storage, which is an
> > > external
> > > > > one (or a built-in one if we can use the dynamic table storage),
> > > instead
> > > > of
> > > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service.
> > > This
> > > > > can make caching independent from a specific shuffle service, and make
> > > it
> > > > > possible to share data between different jobs / Per-Job mode jobs.
> > > > >
> > > > > Caching the whole partition is natural in DataStream API, since the
> > > > > partition is a low-level concept, and data storage is already provided
> > > by
> > > > > the default shuffle service. So if we want to choose a solution only to
> > > > > support cache in DataStream API, caching the whole partition can be a
> > > > good
> > > > > choice. But this may be not as friendly to Table/SQL API as to
> > > > > DataStream, since users are announcing to cache a logical Table (view),
> > > > > rather than a physical partition. If we want a unified solution for
> > > both
> > > > > APIs, this may need to be considered.
> > > > >
> > > > >
> > > > > And here's another suggestion to this FLIP. Maybe we should support
> > > > > "setParallelism" in CacheTransformation, for both caching and
> > > consuming.
> > > > >
> > > > > In some cases, the input parallelism of the CacheTransformation is
> > > large
> > > > > but the result dataset is relatively small. We may need too many
> > > > resources
> > > > > to consume the result partition if the source parallelism has to be the
> > > > > same with the producer.
> > > > >
> > > > > On the other hand, serving a large number of partitions may also have
> > > > > more overhead. Though it may not be a big burden, we can try to reduce
> > > > > the actual cached partition count if necessary, for example by adding a
> > > > > pass-through vertex with the specific parallelism between the producer
> > > > > and the cache vertices.
> > > > >
> > > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipengw@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Xuannan,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > Regarding whether and how to support caching side outputs, I agree that
> > > > > > the second option might be better if there is a use case where users
> > > > > > need to cache only certain side outputs.
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 4, 2022 at 3:50 PM Xuannan Su <su...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Zhipeng and Gen,
> > > > > > >
> > > > > > > Thanks for joining the discussion.
> > > > > > >
> > > > > > > For Zhipeng:
> > > > > > >
> > > > > > > - Can we support side output
> > > > > > > Caching the side output is indeed a valid use case. However, with
> > > the
> > > > > > > current API, it is not straightforward to cache the side output.
> > > You
> > > > > > > can apply an identity map function to the DataStream returned by
> > > the
> > > > > > > getSideOutput method and then cache the result of the map
> > > > > > > transformation. In my opinion, it is not user-friendly. Therefore,
> > > we
> > > > > > > should think of a way to better support the use case.
> > > > > > > As you say, we can introduce a new class
> > > > > > > `CachedSingleOutputStreamOperator`, and override the `getSideOutput`
> > > > > > > method to return a `CachedDataStream`. With this approach, the cache
> > > > > > > method implies that both the output and the side outputs of the
> > > > > > > `SingleOutputStreamOperator` are cached. The problem with this
> > > > > > > approach is that the user has no control over which side output
> > > > > > > should be cached.
> > > > > > > Another option would be to let the `getSideOutput` method return a
> > > > > > > `SingleOutputStreamOperator`. This way, users can decide which side
> > > > > > > output to cache. As the `getSideOutput` method returns a
> > > > > > > `SingleOutputStreamOperator`, users can set properties of the
> > > > > > > transformation that produces the side output, e.g. parallelism, buffer
> > > > > > > timeout, etc. If users try to set different values for the same
> > > > > > > property of a transformation, an exception will be thrown. What do
> > > > > > > you think?
> > > > > > >
> > > > > > > - Can we support Stream Mode
> > > > > > > Running a job in stream mode doesn't guarantee the job will finish,
> > > > > > > while in batch mode, it does.  This is the main reason that
> > > prevents
> > > > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > > > unless the job can finish.
> > > > > > > If I understand correctly, by "run batch jobs in Stream Mode", you
> > > > > > > mean that you have a job with all bounded sources, but you want the
> > > > > > > intermediate data to shuffle in pipelined mode instead of blocking
> > > > > > > mode. If that is the case, the job can run in batch mode with
> > > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED"
> > > [1].
> > > > > > > And we can support caching in this case.
> > > > > > >
> > > > > > > - Change parallelism of CachedDataStream
> > > > > > > CachedDataStream extends from DataStream, which doesn't have the
> > > > > > > `setParallelism` method like the `SingleOutputStreamOperator`.
> > > Thus,
> > > > > > > it should not be a problem with CachedDataStream.
> > > > > > >
> > > > > > > For Gen:
> > > > > > >
> > > > > > > - Relation between FLIP-205 and FLIP-188
> > > > > > > Although dynamic tables and caching feel similar in the
> > > > > > > sense that they let users reuse some intermediate result, they target
> > > > > > > different use cases. The dynamic table targets the use case where
> > > > > > > users want to share a dynamically updating intermediate result across
> > > > > > > multiple applications. It is meaningful data that can be consumed
> > > > > > > by different Flink applications and Flink jobs. Caching, in contrast,
> > > > > > > targets the use case where users know that all the sources are
> > > > > > > bounded and static, and caching is only used to avoid re-computing
> > > > > > > the intermediate result. The cached intermediate result is only
> > > > > > > meaningful across jobs in the same application.
> > > > > > >
> > > > > > > Dynamic table and caching can be used together. For example, in a
> > > > > > > machine learning scenario, we can have a Stream job that is
> > > > generating
> > > > > > > some training samples. And we can create a dynamic table for the
> > > > > > > training sample. And we run a Flink application every hour to do
> > > some
> > > > > > > data analysis on the training sample generated in the last hour.
> > > The
> > > > > > > Flink application consists of multiple batch jobs and the batch
> > > jobs
> > > > > > > share some intermediate results, so users can use cache to avoid
> > > > > > > re-computation. The intermediate result is not meaningful outside
> > > of
> > > > > > > the application. And the cache will be discarded after the
> > > > application
> > > > > > > is finished.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > >
> > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <lu...@gmail.com>
> > > wrote:
> > > > > > > >
> > > > > > > > Hi Xuannan,
> > > > > > > >
> > > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in
> > > dynamic
> > > > > > table
> > > > > > > > storage, which provides a unified changelog & table
> > > representation.
> > > > > > > Tables
> > > > > > > > stored there can be used in further ad-hoc queries. To my
> > > > > > understanding,
> > > > > > > > it's quite like an implementation of caching in Table API, and
> > > the
> > > > > > ad-hoc
> > > > > > > > queries are somehow like further steps in an interactive program.
> > > > > > > >
> > > > > > > > As you replied, caching at Table/SQL API is the next step, as a
> > > > part of
> > > > > > > > interactive programming in Table API, which we all agree is the
> > > > major
> > > > > > > > scenario. What do you think about the relation between it and
> > > > FLIP-188?
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <
> > > suxuannan95@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi David,
> > > > > > > > >
> > > > > > > > > Thanks for sharing your thoughts.
> > > > > > > > >
> > > > > > > > > You are right that most people tend to use high-level APIs for
> > > > > > > > > interactive data exploration. Actually, there is
> > > > > > > > > FLIP-36 [1] covering the cache API at the Table/SQL API. As far
> > > > > > > > > as I know, it has been accepted but hasn’t been implemented. At the
> > > > > > > > > time it was drafted, DataStream did not support batch mode but the
> > > > > > > > > Table API did.
> > > > > > > > >
> > > > > > > > > Now that the DataStream API does support batch processing, I
> > > > think we
> > > > > > > > > can focus on supporting cache at DataStream first. It is still
> > > > > > > > > valuable for DataStream users and most of the work we do in
> > > this
> > > > FLIP
> > > > > > > > > can be reused. So I want to limit the scope of this FLIP.
> > > > > > > > >
> > > > > > > > > After caching is supported at DataStream, we can continue from
> > > > where
> > > > > > > > > FLIP-36 left off to support caching at Table/SQL API. We might
> > > > have
> > > > > > to
> > > > > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Xuannan
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dmvk@apache.org
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Xuannan,
> > > > > > > > > >
> > > > > > > > > > thanks for drafting this FLIP.
> > > > > > > > > >
> > > > > > > > > > One immediate thought, from what I've seen for interactive
> > > data
> > > > > > > > > exploration
> > > > > > > > > > with Spark, most people tend to use the higher level APIs,
> > > that
> > > > > > > allow for
> > > > > > > > > > faster prototyping (Table API in Flink's case). Should the
> > > > Table
> > > > > > API
> > > > > > > also
> > > > > > > > > > be covered by this FLIP?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > D.
> > > > > > > > > >
> > > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <
> > > > suxuannan95@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi devs,
> > > > > > > > > > >
> > > > > > > > > > > I’d like to start a discussion about adding support to
> > > cache
> > > > the
> > > > > > > > > > > intermediate result at DataStream API for batch processing.
> > > > > > > > > > >
> > > > > > > > > > > As the DataStream API now supports batch execution mode, we
> > > > see
> > > > > > > users
> > > > > > > > > > > using the DataStream API to run batch jobs. Interactive
> > > > > > > programming is
> > > > > > > > > > > an important use case of Flink batch processing. And the
> > > > ability
> > > > > > to
> > > > > > > > > > > cache intermediate results of a DataStream is crucial to
> > > the
> > > > > > > > > > > interactive programming experience.
> > > > > > > > > > >
> > > > > > > > > > > Therefore, we propose to support caching a DataStream in
> > > > Batch
> > > > > > > > > > > execution. We believe that users can benefit a lot from the
> > > > > > change
> > > > > > > and
> > > > > > > > > > > encourage them to use DataStream API for their interactive
> > > > batch
> > > > > > > > > > > processing work.
> > > > > > > > > > >
> > > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to
> > > > this
> > > > > > > email
> > > > > > > > > > > thread. Looking forward to your feedback!
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Xuannan
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > best,
> > > > > > Zhipeng
> > > > > >
> > > >
> > >
> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi Yun,

Thanks for your questions.

1. I think execute_and_collect is an API on the DataStream, which
adds a collect sink to the DataStream and invokes
StreamExecutionEnvironment#execute. It is a convenience method to
execute a job and get an iterator over the result.
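
To make the behavior in point 1 concrete, here is a minimal toy model in
plain Python. It is not PyFlink: `ToyStream` and its methods are
hypothetical names. It only mirrors the shape described above, where
transformations are recorded lazily and `execute_and_collect` attaches a
collecting sink, runs the pipeline, and returns an iterator.

```python
class ToyStream:
    """Hypothetical, simplified stand-in for a DataStream (not the Flink API)."""

    def __init__(self, source, transformations=()):
        self._source = list(source)
        self._transformations = list(transformations)  # recorded, not yet run

    def map(self, fn):
        # Lazily record the transformation; nothing executes here.
        return ToyStream(self._source, self._transformations + [fn])

    def execute_and_collect(self):
        collected = []  # plays the role of the collect sink
        # "Execute the job": push every record through the recorded pipeline.
        for record in self._source:
            for fn in self._transformations:
                record = fn(record)
            collected.append(record)
        return iter(collected)


stream = ToyStream([1, 2, 3]).map(lambda x: x * 10).map(lambda x: x + 1)
results = list(stream.execute_and_collect())
```

Nothing runs until `execute_and_collect` is called, which is the point
of the convenience method: one call both triggers execution and exposes
the output.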

2. As per our offline discussion, there are two ways to re-compute a
missing cached intermediate result.

In the current design, the re-submission of the job happens on the
client side. We can throw a non-recoverable exception, annotated by
`ThrowableType#NonRecoverableError`, to bypass the failover strategy
when we find that the cache is missing. When the client catches the
error, it can submit the original job to re-compute the intermediate
result.
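
A rough sketch of this client-side flow, as a toy simulation in plain
Python rather than Flink code. `CacheMissingError`, `ToyCluster`, and
`run_with_cache` are hypothetical names, and retrying the consuming job
once after the producer has re-run is an assumption about how the
client would continue.

```python
class CacheMissingError(Exception):
    """Hypothetical stand-in for a non-recoverable 'cache is missing' failure."""


class ToyCluster:
    """Toy cluster that tracks which cached intermediate results exist."""

    def __init__(self):
        self.caches = set()
        self.submitted = []  # job names, in submission order

    def submit(self, job_name, reads=None, writes=None):
        self.submitted.append(job_name)
        if reads is not None and reads not in self.caches:
            # Fail immediately instead of going through failover retries.
            raise CacheMissingError(reads)
        if writes is not None:
            self.caches.add(writes)
        return f"{job_name}: ok"


def run_with_cache(cluster, consumer_job, producer_job, cache_id):
    """Client-side fallback: on a cache miss, re-run the producer, then retry."""
    try:
        return cluster.submit(consumer_job, reads=cache_id)
    except CacheMissingError:
        # Re-submit the original job that produces the intermediate result.
        cluster.submit(producer_job, writes=cache_id)
        return cluster.submit(consumer_job, reads=cache_id)


cluster = ToyCluster()
result = run_with_cache(cluster, "consume", "produce", "cache-1")
```

The user only calls `run_with_cache` and never sees the miss, which is
the transparency we want from the client-side approach.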

Alternatively, the re-submission can happen at the scheduler. In that
case, the cache-consuming job has to contain the vertices that create
the cache. If the scheduler finds that the cached intermediate result
exists, it skips the cache-creating vertices. If a cache-consuming
vertex finds that the cached intermediate result is missing, the
scheduler restarts the cache-creating vertices.
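
The scheduler-side decision could look roughly like the toy function
below. It is plain Python, not scheduler code: `vertices_to_run` and the
vertex names are hypothetical, and the job graph is simplified to an
ordered list.

```python
def vertices_to_run(job_vertices, cache_creating, cache_available):
    """Return the vertices a toy scheduler would execute, in job order.

    job_vertices:    all vertices of the cache-consuming job, which also
                     contains the vertices that create the cache
    cache_creating:  the subset of vertices that only exist to build the cache
    cache_available: whether the cached intermediate result still exists
    """
    if cache_available:
        # Cache exists: skip the vertices that would re-create it.
        return [v for v in job_vertices if v not in cache_creating]
    # Cache is missing: the cache-creating vertices have to run again.
    return list(job_vertices)


job = ["source", "map", "cache", "aggregate", "sink"]
creators = {"source", "map", "cache"}

with_cache = vertices_to_run(job, creators, cache_available=True)
without_cache = vertices_to_run(job, creators, cache_available=False)
```

Here `with_cache` contains only the consuming vertices, while
`without_cache` is the full job.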

Handling the missing cache at the scheduler requires a lot more work
on the scheduler than re-submitting the job on the client side.
Thus, for this FLIP, we will choose the first method. When the
scheduler is ready, we can make it work with the scheduler. The
process should be transparent to the user.

Best,
Xuannan


On Mon, Jan 17, 2022 at 2:07 PM Yun Gao <yu...@aliyun.com.invalid> wrote:
>
> Hi Xuannan,
>
> Many thanks for the detailed explanation, and sorry for the very
> late response.
>
> For cached result partitions vs. managed tables, I also agree with
> the current conclusion that they can be differentiated at the moment:
> a cached result partition can be viewed as an internal, lightweight data
> cache whose lifecycle is bound to the current application, and a managed
> table can be viewed as an external service whose lifecycle can span
> multiple applications.
>
> For the other issues, after more thought I currently have two remaining
> questions:
>
> 1. Regarding the API, I think it should work if we could execute multiple
> caches as a whole, but in the FLIP's example it seems that we are calling
> execute_and_collect() on top of a single CachedDataStream?
> Also, in the given API, CachedDataStream does not seem to have an
> execute_and_collect() method?
> 2. For re-submitting the job when the cached result partition is missing, would
> this happen on the client side or in the scheduler? If it happens on the client
> side, do we need to bypass the given failover strategy (like attempting N times)
> when we find that the cached result partition is missing?
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <su...@gmail.com>
> Send Time:2022 Jan. 17 (Mon.) 13:00
> To:dev <de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing
>
> Hi David,
>
> Thanks for pointing out the FLIP-187. After reading the FLIP, I think it
> can solve the problem of choosing the proper parallelism, and thus it
> should be fine to not provide the method to set the parallelism of the
> cache.
>
> And your understanding of the outcome of this FLIP is correct.
>
> If there is no more feedback or objections, I would like to start a vote
> thread tomorrow.
>
> Best,
> Xuannan
>
> On Fri, Jan 14, 2022 at 5:34 PM David Morávek <dm...@apache.org> wrote:
>
> > Hi Xuannan,
> >
> > I think this already looks really good. The whole discussion is pretty
> > long, so I'll just summarize my current understanding of the outcome:
> >
> > - This only targets the DataStream API for now, but can be used as a
> > building block for the higher level abstractions (Table API).
> > - We're pushing caching down to the shuffle service (works with all
> > implementations), storing the intermediate results. This should also
> > naturally work with current fail-over mechanisms for batch (backtrack +
> > recompute missing intermediate results [1]).
> >
> >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> >
> >
> > Is the whole "choosing the right parallelism for caching" problem solved by
> > the Adaptive Batch Job Scheduler [2]?
> >
> > [1]
> >
> > https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> > [2]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> > D.
> >
> > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <su...@gmail.com> wrote:
> >
> > > Hi Gen,
> > >
> > > Thanks for your feedback.
> > >
> > > I think you are talking about how we are going to store the caching
> > > data. The first option is to write the data with a sink to an external
> > > file system, much like the file store of the Dynamic Table. If I
> > > understand correctly, it requires a distributed file system, e.g. HDFS,
> > > S3, etc. In my opinion, it is too heavyweight to use a distributed
> > > file system for caching.
> > >
> > > As you said, using the shuffle service for caching is quite natural as
> > > we need to produce the intermediate result anyway. For Table/SQL API,
> > > the table operations are translated to transformations, where we can
> > > reuse the CacheTransformation. It should not be unfriendly for
> > > Table/SQL API.
> > >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > >
> > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <lu...@gmail.com> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > I do agree that dynamic tables and cached partitions are similar
> > features
> > > > aiming different cases.  In my opinion, the main difference of the
> > > > implementations is to cache only the data or the whole result
> > partition.
> > > >
> > > > To cache only the data, we can translate the CacheTransformation to a
> > > Sink
> > > > node for writing, and Source node for consuming. Most of the things are
> > > > just the same as this FLIP, except for the storage, which is an
> > external
> > > > one (or a built-in one if we can use the dynamic table storage),
> > instead
> > > of
> > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service.
> > This
> > > > can make caching independent from a specific shuffle service, and make
> > it
> > > > possible to share data between different jobs / Per-Job mode jobs.
> > > >
> > > > Caching the whole partition is natural in DataStream API, since the
> > > > partition is a low-level concept, and data storage is already provided
> > by
> > > > the default shuffle service. So if we want to choose a solution only to
> > > > support cache in DataStream API, caching the whole partition can be a
> > > good
> > > > choice. But this may be not as friendly to Table/SQL API as to
> > > > DataStream, since users are announcing to cache a logical Table (view),
> > > > rather than a physical partition. If we want a unified solution for
> > both
> > > > APIs, this may need to be considered.
> > > >
> > > >
> > > > And here's another suggestion to this FLIP. Maybe we should support
> > > > "setParallelism" in CacheTransformation, for both caching and
> > consuming.
> > > >
> > > > In some cases, the input parallelism of the CacheTransformation is
> > large
> > > > but the result dataset is relatively small. We may need too many
> > > resources
> > > > to consume the result partition if the source parallelism has to be the
> > > > same with the producer.
> > > >
> > > > On the other hand, serving a large number of partitions may also have
> > > more
> > > > overhead. Though maybe it's not a big burden, we can try to reduce the
> > > > actual cached partition count if necessary, for example by adding a
> > > > pass-through vertex with the specific parallelism between the producer
> > > and
> > > > the cache vertices.
> > > >
> > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipengw@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Regarding whether and how to support cache sideoutput, I agree that
> > the
> > > > > second option might be better if there do exist a use case that users
> > > need
> > > > > to cache only some certain side outputs.
> > > > >
> > > > >
> > > > > Xuannan Su <su...@gmail.com> wrote on Tue, Jan 4, 2022 at 15:50:
> > > > >
> > > > > > Hi Zhipeng and Gen,
> > > > > >
> > > > > > Thanks for joining the discussion.
> > > > > >
> > > > > > For Zhipeng:
> > > > > >
> > > > > > - Can we support side output
> > > > > > Caching the side output is indeed a valid use case. However, with
> > the
> > > > > > current API, it is not straightforward to cache the side output.
> > You
> > > > > > can apply an identity map function to the DataStream returned by
> > the
> > > > > > getSideOutput method and then cache the result of the map
> > > > > > transformation. In my opinion, it is not user-friendly. Therefore,
> > we
> > > > > > should think of a way to better support the use case.
> > > > > > As you say, we can introduce a new class
> > > > > > `CachedSingleOutputStreamOperator`, and overwrite the
> > `getSideOutput`
> > > > > > method to return a `CachedDataStream`. With this approach, the
> > cache
> > > > > > method implies that both output and the side output of the
> > > > > > `SingleOutputStreamOperator` are cached. The problem with this
> > > > > > approach is that the user has no control over which side output
> > > should
> > > > > > be cached.
> > > > > > Another option would be to let the `getSideOutput` method return the
> > > > > > `SingleOutputStreamOperator`. This way, users can decide which side
> > > > > > output to cache. As the `getSideOutput` method returns a
> > > > > > `SingleOutputStreamOperator`. Users can set properties of the
> > > > > > transformation that produce the side output, e.g. parallelism,
> > buffer
> > > > > > timeout, etc. If users try to set different values of the same
> > > > > > property of a transformation, an exception will be thrown. What do
> > > you
> > > > > > think?
> > > > > >
> > > > > > - Can we support Stream Mode
> > > > > > Running a job in stream mode doesn't guarantee the job will finish,
> > > > > > while in batch mode, it does.  This is the main reason that
> > prevents
> > > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > > unless the job can finish.
> > > > > > If I understand correctly, by "run batch jobs in Stream Mode", you
> > > > > > mean that you have a job with all bounded sources, but you want the
> > > > > > intermediate data to shuffle in pipelined mode instead of blocking
> > > > > > mode. If that is the case, the job can run in batch mode with
> > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED"
> > [1].
> > > > > > And we can support caching in this case.
> > > > > >
> > > > > > - Change parallelism of CachedDataStream
> > > > > > CachedDataStream extends from DataStream, which doesn't have the
> > > > > > `setParallelism` method like the `SingleOutputStreamOperator`.
> > Thus,
> > > > > > it should not be a problem with CachedDataStream.
> > > > > >
> > > > > > For Gen:
> > > > > >
> > > > > > - Relation between FLIP-205 and FLIP-188
> > > > > > Although it feels like dynamic table and caching are similar in the
> > > > > > sense that they let users reuse some intermediate result, they
> > target
> > > > > > different use cases. The dynamic table is targeting the use case
> > > where
> > > > > > users want to share a dynamic updating intermediate result across
> > > > > > multiple applications. It is some meaningful data that can be
> > > consumed
> > > > > > by different Flink applications and Flink jobs. While caching is
> > > > > > targeting the use case where users know that all the sources are
> > > > > > bounded and static, and caching is only used to avoid re-computing
> > > the
> > > > > > intermediate result. And the cached intermediate result is only
> > > > > > meaningful crossing jobs in the same application.
> > > > > >
> > > > > > Dynamic table and caching can be used together. For example, in a
> > > > > > machine learning scenario, we can have a Stream job that is
> > > generating
> > > > > > some training samples. And we can create a dynamic table for the
> > > > > > training sample. And we run a Flink application every hour to do
> > some
> > > > > > data analysis on the training sample generated in the last hour.
> > The
> > > > > > Flink application consists of multiple batch jobs and the batch
> > jobs
> > > > > > share some intermediate results, so users can use cache to avoid
> > > > > > re-computation. The intermediate result is not meaningful outside
> > of
> > > > > > the application. And the cache will be discarded after the
> > > application
> > > > > > is finished.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <lu...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in
> > dynamic
> > > > > table
> > > > > > > storage, which provides a unified changelog & table
> > representation.
> > > > > > Tables
> > > > > > > stored there can be used in further ad-hoc queries. To my
> > > > > understanding,
> > > > > > > it's quite like an implementation of caching in Table API, and
> > the
> > > > > ad-hoc
> > > > > > > queries are somehow like further steps in an interactive program.
> > > > > > >
> > > > > > > As you replied, caching at Table/SQL API is the next step, as a
> > > part of
> > > > > > > interactive programming in Table API, which we all agree is the
> > > major
> > > > > > > scenario. What do you think about the relation between it and
> > > FLIP-188?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <
> > suxuannan95@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > Thanks for sharing your thoughts.
> > > > > > > >
> > > > > > > > You are right that most people tend to use high-level API for
> > > > > > > > interactive data exploration. Actually, there is
> > > > > > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far
> > > as I
> > > > > > > > know, it has been accepted but hasn’t been implemented. At the
> > > time
> > > > > > > > when it is drafted, DataStream did not support Batch mode but
> > > Table
> > > > > > > > API does.
> > > > > > > >
> > > > > > > > Now that the DataStream API does support batch processing, I
> > > think we
> > > > > > > > can focus on supporting cache at DataStream first. It is still
> > > > > > > > valuable for DataStream users and most of the work we do in
> > this
> > > FLIP
> > > > > > > > can be reused. So I want to limit the scope of this FLIP.
> > > > > > > >
> > > > > > > > After caching is supported at DataStream, we can continue from
> > > where
> > > > > > > > FLIP-36 left off to support caching at Table/SQL API. We might
> > > have
> > > > > to
> > > > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dmvk@apache.org
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Xuannan,
> > > > > > > > >
> > > > > > > > > thanks for drafting this FLIP.
> > > > > > > > >
> > > > > > > > > One immediate thought, from what I've seen for interactive
> > data
> > > > > > > > exploration
> > > > > > > > > with Spark, most people tend to use the higher level APIs,
> > that
> > > > > > allow for
> > > > > > > > > faster prototyping (Table API in Flink's case). Should the
> > > Table
> > > > > API
> > > > > > also
> > > > > > > > > be covered by this FLIP?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <
> > > suxuannan95@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I’d like to start a discussion about adding support to
> > cache
> > > the
> > > > > > > > > > intermediate result at DataStream API for batch processing.
> > > > > > > > > >
> > > > > > > > > > As the DataStream API now supports batch execution mode, we
> > > see
> > > > > > users
> > > > > > > > > > using the DataStream API to run batch jobs. Interactive
> > > > > > programming is
> > > > > > > > > > an important use case of Flink batch processing. And the
> > > ability
> > > > > to
> > > > > > > > > > cache intermediate results of a DataStream is crucial to
> > the
> > > > > > > > > > interactive programming experience.
> > > > > > > > > >
> > > > > > > > > > Therefore, we propose to support caching a DataStream in
> > > Batch
> > > > > > > > > > execution. We believe that users can benefit a lot from the
> > > > > change
> > > > > > and
> > > > > > > > > > encourage them to use DataStream API for their interactive
> > > batch
> > > > > > > > > > processing work.
> > > > > > > > > >
> > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to
> > > this
> > > > > > email
> > > > > > > > > > thread. Looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > best,
> > > > > Zhipeng
> > > > >
> > >
> >
>

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Xuannan,

Many thanks for the detailed explanation, and sorry for the very
late response.

For cached result partitions vs. managed tables, I also agree with
the current conclusion that they can be differentiated at the moment:
a cached result partition can be viewed as an internal, lightweight data
cache whose lifecycle is bound to the current application, and a managed
table can be viewed as an external service whose lifecycle can span
multiple applications.

For the other issues, after more thought I currently have two remaining
questions:

1. Regarding the API, I think it should work if we could execute multiple
caches as a whole, but in the FLIP's example it seems that we are calling
execute_and_collect() on top of a single CachedDataStream?
Also, in the given API, CachedDataStream does not seem to have an
execute_and_collect() method?
2. For re-submitting the job when the cached result partition is missing, would
this happen on the client side or in the scheduler? If it happens on the client
side, do we need to bypass the given failover strategy (like attempting N times)
when we find that the cached result partition is missing?

Best,
Yun



------------------------------------------------------------------
From:Xuannan Su <su...@gmail.com>
Send Time:2022 Jan. 17 (Mon.) 13:00
To:dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Hi David,

Thanks for pointing out the FLIP-187. After reading the FLIP, I think it
can solve the problem of choosing the proper parallelism, and thus it
should be fine to not provide the method to set the parallelism of the
cache.

And your understanding of the outcome of this FLIP is correct.

If there is no more feedback or objections, I would like to start a vote
thread tomorrow.

Best,
Xuannan

On Fri, Jan 14, 2022 at 5:34 PM David Morávek <dm...@apache.org> wrote:

> Hi Xuannan,
>
> I think this already looks really good. The whole discussion is pretty
> long, so I'll just summarize my current understanding of the outcome:
>
> - This only targets the DataStream API for now, but can be used as a
> building block for the higher level abstractions (Table API).
> - We're pushing caching down to the shuffle service (works with all
> implementations), storing the intermediate results. This should also
> naturally work with current fail-over mechanisms for batch (backtrack +
> recompute missing intermediate results [1]).
>
>
> > For setting the parallelism of the CacheTransformation. With the
> > current design, the parallelism of the cache intermediate result is
> > determined by the parallelism of the transformation that produces the
> > intermediate result to cache. Thus, the parallelism of the caching
> > transformation is set by the parallelism of the transformation to be
> > cached. I think the overhead should not be critical as the
> > cache-producing job suffers from the same overhead anyway. For
> > CacheTransformation with large parallelism but the result dataset is
> > relatively small, I think we should reduce the parallelism of the
> > transformation to cache.
>
>
> Is the whole "choosing the right parallelism for caching" problem solved by
> the Adaptive Batch Job Scheduler [2]?
>
> [1]
>
> https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>
> Best,
> D.
>
> On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <su...@gmail.com> wrote:
>
> > Hi Gen,
> >
> > Thanks for your feedback.
> >
> > I think you are talking about how we are going to store the caching
> > data. The first option is to write the data with a sink to an external
> > file system, much like the file store of the Dynamic Table. If I
> > understand correctly, it requires a distributed file system, e.g. HDFS,
> > S3, etc. In my opinion, it is too heavyweight to use a distributed
> > file system for caching.
> >
> > As you said, using the shuffle service for caching is quite natural as
> > we need to produce the intermediate result anyway. For Table/SQL API,
> > the table operations are translated to transformations, where we can
> > reuse the CacheTransformation. It should not be unfriendly for
> > Table/SQL API.
> >
> > For setting the parallelism of the CacheTransformation. With the
> > current design, the parallelism of the cache intermediate result is
> > determined by the parallelism of the transformation that produces the
> > intermediate result to cache. Thus, the parallelism of the caching
> > transformation is set by the parallelism of the transformation to be
> > cached. I think the overhead should not be critical as the
> > cache-producing job suffers from the same overhead anyway. For
> > CacheTransformation with large parallelism but the result dataset is
> > relatively small, I think we should reduce the parallelism of the
> > transformation to cache.
> >
> > Best,
> > Xuannan
> >
> >
> >
> > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <lu...@gmail.com> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the reply.
> > >
> > > I do agree that dynamic tables and cached partitions are similar
> features
> > > aiming different cases.  In my opinion, the main difference of the
> > > implementations is to cache only the data or the whole result
> partition.
> > >
> > > To cache only the data, we can translate the CacheTransformation to a
> > Sink
> > > node for writing, and Source node for consuming. Most of the things are
> > > just the same as this FLIP, except for the storage, which is an
> external
> > > one (or a built-in one if we can use the dynamic table storage),
> instead
> > of
> > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service.
> This
> > > can make caching independent from a specific shuffle service, and make
> it
> > > possible to share data between different jobs / Per-Job mode jobs.
> > >
> > > Caching the whole partition is natural in DataStream API, since the
> > > partition is a low-level concept, and data storage is already provided
> by
> > > the default shuffle service. So if we want to choose a solution only to
> > > support cache in DataStream API, caching the whole partition can be a
> > good
> > > choice. But this may be not as friendly to Table/SQL API as to
> > > DataStream, since users are announcing to cache a logical Table (view),
> > > rather than a physical partition. If we want a unified solution for
> both
> > > APIs, this may need to be considered.
> > >
> > >
> > > And here's another suggestion to this FLIP. Maybe we should support
> > > "setParallelism" in CacheTransformation, for both caching and
> consuming.
> > >
> > > In some cases, the input parallelism of the CacheTransformation is
> large
> > > but the result dataset is relatively small. We may need too many
> > resources
> > > to consume the result partition if the source parallelism has to be the
> > > same with the producer.
> > >
> > > On the other hand, serving a large number of partitions may also have
> > more
> > > overhead. Though maybe it's not a big burden, we can try to reduce the
> > > actual cached partition count if necessary, for example by adding a
> > > pass-through vertex with the specific parallelism between the producer
> > and
> > > the cache vertices.
> > >
> > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipengw@gmail.com
> >
> > > wrote:
> > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > Regarding whether and how to support cache sideoutput, I agree that
> the
> > > > second option might be better if there do exist a use case that users
> > need
> > > > to cache only some certain side outputs.
> > > >
> > > >
> > > > Xuannan Su <su...@gmail.com> wrote on Tue, Jan 4, 2022 at 15:50:
> > > >
> > > > > Hi Zhipeng and Gen,
> > > > >
> > > > > Thanks for joining the discussion.
> > > > >
> > > > > For Zhipeng:
> > > > >
> > > > > - Can we support side output
> > > > > Caching the side output is indeed a valid use case. However, with
> the
> > > > > current API, it is not straightforward to cache the side output.
> You
> > > > > can apply an identity map function to the DataStream returned by
> the
> > > > > getSideOutput method and then cache the result of the map
> > > > > transformation. In my opinion, it is not user-friendly. Therefore,
> we
> > > > > should think of a way to better support the use case.
> > > > > As you say, we can introduce a new class
> > > > > `CachedSingleOutputStreamOperator`, and overwrite the
> `getSideOutput`
> > > > > method to return a `CachedDataStream`. With this approach, the
> cache
> > > > > method implies that both output and the side output of the
> > > > > `SingleOutputStreamOperator` are cached. The problem with this
> > > > > approach is that the user has no control over which side output
> > should
> > > > > be cached.
> > > > > Another option would be to let the `getSideOutput` method return the
> > > > > `SingleOutputStreamOperator`. This way, users can decide which side
> > > > > output to cache. As the `getSideOutput` method returns a
> > > > > `SingleOutputStreamOperator`. Users can set properties of the
> > > > > transformation that produce the side output, e.g. parallelism,
> buffer
> > > > > timeout, etc. If users try to set different values of the same
> > > > > property of a transformation, an exception will be thrown. What do
> > you
> > > > > think?
> > > > >
> > > > > - Can we support Stream Mode
> > > > > Running a job in stream mode doesn't guarantee the job will finish,
> > > > > while in batch mode, it does.  This is the main reason that
> prevents
> > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > unless the job can finish.
> > > > > If I understand correctly, by "run batch jobs in Stream Mode", you
> > > > > mean that you have a job with all bounded sources, but you want the
> > > > > intermediate data to shuffle in pipelined mode instead of blocking
> > > > > mode. If that is the case, the job can run in batch mode with
> > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED"
> [1].
> > > > > And we can support caching in this case.
> > > > >
> > > > > - Change parallelism of CachedDataStream
> > > > > CachedDataStream extends from DataStream, which doesn't have the
> > > > > `setParallelism` method like the `SingleOutputStreamOperator`.
> Thus,
> > > > > it should not be a problem with CachedDataStream.
> > > > >
> > > > > For Gen:
> > > > >
> > > > > - Relation between FLIP-205 and FLIP-188
> > > > > Although it feels like dynamic table and caching are similar in the
> > > > > sense that they let users reuse some intermediate result, they
> target
> > > > > different use cases. The dynamic table is targeting the use case
> > where
> > > > > users want to share a dynamic updating intermediate result across
> > > > > multiple applications. It is some meaningful data that can be
> > consumed
> > > > > by different Flink applications and Flink jobs. While caching is
> > > > > targeting the use case where users know that all the sources are
> > > > > bounded and static, and caching is only used to avoid re-computing
> > the
> > > > > intermediate result. And the cached intermediate result is only
> > > > > meaningful crossing jobs in the same application.
> > > > >
> > > > > Dynamic table and caching can be used together. For example, in a
> > > > > machine learning scenario, we can have a Stream job that is
> > generating
> > > > > some training samples. And we can create a dynamic table for the
> > > > > training sample. And we run a Flink application every hour to do
> some
> > > > > data analysis on the training sample generated in the last hour.
> The
> > > > > Flink application consists of multiple batch jobs and the batch
> jobs
> > > > > share some intermediate results, so users can use cache to avoid
> > > > > re-computation. The intermediate result is not meaningful outside
> of
> > > > > the application. And the cache will be discarded after the
> > application
> > > > > is finished.
> > > > >
> > > > > [1]
> > > > >
> > > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi David,

Thanks for pointing out FLIP-187. After reading the FLIP, I think it
can solve the problem of choosing the proper parallelism, and thus it
should be fine not to provide a method to set the parallelism of the
cache.

And your understanding of the outcome of this FLIP is correct.

If there is no more feedback or objections, I would like to start a vote
thread tomorrow.

Best,
Xuannan


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by David Morávek <dm...@apache.org>.
Hi Xuannan,

I think this already looks really good. The whole discussion is pretty
long, so I'll just summarize my current understanding of the outcome:

- This only targets the DataStream API for now, but can be used as a
building block for the higher-level abstractions (Table API).
- We're pushing caching down to the shuffle service (works with all
implementations), storing the intermediate results. This should also
naturally work with current fail-over mechanisms for batch (backtrack +
recompute missing intermediate results [1]).
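
To make the second point concrete, here is a minimal toy model of the
"reuse the cached intermediate result, otherwise backtrack and recompute"
behaviour. It is only a sketch: `IntermediateResultCache`, `get_or_compute`,
`lose`, and `expensive_map` are hypothetical names invented for illustration
and are not part of Flink or of the FLIP-205 API.

```python
from typing import Callable, Dict, List


class IntermediateResultCache:
    """Toy stand-in for cached result partitions held by a shuffle service."""

    def __init__(self) -> None:
        self._partitions: Dict[str, List[int]] = {}
        self.recompute_count = 0

    def get_or_compute(self, key: str, producer: Callable[[], List[int]]) -> List[int]:
        # Cache hit: reuse the stored intermediate result.
        if key in self._partitions:
            return self._partitions[key]
        # Cache miss (never produced, or lost): go back to the producer and recompute.
        self.recompute_count += 1
        result = producer()
        self._partitions[key] = result
        return result

    def lose(self, key: str) -> None:
        # Simulates the cached partition becoming unavailable.
        self._partitions.pop(key, None)


def expensive_map() -> List[int]:
    # Hypothetical upstream transformation whose output we want to cache.
    return [x * x for x in range(5)]


cache = IntermediateResultCache()
first = cache.get_or_compute("squares", expensive_map)   # computed by the producer
second = cache.get_or_compute("squares", expensive_map)  # served from the cache
cache.lose("squares")
third = cache.get_or_compute("squares", expensive_map)   # recomputed after the loss
```

In this model, a later job that consumes the cache never needs to know
whether the data was still available; a miss simply falls back to running
the producer again, which is the property the fail-over mechanism relies on.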


> For setting the parallelism of the CacheTransformation. With the
> current design, the parallelism of the cache intermediate result is
> determined by the parallelism of the transformation that produces the
> intermediate result to cache. Thus, the parallelism of the caching
> transformation is set by the parallelism of the transformation to be
> cached. I think the overhead should not be critical as the
> cache-producing job suffers from the same overhead anyway. For
> CacheTransformation with large parallelism but the result dataset is
> relatively small, I think we should reduce the parallelism of the
> transformation to cache.


Is the whole "choosing the right parallelism for caching" problem solved by
the Adaptive Batch Job Scheduler [2]?
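
For context, the general idea of deriving parallelism from the amount of
data to be consumed can be sketched roughly as below. This is an
illustration under assumptions, not the scheduler's actual algorithm:
the function `decide_parallelism` and its parameters `bytes_per_task`,
`min_parallelism`, and `max_parallelism` are hypothetical names.

```python
def decide_parallelism(total_bytes: int,
                       bytes_per_task: int,
                       min_parallelism: int = 1,
                       max_parallelism: int = 128) -> int:
    """Pick a parallelism so each task handles roughly bytes_per_task of input."""
    if bytes_per_task <= 0:
        raise ValueError("bytes_per_task must be positive")
    # Ceiling division using integers only, to avoid float rounding.
    wanted = (total_bytes + bytes_per_task - 1) // bytes_per_task
    # Clamp the result into the configured bounds.
    return max(min_parallelism, min(max_parallelism, wanted))


GIB = 1024 ** 3
small_cache = decide_parallelism(total_bytes=1, bytes_per_task=GIB)
medium_cache = decide_parallelism(total_bytes=10 * GIB, bytes_per_task=GIB)
huge_cache = decide_parallelism(total_bytes=10 ** 15, bytes_per_task=GIB)
```

With a rule of this shape, a small cached result would be consumed with a
small parallelism even if it was produced by a transformation with a large
one, which is the case raised in the quoted paragraph.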

[1]
https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler

Best,
D.


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi Gen,

Thanks for your feedback.

I think you are talking about how we are going to store the cached
data. The first option is to write the data with a sink to an external
file system, much like the file store of the Dynamic Table. If I
understand correctly, it requires a distributed file system, e.g. HDFS,
S3, etc. In my opinion, it is too heavyweight to use a distributed
file system for caching.

As you said, using the shuffle service for caching is quite natural as
we need to produce the intermediate result anyway. For Table/SQL API,
the table operations are translated to transformations, where we can
reuse the CacheTransformation. So this approach should not be
unfriendly to the Table/SQL API.

Regarding setting the parallelism of the CacheTransformation: with the
current design, the parallelism of the cached intermediate result is
determined by the parallelism of the transformation that produces the
intermediate result to cache. In other words, the CacheTransformation
takes its parallelism from the transformation to be cached. I think
the overhead should not be critical, as the cache-producing job
suffers from the same overhead anyway. For a CacheTransformation with
large parallelism but a relatively small result dataset, I think we
should reduce the parallelism of the transformation to be cached.

Best,
Xuannan



On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <lu...@gmail.com> wrote:
>
> Hi Xuannan,
>
> Thanks for the reply.
>
> I do agree that dynamic tables and cached partitions are similar features
> aimed at different cases. In my opinion, the main difference between the
> implementations is whether to cache only the data or the whole result partition.
>
> To cache only the data, we can translate the CacheTransformation to a Sink
> node for writing and a Source node for consuming. Most things stay
> the same as in this FLIP, except for the storage, which is an external
> one (or a built-in one if we can use the dynamic table storage), instead of
> the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This
> can make caching independent from a specific shuffle service, and make it
> possible to share data between different jobs / Per-Job mode jobs.
>
> Caching the whole partition is natural in DataStream API, since the
> partition is a low-level concept, and data storage is already provided by
> the default shuffle service. So if we want to choose a solution only to
> support cache in DataStream API, caching the whole partition can be a good
> choice. But this may not be as friendly to the Table/SQL API as to
> DataStream, since users declare that they want to cache a logical Table (view),
> rather than a physical partition. If we want a unified solution for both
> APIs, this may need to be considered.
>
>
> And here's another suggestion to this FLIP. Maybe we should support
> "setParallelism" in CacheTransformation, for both caching and consuming.
>
> In some cases, the input parallelism of the CacheTransformation is large
> but the result dataset is relatively small. We may need too many resources
> to consume the result partition if the source parallelism has to be the
> same as the producer's.
>
> On the other hand, serving a large number of partitions may also have more
> overhead. Though maybe it's not a big burden, we can try to reduce the
> actual cached partition count if necessary, for example by adding a
> pass-through vertex with the specific parallelism between the producer and
> the cache vertices.

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Gen Luo <lu...@gmail.com>.
Hi Xuannan,

Thanks for the reply.

I do agree that dynamic tables and cached partitions are similar features
aimed at different cases. In my opinion, the main difference between the
implementations is whether to cache only the data or the whole result partition.

To cache only the data, we can translate the CacheTransformation to a Sink
node for writing and a Source node for consuming. Most things stay
the same as in this FLIP, except for the storage, which is an external
one (or a built-in one if we can use the dynamic table storage), instead of
the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This
can make caching independent from a specific shuffle service, and make it
possible to share data between different jobs / Per-Job mode jobs.

Caching the whole partition is natural in DataStream API, since the
partition is a low-level concept, and data storage is already provided by
the default shuffle service. So if we want to choose a solution only to
support cache in DataStream API, caching the whole partition can be a good
choice. But this may not be as friendly to the Table/SQL API as to
DataStream, since users declare that they want to cache a logical Table (view),
rather than a physical partition. If we want a unified solution for both
APIs, this may need to be considered.


And here's another suggestion to this FLIP. Maybe we should support
"setParallelism" in CacheTransformation, for both caching and consuming.

In some cases, the input parallelism of the CacheTransformation is large
but the result dataset is relatively small. We may need too many resources
to consume the result partition if the source parallelism has to be the
same as the producer's.

On the other hand, serving a large number of partitions may also have more
overhead. Though maybe it's not a big burden, we can try to reduce the
actual cached partition count if necessary, for example by adding a
pass-through vertex with the specific parallelism between the producer and
the cache vertices.
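
To make the parallelism argument concrete, here is a small toy model in
plain Java. It is not Flink code: the class and method names are
hypothetical, and it assumes the simplification that each subtask of the
vertex writing the cache produces exactly one cached partition. It only
illustrates how a pass-through vertex with lower parallelism would shrink
the partition count for a small result.

```java
/** Toy model (not Flink code): one cached partition per subtask of the vertex that writes the cache. */
final class CachePartitionSketch {

    /**
     * Number of cached partitions. A passThroughParallelism <= 0 means no
     * pass-through vertex is inserted, so the producer writes the cache itself.
     */
    static int cachedPartitions(int producerParallelism, int passThroughParallelism) {
        return passThroughParallelism > 0 ? passThroughParallelism : producerParallelism;
    }

    /** Average number of records per cached partition (integer division). */
    static long avgRecordsPerPartition(long totalRecords, int partitions) {
        return totalRecords / partitions;
    }

    public static void main(String[] args) {
        // A small result (10,000 records) produced with parallelism 1000.
        int direct = cachedPartitions(1000, 0);
        // The same result routed through a pass-through vertex with parallelism 8.
        int reduced = cachedPartitions(1000, 8);
        System.out.println(direct + " partitions, ~"
                + avgRecordsPerPartition(10_000, direct) + " records each");
        System.out.println(reduced + " partitions, ~"
                + avgRecordsPerPartition(10_000, reduced) + " records each");
    }
}
```

Under these assumptions, the consumer of the cache would need to read 8
partitions instead of 1000 for the same 10,000 records.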

On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zh...@gmail.com>
wrote:

> Hi Xuannan,
>
> Thanks for the reply.
>
> Regarding whether and how to support caching side outputs, I agree that the
> second option might be better if there does exist a use case where users need
> to cache only certain side outputs.

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Zhipeng Zhang <zh...@gmail.com>.
Hi Xuannan,

Thanks for the reply.

Regarding whether and how to support caching side outputs, I agree that the
second option might be better if there does exist a use case where users need
to cache only certain side outputs.


On Tue, Jan 4, 2022 at 3:50 PM Xuannan Su <su...@gmail.com> wrote:

> Hi Zhipeng and Gen,
>
> Thanks for joining the discussion.
>
> For Zhipeng:
>
> - Can we support side output
> Caching the side output is indeed a valid use case. However, with the
> current API, it is not straightforward to cache the side output. You
> can apply an identity map function to the DataStream returned by the
> getSideOutput method and then cache the result of the map
> transformation. In my opinion, it is not user-friendly. Therefore, we
> should think of a way to better support the use case.
> As you say, we can introduce a new class
> `CachedSingleOutputStreamOperator`, and override the `getSideOutput`
> method to return a `CachedDataStream`. With this approach, the cache
> method implies that both the main output and the side outputs of the
> `SingleOutputStreamOperator` are cached. The problem with this
> approach is that the user has no control over which side output should
> be cached.
> Another option would be to let the `getSideOutput` method return a
> `SingleOutputStreamOperator`. This way, users can decide which side
> output to cache. As the `getSideOutput` method returns a
> `SingleOutputStreamOperator`, users can also set properties of the
> transformation that produces the side output, e.g. parallelism, buffer
> timeout, etc. If users try to set different values of the same
> property of a transformation, an exception will be thrown. What do you
> think?
>
> - Can we support Stream Mode
> Running a job in stream mode doesn't guarantee the job will finish,
> while in batch mode, it does.  This is the main reason that prevents
> us from supporting cache in stream mode. The cache cannot be used
> unless the job can finish.
> If I understand correctly, by "run batch jobs in Stream Mode", you
> mean that you have a job with all bounded sources, but you want the
> intermediate data to shuffle in pipelined mode instead of blocking
> mode. If that is the case, the job can run in batch mode with
> "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].
> And we can support caching in this case.
>
> - Change parallelism of CachedDataStream
> CachedDataStream extends from DataStream, which doesn't have the
> `setParallelism` method like the `SingleOutputStreamOperator`. Thus,
> it should not be a problem with CachedDataStream.
>
> For Gen:
>
> - Relation between FLIP-205 and FLIP-188
> Although it feels like dynamic table and caching are similar in the
> sense that they let users reuse some intermediate result, they target
> different use cases. The dynamic table is targeting the use case where
> users want to share a dynamically updating intermediate result across
> multiple applications. It is some meaningful data that can be consumed
> by different Flink applications and Flink jobs. Caching, in contrast,
> targets the use case where users know that all the sources are
> bounded and static, and caching is only used to avoid re-computing the
> intermediate result. And the cached intermediate result is only
> meaningful across jobs in the same application.
>
> Dynamic table and caching can be used together. For example, in a
> machine learning scenario, we can have a Stream job that is generating
> some training samples. And we can create a dynamic table for the
> training sample. And we run a Flink application every hour to do some
> data analysis on the training sample generated in the last hour. The
> Flink application consists of multiple batch jobs and the batch jobs
> share some intermediate results, so users can use cache to avoid
> re-computation. The intermediate result is not meaningful outside of
> the application. And the cache will be discarded after the application
> is finished.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode


-- 
best,
Zhipeng

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi Zhipeng and Gen,

Thanks for joining the discussion.

For Zhipeng:

- Can we support side output
Caching the side output is indeed a valid use case. However, with the
current API, it is not straightforward to cache the side output. You
can apply an identity map function to the DataStream returned by the
getSideOutput method and then cache the result of the map
transformation. In my opinion, it is not user-friendly. Therefore, we
should think of a way to better support the use case.
As you say, we can introduce a new class
`CachedSingleOutputStreamOperator` and override the `getSideOutput`
method to return a `CachedDataStream`. With this approach, calling the
cache method implies that both the main output and the side outputs of
the `SingleOutputStreamOperator` are cached. The problem with this
approach is that the user has no control over which side output should
be cached.
Another option would be to let the `getSideOutput` method return a
`SingleOutputStreamOperator`. This way, users can decide which side
output to cache. Because the `getSideOutput` method returns a
`SingleOutputStreamOperator`, users can also set properties of the
transformation that produces the side output, e.g. parallelism, buffer
timeout, etc. If users try to set different values for the same
property of a transformation, an exception will be thrown. What do you
think?
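
To make the second option concrete, here is a minimal sketch in plain
Java. It is a toy model, not Flink code: `ToyTransformation`,
`ToyOutputHandle` and `ToyOperator` are hypothetical names invented for
illustration. It only shows the two behaviours discussed above: each
side output can be cached on its own, and setting conflicting values
for a property of the shared transformation throws an exception.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the transformation that produces the main output and
// all side outputs. Hypothetical class, not part of Flink.
class ToyTransformation {
    private Integer parallelism; // null until some handle sets it

    void setParallelism(int value) {
        // Setting the same value twice is fine; a different value is a conflict.
        if (parallelism != null && parallelism != value) {
            throw new IllegalStateException(
                "parallelism already set to " + parallelism + ", cannot change to " + value);
        }
        parallelism = value;
    }

    Integer getParallelism() {
        return parallelism;
    }
}

// Handle for one output. Every handle shares the same transformation but
// keeps its own cache flag, so the user chooses what to cache.
class ToyOutputHandle {
    private final ToyTransformation transformation;
    private boolean cached;

    ToyOutputHandle(ToyTransformation transformation) {
        this.transformation = transformation;
    }

    ToyOutputHandle setParallelism(int value) {
        transformation.setParallelism(value);
        return this;
    }

    ToyOutputHandle cache() {
        cached = true;
        return this;
    }

    boolean isCached() {
        return cached;
    }
}

// Main-output handle that also hands out one handle per side-output tag.
class ToyOperator extends ToyOutputHandle {
    private final ToyTransformation shared;
    private final Map<String, ToyOutputHandle> sideOutputs = new HashMap<>();

    ToyOperator() {
        this(new ToyTransformation());
    }

    private ToyOperator(ToyTransformation shared) {
        super(shared);
        this.shared = shared;
    }

    ToyOutputHandle getSideOutput(String tag) {
        return sideOutputs.computeIfAbsent(tag, key -> new ToyOutputHandle(shared));
    }
}

class SideOutputCacheDemo {
    public static void main(String[] args) {
        ToyOperator op = new ToyOperator();
        op.getSideOutput("late").cache();   // cache only this side output
        op.setParallelism(4);
        try {
            op.getSideOutput("rejected").setParallelism(8); // conflicts with 4
        } catch (IllegalStateException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

In this model the main output and the "rejected" side output stay
uncached unless the user calls cache() on them explicitly, which is the
control that the first option does not give.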

- Can we support Stream Mode
Running a job in stream mode doesn't guarantee that the job will
finish, while batch mode does. This is the main reason that prevents
us from supporting cache in stream mode: the cache cannot be used
unless the job that produces it can finish.
If I understand correctly, by "run batch jobs in Stream Mode", you
mean that you have a job with all bounded sources, but you want the
intermediate data to shuffle in pipelined mode instead of blocking
mode. If that is the case, the job can run in batch mode with
"execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].
And we can support caching in this case.
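
For reference, the setup described above could look roughly like the
following configuration fragment. This is a sketch that assumes the
options are set in flink-conf.yaml; they can also be set per job.
`execution.batch-shuffle-mode` is the option documented at [1], and
`execution.runtime-mode` is the option that selects batch execution.

```yaml
# Run the bounded job in batch mode ...
execution.runtime-mode: BATCH
# ... but exchange intermediate data in pipelined instead of blocking mode.
execution.batch-shuffle-mode: ALL_EXCHANGES_PIPELINED
```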

- Change parallelism of CachedDataStream
CachedDataStream extends from DataStream, which doesn't have the
`setParallelism` method like the `SingleOutputStreamOperator`. Thus,
it should not be a problem with CachedDataStream.

For Gen:

- Relation between FLIP-205 and FLIP-188
Although dynamic table and caching feel similar in the sense that
both let users reuse some intermediate result, they target
different use cases. The dynamic table targets the use case where
users want to share a dynamically updating intermediate result across
multiple applications. It is meaningful data that can be consumed
by different Flink applications and Flink jobs. Caching, in contrast,
targets the use case where users know that all the sources are
bounded and static, and caching is only used to avoid re-computing the
intermediate result. The cached intermediate result is only
meaningful across jobs in the same application.

Dynamic table and caching can be used together. For example, in a
machine learning scenario, we can have a stream job that generates
training samples, and we can create a dynamic table for those
samples. We then run a Flink application every hour to do some
data analysis on the training samples generated in the last hour. The
Flink application consists of multiple batch jobs that
share some intermediate results, so users can use cache to avoid
re-computation. The intermediate result is not meaningful outside of
the application, and the cache will be discarded after the application
is finished.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode


On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <lu...@gmail.com> wrote:
>
> Hi Xuannan,
>
> I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> storage, which provides a unified changelog & table representation. Tables
> stored there can be used in further ad-hoc queries. To my understanding,
> it's quite like an implementation of caching in Table API, and the ad-hoc
> queries are somehow like further steps in an interactive program.
>
> As you replied, caching at Table/SQL API is the next step, as a part of
> interactive programming in Table API, which we all agree is the major
> scenario. What do you think about the relation between it and FLIP-188?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>
>
> On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for sharing your thoughts.
> >
> > You are right that most people tend to use high-level API for
> > interactive data exploration. Actually, there is
> > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > know, it has been accepted but hasn’t been implemented. At the time
> > when it is drafted, DataStream did not support Batch mode but Table
> > API does.
> >
> > Now that the DataStream API does support batch processing, I think we
> > can focus on supporting cache at DataStream first. It is still
> > valuable for DataStream users and most of the work we do in this FLIP
> > can be reused. So I want to limit the scope of this FLIP.
> >
> > After caching is supported at DataStream, we can continue from where
> > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > re-vote FLIP-36 or draft a new FLIP. What do you think?
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >
> >
> >
> > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > thanks for drafting this FLIP.
> > >
> > > One immediate thought, from what I've seen for interactive data
> > exploration
> > > with Spark, most people tend to use the higher level APIs, that allow for
> > > faster prototyping (Table API in Flink's case). Should the Table API also
> > > be covered by this FLIP?
> > >
> > > Best,
> > > D.
> > >
> > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com>
> > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I’d like to start a discussion about adding support to cache the
> > > > intermediate result at DataStream API for batch processing.
> > > >
> > > > As the DataStream API now supports batch execution mode, we see users
> > > > using the DataStream API to run batch jobs. Interactive programming is
> > > > an important use case of Flink batch processing. And the ability to
> > > > cache intermediate results of a DataStream is crucial to the
> > > > interactive programming experience.
> > > >
> > > > Therefore, we propose to support caching a DataStream in Batch
> > > > execution. We believe that users can benefit a lot from the change and
> > > > encourage them to use DataStream API for their interactive batch
> > > > processing work.
> > > >
> > > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > > thread. Looking forward to your feedback!
> > > >
> > > > [1]
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi Yun,

Thanks for your feedback!

1. With the cached stream, compilation and job submission happen as in
a regular job submission, and a job with multiple concurrent cached
DataStreams is supported. In your example, a and b run in the same
job. Thus, all the cached DataStreams are created when the job is
completed.

2. If I understand your question correctly, this wouldn’t be a problem
if we support concurrent cached DataStreams in a job.

3. Yes, the execution would have the same compile result regardless of
the deployment mode. If the user tries to run multiple batch
jobs that use cache in one StreamExecutionEnvironment with Per-Job
deployment mode, the cache-consuming job will fail and we go through
the failover procedure to re-submit the job with the original job
graph, as if the cache hadn’t been created.
We can do better if we know the deployment mode upfront and disable
caching for Per-Job mode. Maybe we can check the
`execution.target` option to see if it is Per-Job mode. What do you
think?
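
A rough sketch of such a check, in plain Java. The helper names are
hypothetical, and treating every `execution.target` value that ends in
"-per-job" (for example "yarn-per-job") as Per-Job mode is an
assumption of this sketch, not something the FLIP specifies.

```java
class DeploymentTargetCheck {
    // Assumption: Per-Job targets are recognisable by the "-per-job" suffix
    // of the execution.target value.
    static boolean isPerJobTarget(String executionTarget) {
        return executionTarget != null && executionTarget.endsWith("-per-job");
    }

    // Caching would be disabled upfront for Per-Job mode instead of letting
    // the cache-consuming job fail and go through failover.
    static boolean cachingEnabled(String executionTarget) {
        return !isPerJobTarget(executionTarget);
    }

    public static void main(String[] args) {
        System.out.println(cachingEnabled("yarn-per-job"));
        System.out.println(cachingEnabled("yarn-session"));
    }
}
```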

4. This is a good question. I can imagine a use case where users
want to process some bounded sources and cache the intermediate
result, verify the result, and then use it later in a stream job.
Batch mode is required when creating the cache so that we know the job
will finish and the cache can be reused. The job consuming the cache
can run in either batch mode or stream mode. Stream mode
behaves differently when the cached DataStream hasn't been created or
is invalid: the job should compute the intermediate result from
scratch, but it should not cache the intermediate result.
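
The intended behaviour in point 4 can be sketched with a small toy
model in plain Java. `ToyRuntimeMode` and `ToyCache` are hypothetical
names, and the in-memory map merely stands in for the cached result
partitions; this is an illustration of the rule, not the proposed
implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

enum ToyRuntimeMode { BATCH, STREAMING }

// Toy model of the rule: a valid cache can be consumed in both modes,
// but a cache is only ever created by a batch job.
class ToyCache {
    private final Map<String, List<Integer>> store = new HashMap<>();
    int computeCount = 0; // how often the intermediate result was computed

    List<Integer> consume(String cacheId, ToyRuntimeMode mode,
                          Supplier<List<Integer>> compute) {
        List<Integer> hit = store.get(cacheId);
        if (hit != null) {
            return hit; // cache exists and is valid: reuse it in either mode
        }
        computeCount++;
        List<Integer> result = compute.get(); // compute from scratch
        if (mode == ToyRuntimeMode.BATCH) {
            // Only a batch job is known to finish, so only it creates the cache.
            store.put(cacheId, result);
        }
        return result;
    }

    void invalidate(String cacheId) {
        store.remove(cacheId);
    }
}
```

A stream job that finds no cache recomputes on every run, while a
single batch run is enough for later batch or stream jobs to reuse the
result.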

For remote shuffle service, I think it is fine if the current
design is aligned with remote shuffle service. For any work that is
required for remote shuffle service to work with caching, I am more
than happy to help.

Best,
Xuannan



On Wed, Jan 5, 2022 at 4:49 PM Yun Gao <yu...@aliyun.com.invalid> wrote:
>
> Hi Xuannan,
>
> Many thanks for drafting the FLIP and initiating the discussion!
>
> I have several questions; sorry if I have misunderstood anything:
>
> 1. With the cached stream, when would compilation and job submission
> happen? Does it happen on calling execute_and_cache()? If so, could
> we support a job with multiple concurrent cached streams like
>
> DataStream a = ...
> DataStream b = ...
> a.cache()
> b.cache()
> // could we run a/b in the same job?
>
> If not, perhaps part of the graph would not be executed?
>
> 2. If the part of the graph that uses the cached partition is executed as a
> second job, would that job be executed after its preceding jobs have finished?
> Would the StreamExecutionEnvironment do this tracking?
>
> 3. Would the execution have the same compile result when running in
> per-job vs. application / session mode? For per-job mode, when
> executing the part of the graph that uses the cached result, we might
> need to run the whole graph from the sources; but for application /
> session mode, it would be compiled to a separate job reading from the
> cached result partitions. If the compile result is different, perhaps
> we currently cannot get the execution mode when compiling?
>
> 4. For the part of the graph using the cached result, do we support the
> stream case? For example, a part of the graph has two sources: one is a
> cached result partition and the other one is an unbounded job.
>
> For the remote shuffle service, it seems to me that we currently do not
> have a complete process for it to support the cached ResultPartition, since
> JobMasterPartitionTrackerImpl does not yet support promoting a result
> partition via the pluggable ShuffleMaster. But we should be able to
> complete this part.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <su...@gmail.com>
> Send Time:2022 Jan. 5 (Wed.) 14:04
> To:dev <de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing
>
> Hi David,
>
> We have a description in the FLIP about the case of TM failure without
> the remote shuffle service. Basically, since the partitions are stored
> at the TM, a TM failure requires recomputing the intermediate result.
>
> If a Flink job uses the remote shuffle service, the partitions are
> stored at the remote shuffle service. In this case, the failure of TM
> will not cause any partition loss. Therefore, recomputing the
> intermediate result is not required. If a partition is lost at the
> remote shuffle service, even without a TM failure, the cached
> intermediate result is no longer valid, so the intermediate result
> has to be recomputed.
>
> To summarize, no matter where the partitions are stored, locally at TM
> or remotely at remote shuffle service, recomputing is only required if
> the consuming job finds some partitions lost.
>
> I updated the FLIP to include the description of failover when using
> remote shuffle service.
>
> Best,
> Xuannan
>
>
> On Mon, Jan 3, 2022 at 4:17 PM David Morávek <dm...@apache.org> wrote:
> >
> > One more question from my side, should we make sure this plays well with
> > the remote shuffle service [1] in case of TM failure?
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
> > D.
> >
> > On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <lu...@gmail.com> wrote:
> >
> > > Hi Xuannan,
> > >
> > > I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> > > storage, which provides a unified changelog & table representation. Tables
> > > stored there can be used in further ad-hoc queries. To my understanding,
> > > it's quite like an implementation of caching in Table API, and the ad-hoc
> > > queries are somehow like further steps in an interactive program.
> > >
> > > As you replied, caching at Table/SQL API is the next step, as a part of
> > > interactive programming in Table API, which we all agree is the major
> > > scenario. What do you think about the relation between it and FLIP-188?
> > >
> > > [1]
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > >
> > >
> > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:
> > >
> > > > Hi David,
> > > >
> > > > Thanks for sharing your thoughts.
> > > >
> > > > You are right that most people tend to use high-level API for
> > > > interactive data exploration. Actually, there is
> > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > > > know, it has been accepted but hasn’t been implemented. At the time
> > > > when it is drafted, DataStream did not support Batch mode but Table
> > > > API does.
> > > >
> > > > Now that the DataStream API does support batch processing, I think we
> > > > can focus on supporting cache at DataStream first. It is still
> > > > valuable for DataStream users and most of the work we do in this FLIP
> > > > can be reused. So I want to limit the scope of this FLIP.
> > > >
> > > > After caching is supported at DataStream, we can continue from where
> > > > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > [1]
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > >
> > > >
> > > >
> > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > thanks for drafting this FLIP.
> > > > >
> > > > > One immediate thought, from what I've seen for interactive data
> > > > exploration
> > > > > with Spark, most people tend to use the higher level APIs, that allow
> > > for
> > > > > faster prototyping (Table API in Flink's case). Should the Table API
> > > also
> > > > > be covered by this FLIP?
> > > > >
> > > > > Best,
> > > > > D.
> > > > >
> > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I’d like to start a discussion about adding support to cache the
> > > > > > intermediate result at DataStream API for batch processing.
> > > > > >
> > > > > > As the DataStream API now supports batch execution mode, we see users
> > > > > > using the DataStream API to run batch jobs. Interactive programming
> > > is
> > > > > > an important use case of Flink batch processing. And the ability to
> > > > > > cache intermediate results of a DataStream is crucial to the
> > > > > > interactive programming experience.
> > > > > >
> > > > > > Therefore, we propose to support caching a DataStream in Batch
> > > > > > execution. We believe that users can benefit a lot from the change
> > > and
> > > > > > encourage them to use DataStream API for their interactive batch
> > > > > > processing work.
> > > > > >
> > > > > > Please check out the FLIP-205 [1] and feel free to reply to this
> > > email
> > > > > > thread. Looking forward to your feedback!
> > > > > >
> > > > > > [1]
> > > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
> > > >
> > >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Xuannan,

Many thanks for drafting the FLIP and initiating the discussion!

I have several questions; sorry if I have misunderstood anything:

1. With the cached stream, when would compilation and job submission
happen? Does it happen on calling execute_and_cache()? If so, could
we support a job with multiple concurrent cached streams like

DataStream a = ...
DataStream b = ...
a.cache()
b.cache()
// could we run a/b in the same job?

If not, perhaps part of the graph would not be executed?

2. If the part of the graph that uses the cached partition is executed as a
second job, would that job be executed after its preceding jobs have finished?
Would the StreamExecutionEnvironment do this tracking?

3. Would the execution have the same compile result when running in
per-job vs. application / session mode? For per-job mode, when
executing the part of the graph that uses the cached result, we might
need to run the whole graph from the sources; but for application /
session mode, it would be compiled to a separate job reading from the
cached result partitions. If the compile result is different, perhaps
we currently cannot get the execution mode when compiling?

4. For the part of the graph using the cached result, do we support the
stream case? For example, a part of the graph has two sources: one is a
cached result partition and the other one is an unbounded job.

For the remote shuffle service, it seems to me that we currently do not
have a complete process for it to support the cached ResultPartition, since
JobMasterPartitionTrackerImpl does not yet support promoting a result
partition via the pluggable ShuffleMaster. But we should be able to
complete this part.

Best,
Yun


------------------------------------------------------------------
From:Xuannan Su <su...@gmail.com>
Send Time:2022 Jan. 5 (Wed.) 14:04
To:dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Hi David,

We have a description in the FLIP about the case of TM failure without
the remote shuffle service. Basically, since the partitions are stored
at the TM, a TM failure requires recomputing the intermediate result.

If a Flink job uses the remote shuffle service, the partitions are
stored at the remote shuffle service. In this case, the failure of TM
will not cause any partition loss. Therefore, recomputing the
intermediate result is not required. If a partition is lost at the
remote shuffle service, even without a TM failure, the cached
intermediate result is no longer valid, so the intermediate result
has to be recomputed.

To summarize, no matter where the partitions are stored, locally at TM
or remotely at remote shuffle service, recomputing is only required if
the consuming job finds some partitions lost.

I updated the FLIP to include the description of failover when using
remote shuffle service.

Best,
Xuannan


On Mon, Jan 3, 2022 at 4:17 PM David Morávek <dm...@apache.org> wrote:
>
> One more question from my side, should we make sure this plays well with
> the remote shuffle service [1] in case of TM failure?
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>
> D.
>
> On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <lu...@gmail.com> wrote:
>
> > Hi Xuannan,
> >
> > I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> > storage, which provides a unified changelog & table representation. Tables
> > stored there can be used in further ad-hoc queries. To my understanding,
> > it's quite like an implementation of caching in Table API, and the ad-hoc
> > queries are somehow like further steps in an interactive program.
> >
> > As you replied, caching at Table/SQL API is the next step, as a part of
> > interactive programming in Table API, which we all agree is the major
> > scenario. What do you think about the relation between it and FLIP-188?
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use high-level API for
> > > interactive data exploration. Actually, there is
> > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > > know, it has been accepted but hasn’t been implemented. At the time
> > > when it is drafted, DataStream did not support Batch mode but Table
> > > API does.
> > >
> > > Now that the DataStream API does support batch processing, I think we
> > > can focus on supporting cache at DataStream first. It is still
> > > valuable for DataStream users and most of the work we do in this FLIP
> > > can be reused. So I want to limit the scope of this FLIP.
> > >
> > > After caching is supported at DataStream, we can continue from where
> > > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > >
> > >
> > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > thanks for drafting this FLIP.
> > > >
> > > > One immediate thought, from what I've seen for interactive data
> > > exploration
> > > > with Spark, most people tend to use the higher level APIs, that allow
> > for
> > > > faster prototyping (Table API in Flink's case). Should the Table API
> > also
> > > > be covered by this FLIP?
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion about adding support to cache the
> > > > > intermediate result at DataStream API for batch processing.
> > > > >
> > > > > As the DataStream API now supports batch execution mode, we see users
> > > > > using the DataStream API to run batch jobs. Interactive programming
> > is
> > > > > an important use case of Flink batch processing. And the ability to
> > > > > cache intermediate results of a DataStream is crucial to the
> > > > > interactive programming experience.
> > > > >
> > > > > Therefore, we propose to support caching a DataStream in Batch
> > > > > execution. We believe that users can benefit a lot from the change
> > and
> > > > > encourage them to use DataStream API for their interactive batch
> > > > > processing work.
> > > > >
> > > > > Please check out the FLIP-205 [1] and feel free to reply to this
> > email
> > > > > thread. Looking forward to your feedback!
> > > > >
> > > > > [1]
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > >
> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi David,

We have a description in the FLIP about the case of TM failure without
the remote shuffle service. Basically, since the partitions are stored
at the TM, a TM failure requires recomputing the intermediate result.

If a Flink job uses the remote shuffle service, the partitions are
stored at the remote shuffle service. In this case, the failure of TM
will not cause any partition loss. Therefore, recomputing the
intermediate result is not required. If a partition is lost at the
remote shuffle service, even without a TM failure, the cached
intermediate result is no longer valid, so the intermediate result
has to be recomputed.

To summarize, no matter where the partitions are stored, locally at TM
or remotely at remote shuffle service, recomputing is only required if
the consuming job finds some partitions lost.
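
As a rough illustration of this rule, here is a toy model in plain
Java. `ToyPartitionStore` and `ToyCacheConsumer` are hypothetical
names; the store stands in for wherever the partitions live (a TM or
the remote shuffle service), and the consumer does not care which one
it is. It only recomputes when it finds a partition missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Stand-in for the place where cached partitions are stored.
class ToyPartitionStore {
    private final Map<Integer, String> partitions = new HashMap<>();

    void put(int index, String data) {
        partitions.put(index, data);
    }

    void lose(int index) { // simulate a lost partition
        partitions.remove(index);
    }

    boolean hasAll(int count) {
        for (int i = 0; i < count; i++) {
            if (!partitions.containsKey(i)) {
                return false;
            }
        }
        return true;
    }

    String get(int index) {
        return partitions.get(index);
    }
}

// Stand-in for the job that consumes the cached intermediate result.
class ToyCacheConsumer {
    int recomputations = 0;

    List<String> read(ToyPartitionStore store, int count, IntFunction<String> recompute) {
        if (!store.hasAll(count)) {
            // Some partition is lost: the cached result is no longer valid,
            // so the whole intermediate result is recomputed.
            recomputations++;
            for (int i = 0; i < count; i++) {
                store.put(i, recompute.apply(i));
            }
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(store.get(i));
        }
        return result;
    }
}
```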

I updated the FLIP to include the description of failover when using
remote shuffle service.

Best,
Xuannan


On Mon, Jan 3, 2022 at 4:17 PM David Morávek <dm...@apache.org> wrote:
>
> One more question from my side, should we make sure this plays well with
> the remote shuffle service [1] in case of TM failure?
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>
> D.
>
> On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <lu...@gmail.com> wrote:
>
> > Hi Xuannan,
> >
> > I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> > storage, which provides a unified changelog & table representation. Tables
> > stored there can be used in further ad-hoc queries. To my understanding,
> > it's quite like an implementation of caching in Table API, and the ad-hoc
> > queries are somehow like further steps in an interactive program.
> >
> > As you replied, caching at Table/SQL API is the next step, as a part of
> > interactive programming in Table API, which we all agree is the major
> > scenario. What do you think about the relation between it and FLIP-188?
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use high-level API for
> > > interactive data exploration. Actually, there is
> > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > > know, it has been accepted but hasn’t been implemented. At the time
> > > when it is drafted, DataStream did not support Batch mode but Table
> > > API does.
> > >
> > > Now that the DataStream API does support batch processing, I think we
> > > can focus on supporting cache at DataStream first. It is still
> > > valuable for DataStream users and most of the work we do in this FLIP
> > > can be reused. So I want to limit the scope of this FLIP.
> > >
> > > After caching is supported at DataStream, we can continue from where
> > > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > >
> > >
> > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > thanks for drafting this FLIP.
> > > >
> > > > One immediate thought, from what I've seen for interactive data
> > > exploration
> > > > with Spark, most people tend to use the higher level APIs, that allow
> > for
> > > > faster prototyping (Table API in Flink's case). Should the Table API
> > also
> > > > be covered by this FLIP?
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion about adding support to cache the
> > > > > intermediate result at DataStream API for batch processing.
> > > > >
> > > > > As the DataStream API now supports batch execution mode, we see users
> > > > > using the DataStream API to run batch jobs. Interactive programming
> > is
> > > > > an important use case of Flink batch processing. And the ability to
> > > > > cache intermediate results of a DataStream is crucial to the
> > > > > interactive programming experience.
> > > > >
> > > > > Therefore, we propose to support caching a DataStream in Batch
> > > > > execution. We believe that users can benefit a lot from the change
> > and
> > > > > encourage them to use DataStream API for their interactive batch
> > > > > processing work.
> > > > >
> > > > > Please check out the FLIP-205 [1] and feel free to reply to this
> > email
> > > > > thread. Looking forward to your feedback!
> > > > >
> > > > > [1]
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > >
> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by David Morávek <dm...@apache.org>.
One more question from my side: should we make sure this plays well with
the remote shuffle service [1] in case of TM failure?

[1] https://github.com/flink-extended/flink-remote-shuffle

D.

On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <lu...@gmail.com> wrote:

> Hi Xuannan,
>
> I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> storage, which provides a unified changelog & table representation. Tables
> stored there can be used in further ad-hoc queries. To my understanding,
> it's quite like an implementation of caching in Table API, and the ad-hoc
> queries are somehow like further steps in an interactive program.
>
> As you replied, caching at Table/SQL API is the next step, as a part of
> interactive programming in Table API, which we all agree is the major
> scenario. What do you think about the relation between it and FLIP-188?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>
>
> On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for sharing your thoughts.
> >
> > You are right that most people tend to use high-level API for
> > interactive data exploration. Actually, there is
> > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > know, it has been accepted but hasn’t been implemented. At the time
> > when it is drafted, DataStream did not support Batch mode but Table
> > API does.
> >
> > Now that the DataStream API does support batch processing, I think we
> > can focus on supporting cache at DataStream first. It is still
> > valuable for DataStream users and most of the work we do in this FLIP
> > can be reused. So I want to limit the scope of this FLIP.
> >
> > After caching is supported at DataStream, we can continue from where
> > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > re-vote FLIP-36 or draft a new FLIP. What do you think?
> >
> > Best,
> > Xuannan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >
> >
> >
> > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > thanks for drafting this FLIP.
> > >
> > > One immediate thought, from what I've seen for interactive data
> > exploration
> > > with Spark, most people tend to use the higher level APIs, that allow
> for
> > > faster prototyping (Table API in Flink's case). Should the Table API
> also
> > > be covered by this FLIP?
> > >
> > > Best,
> > > D.
> > >
> > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I’d like to start a discussion about adding support to cache the
> > > > intermediate result at DataStream API for batch processing.
> > > >
> > > > As the DataStream API now supports batch execution mode, we see users
> > > > using the DataStream API to run batch jobs. Interactive programming is
> > > > an important use case of Flink batch processing. And the ability to
> > > > cache intermediate results of a DataStream is crucial to the
> > > > interactive programming experience.
> > > >
> > > > Therefore, we propose to support caching a DataStream in Batch
> > > > execution. We believe that users can benefit a lot from the change and
> > > > encourage them to use DataStream API for their interactive batch
> > > > processing work.
> > > >
> > > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > > thread. Looking forward to your feedback!
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> >
>

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Gen Luo <lu...@gmail.com>.
Hi Xuannan,

I found FLIP-188 [1], which aims to introduce a built-in dynamic table
storage that provides a unified changelog & table representation. Tables
stored there can be used in further ad-hoc queries. To my understanding,
it is much like an implementation of caching in the Table API, and the
ad-hoc queries are somewhat like further steps in an interactive program.

As you replied, caching at the Table/SQL API is the next step, as a part of
interactive programming in the Table API, which we all agree is the major
scenario. How do you see the relationship between that work and FLIP-188?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage


On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <su...@gmail.com> wrote:

> Hi David,
>
> Thanks for sharing your thoughts.
>
> You are right that most people tend to use high-level API for
> interactive data exploration. Actually, there is
> the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> know, it has been accepted but hasn’t been implemented. At the time
> when it is drafted, DataStream did not support Batch mode but Table
> API does.
>
> Now that the DataStream API does support batch processing, I think we
> can focus on supporting cache at DataStream first. It is still
> valuable for DataStream users and most of the work we do in this FLIP
> can be reused. So I want to limit the scope of this FLIP.
>
> After caching is supported at DataStream, we can continue from where
> FLIP-36 left off to support caching at Table/SQL API. We might have to
> re-vote FLIP-36 or draft a new FLIP. What do you think?
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
>
>
> On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
> >
> > Hi Xuannan,
> >
> > thanks for drafting this FLIP.
> >
> > One immediate thought, from what I've seen for interactive data exploration
> > with Spark, most people tend to use the higher level APIs, that allow for
> > faster prototyping (Table API in Flink's case). Should the Table API also
> > be covered by this FLIP?
> >
> > Best,
> > D.
> >
> > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > I’d like to start a discussion about adding support to cache the
> > > intermediate result at DataStream API for batch processing.
> > >
> > > As the DataStream API now supports batch execution mode, we see users
> > > using the DataStream API to run batch jobs. Interactive programming is
> > > an important use case of Flink batch processing. And the ability to
> > > cache intermediate results of a DataStream is crucial to the
> > > interactive programming experience.
> > >
> > > Therefore, we propose to support caching a DataStream in Batch
> > > execution. We believe that users can benefit a lot from the change and
> > > encourage them to use DataStream API for their interactive batch
> > > processing work.
> > >
> > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > thread. Looking forward to your feedback!
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > >
> > > Best,
> > > Xuannan
> > >
>

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Xuannan Su <su...@gmail.com>.
Hi David,

Thanks for sharing your thoughts.

You are right that most people tend to use the high-level APIs for
interactive data exploration. In fact, FLIP-36 [1] already covers the
cache API at the Table/SQL API. As far as I know, it has been accepted
but hasn’t been implemented. At the time it was drafted, DataStream did
not support batch mode but the Table API did.

Now that the DataStream API does support batch processing, I think we
can focus on supporting cache at DataStream first. It is still
valuable for DataStream users, and most of the work we do in this FLIP
can be reused. So I want to limit the scope of this FLIP.

After caching is supported at DataStream, we can continue from where
FLIP-36 left off to support caching at the Table/SQL API. We might have to
re-vote on FLIP-36 or draft a new FLIP. What do you think?

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:
>
> Hi Xuannan,
>
> thanks for drafting this FLIP.
>
> One immediate thought, from what I've seen for interactive data exploration
> with Spark, most people tend to use the higher level APIs, that allow for
> faster prototyping (Table API in Flink's case). Should the Table API also
> be covered by this FLIP?
>
> Best,
> D.
>
> On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion about adding support to cache the
> > intermediate result at DataStream API for batch processing.
> >
> > As the DataStream API now supports batch execution mode, we see users
> > using the DataStream API to run batch jobs. Interactive programming is
> > an important use case of Flink batch processing. And the ability to
> > cache intermediate results of a DataStream is crucial to the
> > interactive programming experience.
> >
> > Therefore, we propose to support caching a DataStream in Batch
> > execution. We believe that users can benefit a lot from the change and
> > encourage them to use DataStream API for their interactive batch
> > processing work.
> >
> > Please check out the FLIP-205 [1] and feel free to reply to this email
> > thread. Looking forward to your feedback!
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> >
> > Best,
> > Xuannan
> >

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by Zhipeng Zhang <zh...@gmail.com>.
Hi Xuannan,

Thanks for starting the discussion. This would certainly help a lot with both
efficiency and reproducibility in machine learning use cases :)

I have a few questions as follows:

1. Can we support caching both the output and the side outputs of a
SingleOutputStreamOperator (which I believe is a reasonable use case),
given that `cache()` is defined on `SingleOutputStreamOperator`?

If not, shall we introduce another class, say
"CachedSingleOutputStreamOperator", which extends
SingleOutputStreamOperator and overrides the getSideOutput method to
return a CachedDataStream?

2. Is there any chance that we could also support cache in Stream Mode if the
SingleOutputStreamOperator is bounded? We may also want to run batch jobs
in Stream Mode. Could you add some discussion of this to the FLIP?

3. What are we going to do if users change the parallelism of a
CachedDataStream? Shall we throw an exception or add a new operator when
translating the job graph?
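
To make the alternative in question 1 concrete, here is a rough sketch of
its shape. It deliberately uses plain Java stand-ins rather than Flink's
real classes: `Stream`, `CachedStream`, `SingleOutputOperator` and
`CachedSingleOutputOperator` are all hypothetical names, and the "cache" is
just a copied list. It only illustrates a subclass whose `getSideOutput`
returns a cached stream through a covariant return type; it is not the
FLIP's design or an existing Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, NOT Flink classes; names only mirror the discussion.
public class SideOutputCacheSketch {

    /** Stand-in for a bounded stream: just a list of records. */
    static class Stream<T> {
        final List<T> records;
        Stream(List<T> records) { this.records = records; }
    }

    /** Stand-in for a stream whose records have been materialized (copied). */
    static class CachedStream<T> extends Stream<T> {
        CachedStream(List<T> records) { super(new ArrayList<>(records)); }
    }

    /** Stand-in for an operator with a main output and named side outputs. */
    static class SingleOutputOperator<T> extends Stream<T> {
        final Map<String, List<T>> sideOutputs = new HashMap<>();
        SingleOutputOperator(List<T> main) { super(main); }

        /** Plain (uncached) view of one side output. */
        Stream<T> getSideOutput(String tag) {
            return new Stream<>(sideOutputs.getOrDefault(tag, new ArrayList<>()));
        }

        /** Returns a variant whose side outputs are cached as well. */
        CachedSingleOutputOperator<T> cache() {
            return new CachedSingleOutputOperator<>(this);
        }
    }

    /** The subclass from question 1: side outputs come back as cached streams. */
    static class CachedSingleOutputOperator<T> extends SingleOutputOperator<T> {
        CachedSingleOutputOperator(SingleOutputOperator<T> source) {
            super(new ArrayList<>(source.records));
            sideOutputs.putAll(source.sideOutputs);
        }

        @Override
        CachedStream<T> getSideOutput(String tag) {
            return new CachedStream<>(sideOutputs.getOrDefault(tag, new ArrayList<>()));
        }
    }

    public static void main(String[] args) {
        SingleOutputOperator<String> op = new SingleOutputOperator<>(List.of("a", "b"));
        op.sideOutputs.put("late", List.of("z"));
        Stream<String> side = op.cache().getSideOutput("late");
        // Reports whether the side output of the cached operator is itself cached.
        System.out.println(side instanceof CachedStream);
    }
}
```

With this shape, callers that already hold a SingleOutputStreamOperator
would not need a new method: the cached variant narrows the return type of
the existing one. Whether that fits the real class hierarchy is exactly the
open question above.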

Two typos:
1.  ...a stream node with the sample parallelism as its input is added to
the StreamGraph....
---> "the same parallelism"
2. In figure of Job1, one-input transformation
---> MapTransformation

Best,
Zhipeng


On Wed, Dec 29, 2021 at 6:08 PM David Morávek <dm...@apache.org> wrote:

> Hi Xuannan,
>
> thanks for drafting this FLIP.
>
> One immediate thought, from what I've seen for interactive data exploration
> with Spark, most people tend to use the higher level APIs, that allow for
> faster prototyping (Table API in Flink's case). Should the Table API also
> be covered by this FLIP?
>
> Best,
> D.
>
> On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion about adding support to cache the
> > intermediate result at DataStream API for batch processing.
> >
> > As the DataStream API now supports batch execution mode, we see users
> > using the DataStream API to run batch jobs. Interactive programming is
> > an important use case of Flink batch processing. And the ability to
> > cache intermediate results of a DataStream is crucial to the
> > interactive programming experience.
> >
> > Therefore, we propose to support caching a DataStream in Batch
> > execution. We believe that users can benefit a lot from the change and
> > encourage them to use DataStream API for their interactive batch
> > processing work.
> >
> > Please check out the FLIP-205 [1] and feel free to reply to this email
> > thread. Looking forward to your feedback!
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> >
> > Best,
> > Xuannan
> >
>


-- 
best,
Zhipeng

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Posted by David Morávek <dm...@apache.org>.
Hi Xuannan,

Thanks for drafting this FLIP.

One immediate thought: from what I've seen of interactive data exploration
with Spark, most people tend to use the higher-level APIs, which allow for
faster prototyping (the Table API in Flink's case). Should the Table API also
be covered by this FLIP?

Best,
D.

On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <su...@gmail.com> wrote:

> Hi devs,
>
> I’d like to start a discussion about adding support to cache the
> intermediate result at DataStream API for batch processing.
>
> As the DataStream API now supports batch execution mode, we see users
> using the DataStream API to run batch jobs. Interactive programming is
> an important use case of Flink batch processing. And the ability to
> cache intermediate results of a DataStream is crucial to the
> interactive programming experience.
>
> Therefore, we propose to support caching a DataStream in Batch
> execution. We believe that users can benefit a lot from the change and
> encourage them to use DataStream API for their interactive batch
> processing work.
>
> Please check out the FLIP-205 [1] and feel free to reply to this email
> thread. Looking forward to your feedback!
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
>
> Best,
> Xuannan
>