Posted to dev@flink.apache.org by Biao Liu <mm...@gmail.com> on 2022/12/22 12:16:21 UTC

[DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi everyone,

I would like to start a discussion about making Sink support speculative
execution for batch jobs. This proposal is a follow-up to "FLIP-168:
Speculative Execution For Batch Job"[1]. Speculative execution is very
valuable for batch jobs, and supporting speculative execution for Sink
would make it more complete. Please find more details in the FLIP document
[2].

Looking forward to your feedback.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job

Thanks,
Biao /'bɪ.aʊ/

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Martijn Visser <ma...@apache.org>.
Hi Jing,

That's a valid point, but it raises a broader discussion about promoting the
Sink V2 API. I'll open a separate discussion thread for this.

Best regards,

Martijn



On Wed, Jan 18, 2023 at 11:01, Jing Ge <ji...@ververica.com.invalid> wrote:

> Hi,
>
> I think it will be confusing for users if the older API is deprecated while
> the related new API has not graduated yet, which leads to the awkward
> situation that no proper API can be used in production. In our case,
> e.g., sinkv2.Sink should be marked as @Public before SinkFunction can be
> marked as deprecated. Doing it in 1.17 might be too hasty. I am not sure
> whether the Flink community already has a similar rule. If not, one
> should be defined.
>
> Best regards,
> Jing
>
> On Wed, Jan 18, 2023 at 9:48 AM Martijn Visser <ma...@apache.org> wrote:
>
> > Hi Biao,
> >
> > Just to clarify, I understand your point of view and will not block your
> > FLIP :)
> >
> > It would indeed be great to have both SinkFunction and SourceFunction
> > marked as deprecated in 1.17, as Yun Tang pointed out.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Jan 18, 2023 at 09:34, Yun Tang <my...@live.com> wrote:
> >
> > > Hi Biao,
> > >
> > > I think it's time to deprecate SinkFunction, and it would be great if
> > > you could drive the discussion.
> > >
> > > BTW, we might get it done in the flink-1.17 release together with
> > > deprecating SourceFunction[1].
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-28045
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Biao Liu <mm...@gmail.com>
> > > Sent: Wednesday, January 18, 2023 16:15
> > > To: dev@flink.apache.org <de...@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > >
> > > Hi Martijn & Jing,
> > >
> > > Thanks for the feedback!
> > >
> > > Currently, SinkFunction is in a subtle situation. As Jing pointed out,
> > > SinkFunction is still marked as @Public. Technically, according to the
> > > Flink Bylaws[1], deprecating it should be approved through an official
> > > vote. Although many of the community maintainers (including me) think it
> > > should be deprecated, we should not assume that it already is.
> > > Considering that the discussion and vote may take 1 or 2 weeks, and
> > > longer if someone objects, I'd like to keep pushing FLIP-281 forward
> > > with the current design. I hope it can make it into the 1.17 release.
> > >
> > > By the way, if nobody drives the deprecation, I would like to start
> > > another discussion about it. What do you think?
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Fri, 13 Jan 2023 at 08:43, Jing Ge <ji...@ververica.com.invalid> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for driving this. As Martijn already pointed out, we will have
> > > > to spend effort removing SinkFunction after we deprecate it. The more
> > > > functionality is added to it, the bigger the effort to deprecate and
> > > > remove SinkFunction. Generally, it is not recommended to add new
> > > > features to an interface that we have already decided to deprecate but
> > > > have not deprecated yet. However, this FLIP is a special case, and
> > > > there are some reasons that lead us to support this proposal.
> > > >
> > > > First, the FLIP offers an equivalent solution for the new SinkV2, which
> > > > means the migration from SinkFunction to SinkV2 for this feature is
> > > > predictable and acceptable. The concern I raised above has been solved.
> > > >
> > > > Second, since SinkFunction is still marked as @Public now [1], it
> > > > should be fine to add new features to it (following the rules),
> > > > especially if the requirement is urgent. Similar to what [2] describes
> > > > for API graduation, going from @Public to @Deprecated and then to
> > > > removal should also take 8 months (two release cycles; 8 months is the
> > > > ideal case, and it could be longer). Additionally, SinkFunction is a
> > > > core function whose deletion will trigger many further downstream
> > > > deletions, so the duration will increase to 16 months (again, the
> > > > ideal case) or even longer, e.g. 2 years.
> > > >
> > > > Third, SinkV2 is still marked as @PublicEvolving, which means a few
> > > > more months (8 months?) in addition before we can start deprecating
> > > > SinkFunction. It is not reasonable to say that no features should be
> > > > added to SinkFunction during the upcoming 2 or 3 years.
> > > >
> > > > After thinking about all these aspects, I support this FLIP, so +1.
> > > >
> > > > This discussion leads us to another issue: we should graduate SinkV2
> > > > and deprecate and remove SinkFunction as soon as possible. The longer
> > > > we keep SinkFunction in the code base, the bigger the effort we will
> > > > have while working on anything that depends on or affects the sink.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> > > > [2]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> > > >
> > > > On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvisser@apache.org> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > While I would rather not add new features to (to-be) deprecated
> > > > > APIs, I would be +0 for this.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Thu, Jan 12, 2023 at 08:42, Biao Liu <mm...@gmail.com> wrote:
> > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > Thanks for your feedback!
> > > > > >
> > > > > > Yes, we propose to support speculative execution for SinkFunction.
> > > > > > 1. From the perspective of compatibility, SinkFunction is the most
> > > > > > original Sink implementation. There are lots of implementations
> > > > > > based on SinkFunction, not only in the official Flink codebase but
> > > > > > also in users' private codebases. It's a more serious issue than for
> > > > > > Sink V1. Of course we hope users will migrate their legacy
> > > > > > implementations to the new interface. However, migration is always
> > > > > > hard.
> > > > > > 2. From the perspective of cost, we don't need to do much extra work
> > > > > > to support speculative execution for SinkFunction. All we need to do
> > > > > > is check whether the SinkFunction implementation implements
> > > > > > SupportsConcurrentExecutionAttempts or not. The other parts of the
> > > > > > work are the same as for Sink V2.
> > > > > >
> > > > > > To summarize, it's cheap to support speculative execution for
> > > > > > SinkFunction, and it may allow more existing scenarios to run with
> > > > > > speculative execution.
> > > > > >
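A minimal sketch of the opt-in check described in point 2, written as a Python model rather than Flink code: in Flink this would be a Java `instanceof` test against the `SupportsConcurrentExecutionAttempts` interface, which is imitated here with an empty marker base class. `LegacyFileSink`, `IdempotentFileSink` and `allows_speculative_attempts` are hypothetical names used only for illustration.

```python
class SupportsConcurrentExecutionAttempts:
    """Empty marker base class: a sink inherits it to opt in to concurrent attempts."""


class LegacyFileSink:
    """Hypothetical existing sink that has not opted in."""


class IdempotentFileSink(SupportsConcurrentExecutionAttempts):
    """Hypothetical sink that isolates its output per attempt and opts in."""


def allows_speculative_attempts(sink: object) -> bool:
    # Stands in for a Java `sink instanceof SupportsConcurrentExecutionAttempts` check.
    return isinstance(sink, SupportsConcurrentExecutionAttempts)


print(allows_speculative_attempts(LegacyFileSink()))      # False
print(allows_speculative_attempts(IdempotentFileSink()))  # True
```

Under this model a sink that does nothing keeps running without speculative attempts; only sinks that explicitly opt in are eligible.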
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvisser@apache.org> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Apologies for jumping in late. My only question is about
> > > > > > > SinkFunction: does this imply that we want to add support for this
> > > > > > > to SinkFunction? If so, I would not be in favour of that, since we
> > > > > > > would like to deprecate SinkFunction in favour of SinkV2 (I
> > > > > > > actually thought that was already the case).
> > > > > > >
> > > > > > > Besides that, I have no other comments.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1920@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for the explanation.
> > > > > > > >
> > > > > > > > +1 for the proposal.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jing Zhang
> > > > > > > >
> > > > > > > > On Wed, Jan 4, 2023 at 12:11, Lijie Wang <wa...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Biao,
> > > > > > > > >
> > > > > > > > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > > > > > > > attempt. I have no more questions, +1 for the proposal.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Lijie
> > > > > > > > >
> > > > > > > > > On Wed, Dec 28, 2022 at 17:22, Biao Liu <mm...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for all your feedback!
> > > > > > > > > >
> > > > > > > > > > To @Yuxia,
> > > > > > > > > >
> > > > > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > > > > > > generates a new attempt. Does that make a difference in
> > > > > > > > > > > isolating the produced data?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, there is a difference from the task failover scenario. The
> > > > > > > > > > attempt number is more necessary for speculative execution than
> > > > > > > > > > for failover, because in the failover scenario only one instance
> > > > > > > > > > of a subtask can be running at a time.
> > > > > > > > > >
> > > > > > > > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > > > > > > > scenario, the temporary directory to store produced data can be
> > > > > > > > > > something like "$root_dir/task-$taskNumber/". In the
> > > > > > > > > > initialization phase, the subtask deletes and re-creates the
> > > > > > > > > > temporary directory.
> > > > > > > > > >
> > > > > > > > > > However, in the speculative execution scenario this does not
> > > > > > > > > > work, because several attempts of the same subtask might be
> > > > > > > > > > running at the same time. These attempts might delete, re-create
> > > > > > > > > > and write to the same temporary directory concurrently. The
> > > > > > > > > > correct temporary directory should be something like
> > > > > > > > > > "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's
> > > > > > > > > > necessary to expose the attempt number to the Sink
> > > > > > > > > > implementation to isolate the data.
> > > > > > > > > >
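The collision can be sketched in a few lines of Python, modeling the file system as an in-memory dict (no real files are touched). The two path layouts follow the ones quoted above; `run_interleaved` and the record names are hypothetical, and the strict "delete, re-create, write" ordering is a simplifying assumption about how two concurrent attempts might interleave.

```python
def per_task_dir(root: str, task_number: int) -> str:
    # Enough for failover: only one attempt of a subtask runs at a time.
    return f"{root}/task-{task_number}/"


def per_attempt_dir(root: str, task_number: int, attempt_number: int) -> str:
    # Needed for speculative execution: every attempt gets its own directory.
    return f"{root}/task-{task_number}/attempt-{attempt_number}"


def run_interleaved(layout, root: str = "/tmp/out", task_number: int = 0) -> dict:
    """Simulate two concurrent attempts (0 and 1) of the same subtask.

    Each attempt deletes and re-creates its directory, then writes one record.
    Attempt 1 happens to initialize after attempt 0 has already written.
    """
    fs = {}  # in-memory "file system": directory -> list of records

    def init(attempt: int) -> str:
        directory = layout(root, task_number, attempt)
        fs.pop(directory, None)  # delete the directory if it exists ...
        fs[directory] = []       # ... and re-create it empty
        return directory

    d0 = init(0)
    fs[d0].append("record-from-attempt-0")
    d1 = init(1)  # wipes attempt 0's output if both attempts share a path
    fs[d1].append("record-from-attempt-1")
    return fs


# Shared per-task directory: attempt 0's record is lost.
print(run_interleaved(lambda root, task, _attempt: per_task_dir(root, task)))
# {'/tmp/out/task-0/': ['record-from-attempt-1']}

# Per-attempt directories: both records survive side by side.
print(run_interleaved(per_attempt_dir))
```

With the per-attempt layout, choosing which attempt's directory to keep becomes a separate, later decision instead of a race during initialization.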
> > > > > > > > > >
> > > > > > > > > > To @Lijie,
> > > > > > > > > >
> > > > > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > > > > thing?
> > > > > > > > > >
> > > > > > > > > > Actually, yes.
> > > > > > > > > >
> > > > > > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > > > > > committer know which one is the right subtask attempt?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, we/users should do it in the committer.
> > > > > > > > > >
> > > > > > > > > > In the current design, the Committer of Sink V2 should get the
> > > > > > > > > > information about "which one is the right subtask attempt" from
> > > > > > > > > > the "committable data" produced by the SinkWriter. Take FileSink
> > > > > > > > > > as an example: the "committable data" sent to the Committer
> > > > > > > > > > contains the full paths of the files produced by the SinkWriter.
> > > > > > > > > > Users could also pass the attempt number through the
> > > > > > > > > > "committable data" from the SinkWriter to the Committer.
> > > > > > > > > >
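A rough Python model of this idea, assuming (as a simplification) that the committer only receives the committable of the attempt that finished. `FileCommittable`, `write_attempt` and `commit` are hypothetical names, not the Sink V2 API; the point is that the committable carries the full paths and the attempt number, so the committer never has to work out which attempt is the right one.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class FileCommittable:
    """Hypothetical committable handed from a writer attempt to the committer."""
    subtask_index: int
    attempt_number: int
    pending_paths: Tuple[str, ...]  # full paths of the files this attempt wrote


def write_attempt(root: str, subtask: int, attempt: int) -> FileCommittable:
    # Each attempt writes to an attempt-scoped location and reports the full
    # path(s) it produced, plus its attempt number, in the committable.
    path = f"{root}/task-{subtask}/attempt-{attempt}/part-0"
    return FileCommittable(subtask, attempt, (path,))


def commit(received: List[FileCommittable]) -> Dict[int, List[str]]:
    # The committer does not decide which attempt "won": it only publishes the
    # paths named in the committables it actually received.
    committed: Dict[int, List[str]] = {}
    for c in received:
        committed.setdefault(c.subtask_index, []).extend(c.pending_paths)
    return committed


# Two attempts of subtask 0 wrote data. Assume attempt 1 finished first and
# only its committable reached the committer; attempt 0's file stays uncommitted.
loser = write_attempt("/tmp/out", 0, 0)
winner = write_attempt("/tmp/out", 0, 1)
print(commit([winner]))  # {0: ['/tmp/out/task-0/attempt-1/part-0']}
```

The file left behind by the losing attempt is exactly the kind of leaked data whose cleanup the "Rejected Alternatives" section mentioned here discusses.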
> > > > > > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked
> > > > > > > > > > data of Sink V2" section of the FLIP document, we discussed some
> > > > > > > > > > of the reasons why we didn't provide an API like the one for
> > > > > > > > > > OutputFormat.
> > > > > > > > > >
> > > > > > > > > > To @Jing Zhang,
> > > > > > > > > >
> > > > > > > > > > > I have a question about this: Speculative execution of
> > > > > > > > > > > Committer will be disabled.
> > > > > > > > > > >
> > > > > > > > > > > I agree with your point and I saw similar requirements to
> > > > > > > > > > > disable speculative execution for specified operators.
> > > > > > > > > > >
> > > > > > > > > > > However, the requirement is not currently supported. I think
> > > > > > > > > > > there should be some place to describe how to support it.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > In this FLIP design, speculative execution of the Committer of
> > > > > > > > > > Sink V2 will be disabled by Flink. It's not optional; users
> > > > > > > > > > cannot change it.
> > > > > > > > > > And as you said, "disable speculative execution for specified
> > > > > > > > > > operators" is not supported in the FLIP, because it's a bit out
> > > > > > > > > > of the scope of "Sink Supports Speculative Execution For Batch
> > > > > > > > > > Job". I think it's better to start another FLIP to discuss it.
> > > > > > > > > > "Fine-grained control of enabling speculative execution for
> > > > > > > > > > operators" could be the title of that FLIP, and there we can
> > > > > > > > > > discuss how to enable or disable speculative execution for
> > > > > > > > > > specified operators, including the Committer and
> > > > > > > > > > pre/post-committer of Sink V2.
> > > > > > > > > >
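The rule described above (speculation for the Sink V2 Committer is always off and users cannot override it) can be modeled as a tiny decision function. This is a hypothetical Python sketch, not Flink's scheduler code: `SinkOperator`, its flags and `may_speculate` are invented for illustration, and the opt-in flag for non-committer operators is an assumption based on the SupportsConcurrentExecutionAttempts check discussed in this thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SinkOperator:
    """Hypothetical description of one operator in a Sink V2 topology."""
    name: str
    is_committer: bool
    supports_concurrent_attempts: bool


def may_speculate(op: SinkOperator) -> bool:
    # Committers are never speculated, whatever the operator declares.
    if op.is_committer:
        return False
    # Any other operator must explicitly opt in.
    return op.supports_concurrent_attempts


writer = SinkOperator("writer", is_committer=False, supports_concurrent_attempts=True)
committer = SinkOperator("committer", is_committer=True, supports_concurrent_attempts=True)
print(may_speculate(writer))     # True
print(may_speculate(committer))  # False
```

A per-operator switch of the kind Jing Zhang asks for would replace the hard-coded committer branch with a user-facing setting, which is the scope proposed for a separate FLIP.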
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1920@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Biao,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for driving this FLIP. Supporting speculative execution
> > > > > > > > > > > of sinks is meaningful and important.
> > > > > > > > > > >
> > > > > > > > > > > I have a question about this: Speculative execution of
> > > > > > > > > > > Committer will be disabled.
> > > > > > > > > > >
> > > > > > > > > > > I agree with your point and I saw similar requirements to
> > > > > > > > > > > disable speculative execution for specified operators.
> > > > > > > > > > >
> > > > > > > > > > > However, the requirement is not currently supported. I think
> > > > > > > > > > > there should be some place to describe how to support it.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jing Zhang
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Dec 27, 2022 at 18:51, Lijie Wang <wa...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Biao,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)"
> > > > > > > > > > > > for OutputFormat to know which subtask attempt is the one
> > > > > > > > > > > > marked as finished by the JM, so that the right data is
> > > > > > > > > > > > committed.
> > > > > > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > > > > > thing? Should we/users do it in the committer? If yes, how
> > > > > > > > > > > > does the committer know which one is the right subtask attempt?
> > > > > > > > > > > >
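A rough Python model of how a finalize step could use the `getFinishedAttempt(int subtaskIndex)` method mentioned above. The callback is passed in as a plain function, and the directory layout reuses the "$root_dir/task-$taskNumber/attempt-$attemptNumber" scheme from this thread; `directories_to_commit` is a hypothetical helper, not part of Flink's OutputFormat API.

```python
from typing import Callable, List


def directories_to_commit(
    root: str,
    num_subtasks: int,
    get_finished_attempt: Callable[[int], int],
) -> List[str]:
    """For every subtask, keep only the directory of the attempt that the
    JobManager marked as finished; directories of other attempts are skipped."""
    return [
        f"{root}/task-{i}/attempt-{get_finished_attempt(i)}"
        for i in range(num_subtasks)
    ]


# Hypothetical outcome: for subtask 1, a speculative attempt (1) finished
# before the original attempt (0).
finished_attempts = {0: 0, 1: 1, 2: 0}
print(directories_to_commit("/tmp/out", 3, finished_attempts.__getitem__))
# ['/tmp/out/task-0/attempt-0', '/tmp/out/task-1/attempt-1', '/tmp/out/task-2/attempt-0']
```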
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Lijie
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Dec 27, 2022 at 10:01, yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Biao,
> > > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > > After a quick look at this FLIP, I have a question about
> > > > > > > > > > > > > "expose the attempt number which can be used to isolate data
> > > > > > > > > > > > > produced by speculative executions with the same subtask id".
> > > > > > > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > > > > > > > > generates a new attempt. Does that make a difference in
> > > > > > > > > > > > > isolating the produced data?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Yuxia
> > > > > > > > > > > > >

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi,

I think it will be confusing for users if the older API is deprecated while
the related new API has not graduated yet, which leads to the awkward
situation that no proper API can be used in production. In our case,
e.g., sinkv2.Sink should be marked as @Public before SinkFunction can be
marked as deprecated. Doing it in 1.17 might be too hasty. I am not sure
whether the Flink community already has a similar rule. If not, one
should be defined.

Best regards,
Jing

> > > > > > > > > for operators" can be the title of that FLIP. And we can
> > > discuss
> > > > > > there
> > > > > > > > how
> > > > > > > > > to enable or disable speculative execution for specified
> > > > operators
> > > > > > > > > including Committer and pre/post-committer of Sink V2.
> > > > > > > > >
> > > > > > > > > What do you think?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1920@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Biao,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for driving this FLIP. Supporting speculative execution of sinks is
> > > > > > > > > > > important.
> > > > > > > > > > >
> > > > > > > > > > > I have a question about this: "Speculative execution of Committer will be
> > > > > > > > > > > disabled."
> > > > > > > > > > >
> > > > > > > > > > > I agree with your point, and I have seen similar requirements to disable
> > > > > > > > > > > speculative execution for specified operators.
> > > > > > > > > > >
> > > > > > > > > > > However, this requirement is currently not supported. I think there should
> > > > > > > > > > > be some place that describes how to support it.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jing Zhang
> > > > > > > > > > >
> > > > > > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wa...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Biao,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > > > > > > > > > > OutputFormat to know which subtask attempt was marked as finished by the
> > > > > > > > > > > > JM, so that it can commit the right data.
> > > > > > > > > > > > I have a question about this: does SinkV2 need to do the same thing? Should
> > > > > > > > > > > > we/users do it in the committer? If yes, how does the committer know which
> > > > > > > > > > > > one is the right subtask attempt?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Lijie
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, Biao.
> > > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > > After a quick look at this FLIP, I have a question about "expose the
> > > > > > > > > > > > > attempt number which can be used to isolate data produced by speculative
> > > > > > > > > > > > > executions with the same subtask id".
> > > > > > > > > > > > > What is the sink expected to do to isolate data produced by speculative
> > > > > > > > > > > > > executions? IIUC, if a task fails over, it also generates a new attempt.
> > > > > > > > > > > > > Does that make a difference in isolating the produced data?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Yuxia
> > > > > > > > > > > > >
> > > > > > > > > > > > > ----- Original Message -----
> > > > > > > > > > > > > From: "Biao Liu" <mm...@gmail.com>
> > > > > > > > > > > > > To: "dev" <de...@flink.apache.org>
> > > > > > > > > > > > > Sent: Thursday, 22 December 2022, 8:16:21 PM
> > > > > > > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I would like to start a discussion about making Sink support speculative
> > > > > > > > > > > > > execution for batch jobs. This proposal is a follow-up to "FLIP-168:
> > > > > > > > > > > > > Speculative Execution For Batch Job"[1]. Speculative execution is very
> > > > > > > > > > > > > useful for batch jobs, and it would be more complete after supporting
> > > > > > > > > > > > > speculative execution of Sink. Please find more details in the FLIP
> > > > > > > > > > > > > document [2].
> > > > > > > > > > > > >
> > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > [1]
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > > > > > > [2]
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Biao /'bɪ.aʊ/
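
The difference Biao draws in reply to Yuxia (failover versus speculative execution) can be illustrated in plain Java. This is a minimal, dependency-free sketch assuming a local file system and the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout from the thread; the class AttemptScopedStaging, its methods, and the subtask-to-attempt map passed to commit (a stand-in for the proposed getFinishedAttempt(int subtaskIndex) that Lijie mentions) are hypothetical and are not Flink's FileSystemOutputFormat code. Under failover only one attempt is alive at a time, so wiping a shared task-$taskNumber directory is safe; here attempts 0 and 1 of subtask 0 run side by side, and only the per-attempt directories keep their files apart.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch (not Flink code): every attempt of a subtask writes into
// its own staging directory, so concurrent speculative attempts cannot clash.
final class AttemptScopedStaging {

    // Layout from the thread: $root_dir/task-$taskNumber/attempt-$attemptNumber
    static Path attemptDir(Path root, int taskNumber, int attemptNumber) {
        return root.resolve("task-" + taskNumber).resolve("attempt-" + attemptNumber);
    }

    // Initialization of one attempt: wiping and re-creating only this attempt's
    // directory is safe even while other attempts of the same subtask are running.
    static Path open(Path root, int taskNumber, int attemptNumber) throws IOException {
        Path dir = attemptDir(root, taskNumber, attemptNumber);
        deleteRecursively(dir);
        return Files.createDirectories(dir);
    }

    // Commit: for each subtask, move only the files of the attempt marked as finished.
    // The map (subtask -> attempt) stands in for getFinishedAttempt(int subtaskIndex).
    static List<Path> commit(Path root, Path target, Map<Integer, Integer> finishedAttempt)
            throws IOException {
        Files.createDirectories(target);
        List<Path> committed = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : finishedAttempt.entrySet()) {
            List<Path> files;
            try (Stream<Path> s = Files.list(attemptDir(root, e.getKey(), e.getValue()))) {
                files = s.collect(Collectors.toList());
            }
            for (Path f : files) {
                Path dest = target.resolve(f.getFileName());
                Files.move(f, dest);
                committed.add(dest);
            }
        }
        deleteRecursively(root); // leftovers of the losing attempts are discarded
        return committed;
    }

    // Deletes a directory tree bottom-up; does nothing if the path does not exist.
    static void deleteRecursively(Path path) throws IOException {
        if (!Files.exists(path)) {
            return;
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(path)) {
            entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : entries) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("staging");
        Path target = Files.createTempDirectory("output");
        // Original attempt 0 and speculative attempt 1 of subtask 0 run concurrently.
        Files.writeString(open(root, 0, 0).resolve("part-0-a0"), "rows from attempt 0");
        Files.writeString(open(root, 0, 1).resolve("part-0-a1"), "rows from attempt 1");
        // Suppose the JobManager marked attempt 1 of subtask 0 as finished.
        List<Path> committed = commit(root, target, Map.of(0, 1));
        System.out.println(committed.size() + " file(s) committed: " + committed);
    }
}
```

The sketch needs Java 11 or later (Files.writeString). Discarding the losing attempt's directory at commit time is this sketch's own choice; the thread only says that the data of the attempt marked as finished should be committed.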

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Martijn Visser <ma...@apache.org>.
Hi Biao,

Just to clarify, I understand your point of view and will not block your
FLIP :)

It would indeed be great if both SinkFunction and SourceFunction could be
marked as deprecated in 1.17, as Yun Tang pointed out.

Best regards,

Martijn

On Wed, 18 Jan 2023 at 09:34, Yun Tang <my...@live.com> wrote:

> Hi Biao,
>
> I think it's time to deprecate SinkFunction, and it would be fine if you
> could launch and drive the discussion.
>
> BTW, we might get it done in the Flink 1.17 release, together with
> deprecating SourceFunction[1].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-28045
>
> Best
> Yun Tang
> ________________________________
> From: Biao Liu <mm...@gmail.com>
> Sent: Wednesday, January 18, 2023 16:15
> To: dev@flink.apache.org <de...@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
>
> Hi Martijn & Jing,
>
> Thanks for the feedback!
>
> Currently, SinkFunction is in a subtle situation. As Jing pointed out,
> SinkFunction is still marked as @Public. Technically, according to the Flink
> Bylaws[1], deprecating it should be approved through an official vote.
> Although many of the community maintainers (including me) think it should be
> deprecated, we still should not assume that it already is. Since the
> discussion and vote may take 1 or 2 weeks, or longer if someone objects, I'd
> like to keep pushing FLIP-281 forward with the current design. I hope it can
> make it into the 1.17 release.
>
> By the way, if nobody drives the deprecation, I would like to start another
> discussion about it. What do you think?
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Fri, 13 Jan 2023 at 08:43, Jing Ge <ji...@ververica.com.invalid> wrote:
>
> > Hi Biao,
> >
> > Thanks for driving this. As Martijn already pointed out, we will spend effort
> > to remove SinkFunction after we deprecate it. The more functionality is added
> > to it, the more effort it will take to deprecate and remove SinkFunction.
> > Generally, it is not recommended to add new features to an interface that we
> > have already decided to deprecate but have not deprecated yet. But this FLIP
> > is a special case, and there are some reasons that lead us to support this
> > proposal.
> >
> > First, the FLIP offers an equivalent solution for the new SinkV2, which means
> > the migration from SinkFunction to SinkV2 for this feature is predictable and
> > acceptable. The concern I raised earlier has been resolved.
> >
> > Second, since SinkFunction is still marked as @Public now [1], it should be
> > fine to add new features to it (following the rules), especially if the
> > requirement is urgent. Similar to what [2] describes for API graduation, it
> > should also take 8 months (two release cycles; 8 months is the ideal case, it
> > could be longer) to go from @Public to @Deprecated and then to removal.
> > Additionally, SinkFunction is a core interface whose removal will trigger a
> > lot of further downstream removals, so the duration will increase to 16
> > months (again, the ideal case) or even longer, e.g. 2 years.
> >
> > Third, SinkV2 is still marked as @PublicEvolving, which means a few more
> > months (8 months?) in addition before we can start deprecating SinkFunction.
> > It is not reasonable to say that no features should be added to SinkFunction
> > during the upcoming 2 or 3 years.
> >
> > After thinking about all these aspects, I would support this FLIP, so +1
> >
> > This discussion leads us to another issue: we should graduate SinkV2 and
> > deprecate and remove SinkFunction asap. The longer we keep SinkFunction in
> > the code base, the more effort it will take to work on anything that depends
> > on or affects sinks.
> >
> > Best regards,
> > Jing
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> >
> > On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvisser@apache.org> wrote:
> >
> > > Hi Biao,
> > >
> > > While I would rather not add new features to (to-be) deprecated APIs, I
> > > would be +0 for this.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Thu, 12 Jan 2023 at 08:42, Biao Liu <mm...@gmail.com> wrote:
> > >
> > > > Hi Martijn,
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > Yes, we propose to support speculative execution for SinkFunction.
> > > > 1. From the perspective of compatibility, SinkFunction is the original Sink
> > > > interface. There are lots of implementations based on SinkFunction, not only
> > > > in the official Flink codebase but also in users' private codebases. It's a
> > > > more serious issue than Sink V1. Of course, we hope users will migrate their
> > > > legacy implementations to the new interface. However, migration is always
> > > > hard.
> > > > 2. From the perspective of cost, we don't need to do much extra work to
> > > > support speculative execution for SinkFunction. All we need to do is check
> > > > whether the SinkFunction implementation implements
> > > > SupportsConcurrentExecutionAttempts or not. The rest of the work is the same
> > > > as for Sink V2.
> > > >
> > > > To summarize, it's cheap to support speculative execution for SinkFunction,
> > > > and it may allow more existing scenarios to run with speculative execution.
> > > >
> > > > Thanks,
> > > > Biao /'bɪ.aʊ/
> > > >
> > > > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvisser@apache.org> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Apologies for jumping in late. My only question is about SinkFunction: does
> > > > > this imply that we want to add support for this to SinkFunction? If so, I
> > > > > would not be in favour of that, since we would like to deprecate SinkFunction
> > > > > in favour of SinkV2 (I actually thought that was already the case).
> > > > >
> > > > > Besides that, I have no other comments.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <be...@gmail.com> wrote:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for the explanation.
> > > > > >
> > > > > > +1 for the proposal.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wa...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Thanks for the explanation of how SinkV2 knows the right subtask attempt. I
> > > > > > > have no more questions, +1 for the proposal.
> > > > > > >
> > > > > > > Best,
> > > > > > > Lijie
> > > > > > >
> > > > > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mm...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks for all your feedback!
> > > > > > > >
> > > > > > > > To @Yuxia,
> > > > > > > >
> > > > > > > > > What is the sink expected to do to isolate data produced by speculative
> > > > > > > > > executions? IIUC, if a task fails over, it also generates a new attempt.
> > > > > > > > > Does that make a difference in isolating the produced data?
> > > > > > > >
> > > > > > > > Yes, there is a difference from the task failover scenario. The attempt
> > > > > > > > number matters more for speculative execution than for failover, because in
> > > > > > > > the failover scenario only one instance of a subtask can be running at a
> > > > > > > > time.
> > > > > > > >
> > > > > > > > Let's take FileSystemOutputFormat as an example. For the failover scenario,
> > > > > > > > the temporary directory to store produced data can be something like
> > > > > > > > "$root_dir/task-$taskNumber/". In the initialization phase, the subtask
> > > > > > > > deletes and re-creates the temporary directory.
> > > > > > > >
> > > > > > > > However, this does not work in the speculative execution scenario, because
> > > > > > > > several attempts of the same subtask might be running at the same time.
> > > > > > > > These attempts might delete, re-create and write to the same temporary
> > > > > > > > directory concurrently. The temporary directory should instead be something
> > > > > > > > like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it is necessary
> > > > > > > > to expose the attempt number to the Sink implementation to isolate the data.
> > > > > > > >
> > > > > > > > To @Lijie,
> > > > > > > >
> > > > > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > > > > >
> > > > > > > > Actually, yes.
> > > > > > > >
> > > > > > > > > Should we/users do it in the committer? If yes, how does the committer know
> > > > > > > > > which one is the right subtask attempt?
> > > > > > > >
> > > > > > > > Yes, we/users should do it in the committer.
> > > > > > > >
> > > > > > > > In the current design, the Committer of Sink V2 should get the "which one is
> > > > > > > > the right subtask attempt" information from the "committable data" produced
> > > > > > > > by the SinkWriter. Let's take the FileSink as an example: the "committable
> > > > > > > > data" sent to the Committer contains the full path of the files produced by
> > > > > > > > the SinkWriter. Users could also pass the attempt number from the SinkWriter
> > > > > > > > to the Committer through the "committable data".
> > > > > > > >
> > > > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink
> > > > > > > > V2" section of the FLIP document, we discussed some of the reasons why we
> > > > > > > > didn't provide an API like the one for OutputFormat.
> > > > > > > >
> > > > > > > > To @Jing Zhang
> > > > > > > >
> > > > > > > > > I have a question about this: "Speculative execution of Committer will be
> > > > > > > > > disabled."
> > > > > > > > >
> > > > > > > > > I agree with your point, and I have seen similar requirements to disable
> > > > > > > > > speculative execution for specified operators.
> > > > > > > > >
> > > > > > > > > However, this requirement is currently not supported. I think there should
> > > > > > > > > be some place that describes how to support it.
> > > > > > > >
> > > > > > > > In this FLIP's design, speculative execution of the Committer of Sink V2 will
> > > > > > > > be disabled by Flink. This is not optional; users cannot change it.
> > > > > > > > And as you said, "disable speculative execution for specified operators" is
> > > > > > > > not supported by this FLIP, because it is a bit out of the scope of "Sink
> > > > > > > > Supports Speculative Execution For Batch Job". I think it's better to start
> > > > > > > > another FLIP to discuss it, e.g. one titled "Fine-grained control of enabling
> > > > > > > > speculative execution for operators". There we can discuss how to enable or
> > > > > > > > disable speculative execution for specified operators, including the
> > > > > > > > Committer and pre/post-committer of Sink V2.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > >
> > > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1920@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Biao,
> > > > > > > > >
> > > > > > > > > Thanks for driving this FLIP. Supporting speculative execution of sinks is
> > > > > > > > > important.
> > > > > > > > >
> > > > > > > > > I have a question about this: "Speculative execution of Committer will be
> > > > > > > > > disabled."
> > > > > > > > >
> > > > > > > > > I agree with your point, and I have seen similar requirements to disable
> > > > > > > > > speculative execution for specified operators.
> > > > > > > > >
> > > > > > > > > However, this requirement is currently not supported. I think there should
> > > > > > > > > be some place that describes how to support it.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jing Zhang
> > > > > > > > >
> > > > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wa...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Biao,
> > > > > > > > > >
> > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > > > > > > > > OutputFormat to know which subtask attempt was marked as finished by the
> > > > > > > > > > JM, so that it can commit the right data.
> > > > > > > > > > I have a question about this: does SinkV2 need to do the same thing? Should
> > > > > > > > > > we/users do it in the committer? If yes, how does the committer know which
> > > > > > > > > > one is the right subtask attempt?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Lijie
> > > > > > > > > >
> > > > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Biao.
> > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > After a quick look at this FLIP, I have a question about "expose the
> > > > > > > > > > > attempt number which can be used to isolate data produced by speculative
> > > > > > > > > > > executions with the same subtask id".
> > > > > > > > > > > What is the sink expected to do to isolate data produced by speculative
> > > > > > > > > > > executions? IIUC, if a task fails over, it also generates a new attempt.
> > > > > > > > > > > Does that make a difference in isolating the produced data?
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Yuxia
> > > > > > > > > > >
> > > > > > > > > > > ----- Original Message -----
> > > > > > > > > > > From: "Biao Liu" <mm...@gmail.com>
> > > > > > > > > > > To: "dev" <de...@flink.apache.org>
> > > > > > > > > > > Sent: Thursday, 22 December 2022, 8:16:21 PM
> > > > > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start a discussion about making Sink support speculative
> > > > > > > > > > > execution for batch jobs. This proposal is a follow-up to "FLIP-168:
> > > > > > > > > > > Speculative Execution For Batch Job"[1]. Speculative execution is very
> > > > > > > > > > > useful for batch jobs, and it would be more complete after supporting
> > > > > > > > > > > speculative execution of Sink. Please find more details in the FLIP
> > > > > > > > > > > document [2].
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > [1]
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > > > > [2]
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Biao /'bɪ.aʊ/
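
For Sink V2, Biao's answer to Lijie is that the Committer learns which attempt's output to keep from the committables sent by the SinkWriter (for FileSink, the full path of the produced files). The plain-Java sketch below models that hand-off with an in-memory map instead of a real file system. StagedFile, write, commit and the FS map are hypothetical stand-ins, not Flink's SinkWriter or Committer interfaces, and the sketch assumes, as a simplification, that only the committable of the finished attempt reaches the committer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Flink's API) of the Sink V2 hand-off described in the
// thread: the writer puts "which attempt produced this data" into the committable.
final class CommittableHandOff {

    // What a writer attempt sends to the committer; for FileSink the thread says
    // this carries the full path of the produced files. Requires Java 16+ (record).
    record StagedFile(int subtask, int attempt, String stagedPath) {}

    // In-memory stand-in for a file system: path -> content.
    static final Map<String, String> FS = new HashMap<>();

    // Writer side: each attempt writes to its own attempt-scoped staging path.
    static StagedFile write(int subtask, int attempt, String data) {
        String path = "/out/.staging/task-" + subtask + "/attempt-" + attempt + "/part";
        FS.put(path, data);
        return new StagedFile(subtask, attempt, path);
    }

    // Committer side: the committable already identifies the right attempt, so the
    // committer just publishes the staged file under its final name.
    static List<String> commit(List<StagedFile> committables) {
        List<String> published = new ArrayList<>();
        for (StagedFile c : committables) {
            String finalPath = "/out/part-" + c.subtask();
            FS.put(finalPath, FS.remove(c.stagedPath()));
            published.add(finalPath);
        }
        return published;
    }

    public static void main(String[] args) {
        StagedFile slow = write(0, 0, "rows from attempt 0");
        StagedFile fast = write(0, 1, "rows from attempt 1");
        // Assumption: only the committable of the finished attempt (1) arrives.
        System.out.println(commit(List.of(fast)));             // [/out/part-0]
        System.out.println(FS.get("/out/part-0"));             // rows from attempt 1
        // The losing attempt's staged file is left behind ("leaked data").
        System.out.println(FS.containsKey(slow.stagedPath())); // true
    }
}
```

The staged file of the losing attempt stays behind, which corresponds to the leaked-data case that the FLIP's "Rejected Alternatives" section discusses; carrying the path in the committable is what lets the committer ignore attempt numbers entirely.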

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Yun Tang <my...@live.com>.
Hi Biao,

I think it's time to deprecate SinkFunction, and it would be fine if you could launch and drive the discussion.

BTW, we might get it done in the Flink 1.17 release, together with deprecating SourceFunction[1].


[1] https://issues.apache.org/jira/browse/FLINK-28045

Best
Yun Tang
________________________________
From: Biao Liu <mm...@gmail.com>
Sent: Wednesday, January 18, 2023 16:15
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi Martijn & Jing,

Thanks for the feedback!

Currently, SinkFunction is in a subtle situation. As Jing pointed out,
SinkFunction is still marked as @Public. Technically, according to the Flink
Bylaws[1], deprecating it should be approved through an official vote.
Although many of the community maintainers (including me) think it should be
deprecated, we still should not assume that it already is. Since the
discussion and vote may take 1 or 2 weeks, or longer if someone objects, I'd
like to keep pushing FLIP-281 forward with the current design. I hope it can
make it into the 1.17 release.

By the way, if nobody drives the deprecation, I would like to start another
discussion about it. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws

Thanks,
Biao /'bɪ.aʊ/
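
Biao's 12 Jan argument, quoted further down in this thread, is that supporting SinkFunction only needs a check of whether the implementation implements SupportsConcurrentExecutionAttempts, while a Sink V2 Committer is never run speculatively. A minimal sketch of that decision rule, assuming nothing beyond the JDK: the marker interface is redeclared locally instead of being imported from Flink, and SimpleSinkFunction, SpeculationRules, Role and canRunSpeculatively are hypothetical names that illustrate the rule as the thread describes it, not the scheduler's real code.

```java
// Local stand-in for the marker interface named in the thread; in this sketch it
// is declared here instead of being imported from Flink.
interface SupportsConcurrentExecutionAttempts {}

// Hypothetical minimal sink contract standing in for SinkFunction.
interface SimpleSinkFunction<T> {
    void invoke(T value);
}

final class SpeculationRules {

    // Which part of a sink an execution vertex runs (hypothetical).
    enum Role { WRITER, COMMITTER }

    // Rule as described in the thread: a committer never runs speculatively
    // (users cannot change this), and the writing part (a SinkFunction, or the
    // writer of a Sink V2 sink) may do so only if the sink opts in via the marker.
    static boolean canRunSpeculatively(Object sink, Role role) {
        if (role == Role.COMMITTER) {
            return false;
        }
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    // A legacy sink that did not opt in, e.g. because it writes to a shared location.
    static final class LegacySink implements SimpleSinkFunction<String> {
        @Override
        public void invoke(String value) {}
    }

    // A sink that isolates its output per attempt and therefore opts in.
    static final class AttemptAwareSink
            implements SimpleSinkFunction<String>, SupportsConcurrentExecutionAttempts {
        @Override
        public void invoke(String value) {}
    }

    public static void main(String[] args) {
        System.out.println(canRunSpeculatively(new LegacySink(), Role.WRITER));          // false
        System.out.println(canRunSpeculatively(new AttemptAwareSink(), Role.WRITER));    // true
        System.out.println(canRunSpeculatively(new AttemptAwareSink(), Role.COMMITTER)); // false
    }
}
```

An opt-in marker keeps existing SinkFunction implementations safe by default: a sink that does not isolate its output per attempt is simply never scheduled with concurrent attempts.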



On Fri, 13 Jan 2023 at 08:43, Jing Ge <ji...@ververica.com.invalid> wrote:

> Hi Biao,
>
> Thanks for driving this. As Martijn already pointed out, we will spend effort
> to remove SinkFunction after we deprecate it. The more functionality is added
> to it, the more effort it will take to deprecate and remove SinkFunction.
> Generally, it is not recommended to add new features to an interface that we
> have already decided to deprecate but have not deprecated yet. But this FLIP
> is a special case, and there are some reasons that lead us to support this
> proposal.
>
> First, the FLIP offers an equivalent solution for the new SinkV2, which means
> the migration from SinkFunction to SinkV2 for this feature is predictable and
> acceptable. The concern I raised earlier has been resolved.
>
> Second, since SinkFunction is still marked as @Public now [1], it should be
> fine to add new features to it (following the rules), especially if the
> requirement is urgent. Similar to what [2] describes for API graduation, it
> should also take 8 months (two release cycles; 8 months is the ideal case, it
> could be longer) to go from @Public to @Deprecated and then to removal.
> Additionally, SinkFunction is a core interface whose removal will trigger a
> lot of further downstream removals, so the duration will increase to 16
> months (again, the ideal case) or even longer, e.g. 2 years.
>
> Third, SinkV2 is still marked as @PublicEvolving, which means a few more
> months (8 months?) in addition before we can start deprecating SinkFunction.
> It is not reasonable to say that no features should be added to SinkFunction
> during the upcoming 2 or 3 years.
>
> After thinking about all these aspects, I would support this FLIP, so +1
>
> This discussion leads us to another issue: we should graduate SinkV2 and
> deprecate and remove SinkFunction asap. The longer we keep SinkFunction in
> the code base, the more effort it will take to work on anything that depends
> on or affects sinks.
>
> Best regards,
> Jing
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
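
Jing's timeline argument can be summarised as rough arithmetic. The figures are Jing's own ideal-case estimates from the message above (8 months corresponding to two release cycles), not commitments, so the total is a lower bound under those assumptions:

```latex
\begin{aligned}
T_{\text{graduate}} &\approx 8\ \text{months}
  && \text{(SinkV2: @PublicEvolving} \to \text{@Public)}\\
T_{\text{remove}} &\approx 16\ \text{months}
  && \text{(SinkFunction: @Public} \to \text{@Deprecated} \to \text{removed, incl. downstream)}\\
T_{\text{total}} &\approx T_{\text{graduate}} + T_{\text{remove}} \approx 8 + 16 = 24\ \text{months} \approx 2\ \text{years}
\end{aligned}
```

Since every term can only grow, the SinkFunction is likely to stay around for at least two years, which is why Jing considers freezing it for "the upcoming 2 or 3 years" unreasonable.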
>
> On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <ma...@apache.org> wrote:
>
> > Hi Biao,
> >
> > While I would rather not add new features to (to-be) deprecated APIs, I
> > would be +0 for this.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Thu, 12 Jan 2023 at 08:42, Biao Liu <mm...@gmail.com> wrote:
> >
> > > Hi Martijn,
> > >
> > > Thanks for your feedback!
> > >
> > > Yes, we propose to support speculative execution for SinkFunction.
> > > 1. From the perspective of compatibility, SinkFunction is the original Sink
> > > interface. There are lots of implementations based on SinkFunction, not only
> > > in the official Flink codebase but also in users' private codebases. It's a
> > > more serious issue than Sink V1. Of course, we hope users will migrate their
> > > legacy implementations to the new interface. However, migration is always
> > > hard.
> > > 2. From the perspective of cost, we don't need to do much extra work to
> > > support speculative execution for SinkFunction. All we need to do is check
> > > whether the SinkFunction implementation implements
> > > SupportsConcurrentExecutionAttempts or not. The rest of the work is the same
> > > as for Sink V2.
> > >
> > > To summarize, it's cheap to support speculative execution for SinkFunction,
> > > and it may allow more existing scenarios to run with speculative execution.
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvisser@apache.org> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Apologies for jumping in late. My only question is about SinkFunction: does
> > > > this imply that we want to add support for this to SinkFunction? If so, I
> > > > would not be in favour of that, since we would like to deprecate SinkFunction
> > > > in favour of SinkV2 (I actually thought that was already the case).
> > > >
> > > > Besides that, I have no other comments.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <be...@gmail.com> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Thanks for the explanation.
> > > > >
> > > > > +1 for the proposal.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wa...@gmail.com> wrote:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for the explanation of how SinkV2 knows the right subtask attempt. I
> > > > > > have no more questions, +1 for the proposal.
> > > > > >
> > > > > > Best,
> > > > > > Lijie
> > > > > >
> > > > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mm...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks for all your feedback!
> > > > > > >
> > > > > > > To @Yuxia,
> > > > > > >
> > > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > generates a new attempt. Does it make a difference in isolating
> > > > > > > > the data produced?
> > > > > > >
> > > > > > > Yes, there is a difference from the task failover scenario. The
> > > > > > > attempt number is more necessary for speculative execution than for
> > > > > > > failover, because there can be only one subtask instance running at
> > > > > > > a time in the failover scenario.
> > > > > > >
> > > > > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > > > > scenario, the temporary directory to store produced data can be
> > > > > > > something like "$root_dir/task-$taskNumber/". In the initialization
> > > > > > > phase, the subtask deletes and re-creates the temporary directory.
> > > > > > >
> > > > > > > However, in the speculative execution scenario this does not work,
> > > > > > > because several attempts of the same subtask might be running at the
> > > > > > > same time. They might delete, re-create and write the same temporary
> > > > > > > directory concurrently. The correct temporary directory should be
> > > > > > > like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's
> > > > > > > necessary to expose the attempt number to the Sink implementation to
> > > > > > > do the data isolation.
> > > > > > >
> > > > > > > To @Lijie,
> > > > > > >
> > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > thing?
> > > > > > >
> > > > > > > Actually, yes.
> > > > > > >
> > > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > > committer know which one is the right subtask attempt?
> > > > > > >
> > > > > > > Yes, we/users should do it in the committer.
> > > > > > >
> > > > > > > In the current design, the Committer of Sink V2 should get the
> > > > > > > "which one is the right subtask attempt" information from the
> > > > > > > "committable data" produced by SinkWriter. Let's take the FileSink
> > > > > > > as an example: the "committable data" sent to the Committer
> > > > > > > contains the full path of the files produced by SinkWriter. Users
> > > > > > > could also pass the attempt number through "committable data" from
> > > > > > > SinkWriter to Committer.
> > > > > > >
> > > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked
> > > > > > > data of Sink V2" section of the FLIP document, we discussed some of
> > > > > > > the reasons why we didn't provide an API like the one for
> > > > > > > OutputFormat.
> > > > > > >
> > > > > > > To @Jing Zhang,
> > > > > > >
> > > > > > > > I have a question about this: Speculative execution of Committer
> > > > > > > > will be disabled.
> > > > > > > >
> > > > > > > > I agree with your point and I saw the similar requirements to
> > > > > > > > disable speculative execution for specified operators.
> > > > > > > >
> > > > > > > > However the requirement is not supported currently. I think
> > > > > > > > there should be some place to describe how to support it.
> > > > > > >
> > > > > > > In this FLIP design, speculative execution of the Sink V2 Committer
> > > > > > > will be disabled by Flink. It's not optional, and users cannot
> > > > > > > change it.
> > > > > > > And as you said, "disable speculative execution for specified
> > > > > > > operators" is not supported in the FLIP, because it's a bit out of
> > > > > > > the scope of "Sink Supports Speculative Execution For Batch Job". I
> > > > > > > think it's better to start another FLIP to discuss it.
> > > > > > > "Fine-grained control of enabling speculative execution for
> > > > > > > operators" could be the title of that FLIP. There we can discuss
> > > > > > > how to enable or disable speculative execution for specified
> > > > > > > operators, including the Committer and pre/post-committer of
> > > > > > > Sink V2.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Biao /'bɪ.aʊ/
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1920@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for driving this FLIP. Supporting speculative execution of
> > > > > > > > sinks is meaningful and important.
> > > > > > > >
> > > > > > > > I have a question about this: Speculative execution of Committer
> > > > > > > > will be disabled.
> > > > > > > >
> > > > > > > > I agree with your point and I saw the similar requirements to
> > > > > > > > disable speculative execution for specified operators.
> > > > > > > >
> > > > > > > > However the requirement is not supported currently. I think there
> > > > > > > > should be some place to describe how to support it.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jing Zhang
> > > > > > > >
> > > > > > > > Lijie Wang <wa...@gmail.com> wrote on Tue, Dec 27, 2022 at 18:51:
> > > > > > > >
> > > > > > > > > Hi Biao,
> > > > > > > > >
> > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)"
> > > > > > > > > for OutputFormat to know which subtask attempt is the one marked
> > > > > > > > > as finished by the JM, so that the right data is committed.
> > > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > > thing? Should we/users do it in the committer? If yes, how does
> > > > > > > > > the committer know which one is the right subtask attempt?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Lijie
> > > > > > > > >
> > > > > > > > > yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Dec 27, 2022 at 10:01:
> > > > > > > > >
> > > > > > > > > > Hi, Biao.
> > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > After a quick look at this FLIP, I have a question about
> > > > > > > > > > "expose the attempt number which can be used to isolate data
> > > > > > > > > > produced by speculative executions with the same subtask id".
> > > > > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > > > generates a new attempt. Does it make a difference in isolating
> > > > > > > > > > the data produced?
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Yuxia
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
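
The attempt-scoped temporary directory that Biao describes for FileSystemOutputFormat can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not Flink code: the class AttemptScopedPaths, its methods and the root path are hypothetical, and only the "$root_dir/task-$taskNumber/" versus "$root_dir/task-$taskNumber/attempt-$attemptNumber" layouts come from the thread.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper illustrating the two temporary-directory layouts from the thread. */
final class AttemptScopedPaths {

    private AttemptScopedPaths() {}

    /** Failover-only layout: one directory per subtask, reused by every attempt. */
    static Path failoverTempDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    /** Speculative layout: one directory per (subtask, attempt) pair. */
    static Path speculativeTempDir(Path rootDir, int taskNumber, int attemptNumber) {
        return failoverTempDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path root = Paths.get("/tmp/flink-sink"); // illustrative root directory
        // With the failover layout, an original and a speculative attempt of
        // subtask 3 would both use this directory and could wipe each other's data.
        System.out.println(failoverTempDir(root, 3));
        // With the attempt-scoped layout, each concurrent attempt gets its own directory.
        System.out.println(speculativeTempDir(root, 3, 0));
        System.out.println(speculativeTempDir(root, 3, 1));
    }
}
```

Running main prints the shared failover directory followed by two distinct attempt directories nested under it. The only design choice is to make the attempt number part of the path, so that concurrent attempts of one subtask never delete or overwrite each other's output.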

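Lijie's question refers to "int getFinishedAttempt(int subtaskIndex)", which lets the OutputFormat learn which attempt the JM marked as finished. The sketch below shows how a finalization step might use that information to pick exactly one attempt directory per subtask, assuming the attempt-scoped layout described in the thread. The FinishedAttemptLookup interface is a hypothetical stand-in modeled on that method (its getParallelism() method is an assumption), not the actual Flink signature.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in modeled on the "int getFinishedAttempt(int subtaskIndex)"
 * method quoted in the thread; getParallelism() is an assumption.
 */
interface FinishedAttemptLookup {
    int getParallelism();

    int getFinishedAttempt(int subtaskIndex);
}

final class FinishedAttemptCommitSketch {

    private FinishedAttemptCommitSketch() {}

    /** For each subtask, selects only the directory written by its finished attempt. */
    static List<Path> directoriesToCommit(Path rootDir, FinishedAttemptLookup lookup) {
        List<Path> result = new ArrayList<>();
        for (int subtask = 0; subtask < lookup.getParallelism(); subtask++) {
            int attempt = lookup.getFinishedAttempt(subtask);
            result.add(rootDir.resolve("task-" + subtask).resolve("attempt-" + attempt));
        }
        return result;
    }

    public static void main(String[] args) {
        // Subtask 0 finished with its first attempt; for subtask 1 the speculative
        // attempt 1 finished first, so attempt 0's output must not be committed.
        int[] finished = {0, 1};
        FinishedAttemptLookup lookup = new FinishedAttemptLookup() {
            @Override
            public int getParallelism() {
                return finished.length;
            }

            @Override
            public int getFinishedAttempt(int subtaskIndex) {
                return finished[subtaskIndex];
            }
        };
        for (Path dir : directoriesToCommit(Paths.get("/tmp/flink-sink"), lookup)) {
            System.out.println("commit " + dir);
        }
    }
}
```

Directories of attempts that did not finish are simply never selected; cleaning them up is a separate concern.
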
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Biao Liu <mm...@gmail.com>.
Hi Martijn & Jing,

Thanks for the feedback!

Currently, SinkFunction is in a subtle situation. As Jing pointed out,
SinkFunction is still marked as @Public. Technically, according to the
Flink Bylaws[1], deprecating it should be approved through an official
vote. Although many of the community maintainers (including me) think it
should be deprecated, we should not assume that this has already happened.
The discussion and voting may take 1 or 2 weeks, or longer if someone
objects, so I'd like to keep pushing FLIP-281 forward with the current
design. I hope it can make it into the 1.17 release.

By the way, if nobody is driving the deprecation, I would like to start
another discussion about it. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws

Thanks,
Biao /'bɪ.aʊ/



On Fri, 13 Jan 2023 at 08:43, Jing Ge <ji...@ververica.com.invalid> wrote:

> Hi Biao,
>
> Thanks for driving this. As Martijn already pointed out, we will spend
> effort to remove SinkFunction after we deprecate it. The more
> functionality is added to it, the bigger the effort will be to deprecate
> and remove SinkFunction. Generally, it is not recommended to add new
> features to an interface that we have already decided to deprecate but
> have not deprecated yet. However, this FLIP is a special case, and there
> are some reasons that lead us to support this proposal.
>
> First, the FLIP offers an equivalent solution for the new SinkV2, which
> means the migration from SinkFunction to SinkV2 for this feature is
> predictable and acceptable. The concern I raised above has been resolved.
>
> Second, since SinkFunction is still marked as @Public [1], it should
> be fine to add new features to it (following the rules), especially if
> the requirement is urgent. Similar to what [2] describes for API
> graduation, it should also take two release cycles (8 months in the ideal
> case, possibly longer) to go from @Public to @Deprecated and then to
> removal. Additionally, SinkFunction is a core interface whose deletion
> will trigger many further downstream deletions, so the duration will
> increase to 16 months (again, in the ideal case) or even longer, e.g. 2
> years.
>
> Third, SinkV2 is still marked as @PublicEvolving, which means a few more
> months (8 months?) in addition before we can start deprecating
> SinkFunction. It is not reasonable to say no features should be added to
> SinkFunction during the upcoming 2 or 3 years.
>
> After thinking about all these aspects, I support this FLIP, so +1.
>
> This discussion leads us to another issue: we should graduate SinkV2
> and deprecate and remove SinkFunction asap. The longer we keep
> SinkFunction in the code base, the bigger the effort will be for
> anything that depends on or affects the sink.
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
>
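
For Sink V2, the thread's answer is that the Committer learns the right attempt from the committable data produced by SinkWriter, and that users may also put the attempt number into it. A minimal sketch of that idea follows, assuming the attempt-scoped directory layout discussed in the thread. AttemptCommittable and AttemptAwareCommitterSketch are hypothetical classes, not Flink's Committer API, and the leaked() helper only illustrates the "clean leaked data" problem that the FLIP lists under rejected alternatives; it is not a Flink feature.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical committable a SinkWriter attempt could hand to the Committer. */
final class AttemptCommittable {
    final int subtaskIndex;
    final int attemptNumber;
    final Path attemptDir; // full path of the directory this attempt wrote to

    AttemptCommittable(int subtaskIndex, int attemptNumber, Path attemptDir) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.attemptDir = attemptDir;
    }
}

/** Hypothetical committer-side logic; not Flink's Committer interface. */
final class AttemptAwareCommitterSketch {

    private AttemptAwareCommitterSketch() {}

    /** Directories to commit: exactly those named by the received committables. */
    static Set<Path> toCommit(Collection<AttemptCommittable> received) {
        Map<Integer, Integer> attemptBySubtask = new HashMap<>();
        Set<Path> dirs = new HashSet<>();
        for (AttemptCommittable c : received) {
            Integer previous = attemptBySubtask.putIfAbsent(c.subtaskIndex, c.attemptNumber);
            // Two different attempts of one subtask would mean duplicated output.
            if (previous != null && previous != c.attemptNumber) {
                throw new IllegalStateException("Subtask " + c.subtaskIndex
                        + " has committables from attempts " + previous + " and " + c.attemptNumber);
            }
            dirs.add(c.attemptDir);
        }
        return dirs;
    }

    /** Attempt directories no committable refers to, i.e. data left behind by losing attempts. */
    static List<Path> leaked(Collection<AttemptCommittable> received, Collection<Path> existingAttemptDirs) {
        Set<Path> keep = toCommit(received);
        List<Path> result = new ArrayList<>();
        for (Path dir : existingAttemptDirs) {
            if (!keep.contains(dir)) {
                result.add(dir);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Path task0 = Paths.get("/tmp/flink-sink", "task-0");
        // Only the finished attempt 1 of subtask 0 sent a committable.
        List<AttemptCommittable> received =
                List.of(new AttemptCommittable(0, 1, task0.resolve("attempt-1")));
        List<Path> onDisk = List.of(task0.resolve("attempt-0"), task0.resolve("attempt-1"));
        System.out.println("commit: " + toCommit(received));
        System.out.println("leaked: " + leaked(received, onDisk));
    }
}
```

Carrying the attempt number alongside the path lets the committer detect an inconsistent input cheaply, while the path alone is enough to decide what to commit.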

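Biao's cost argument in this thread is that supporting SinkFunction only requires checking whether an implementation inherits SupportsConcurrentExecutionAttempts. The sketch below shows that check as a plain instanceof test. To stay self-contained it declares a local marker interface with the same name instead of importing Flink's; LegacySink, the two sink classes and SpeculationGate are hypothetical, and combining the check with a global on/off flag is an assumption about how it would interact with job configuration.

```java
/** Local stand-in for Flink's marker interface of the same name; not imported from Flink. */
interface SupportsConcurrentExecutionAttempts {}

/** Hypothetical minimal stand-in for a legacy SinkFunction-style sink. */
interface LegacySink<T> {
    void invoke(T value);
}

/** A sink that writes to a shared location, so it does not opt in. */
final class SharedLocationSink implements LegacySink<String> {
    @Override
    public void invoke(String value) {
        System.out.println("write " + value);
    }
}

/** A sink that isolates output per attempt and therefore opts in via the marker. */
final class AttemptIsolatingSink implements LegacySink<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // Would write into an attempt-scoped directory; omitted in this sketch.
    }
}

final class SpeculationGate {

    private SpeculationGate() {}

    /** Hypothetical rule: concurrent attempts only if enabled globally and the sink opted in. */
    static boolean mayRunConcurrentAttempts(boolean speculativeExecutionEnabled, Object sink) {
        return speculativeExecutionEnabled && sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        System.out.println(mayRunConcurrentAttempts(true, new SharedLocationSink()));    // false
        System.out.println(mayRunConcurrentAttempts(true, new AttemptIsolatingSink()));  // true
        System.out.println(mayRunConcurrentAttempts(false, new AttemptIsolatingSink())); // false
    }
}
```

Because the opt-in is a marker interface, existing SinkFunction implementations keep their current behavior unless their authors explicitly declare that concurrent attempts are safe.
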
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Biao,

Thanks for driving this. As Martijn already pointed out, we will spend
effort to remove SinkFunction after we deprecate it. The more
functionality is added to it, the bigger the effort will be to deprecate
and remove SinkFunction. Generally, it is not recommended to add new
features to an interface that we have already decided to deprecate but
have not deprecated yet. However, this FLIP is a special case, and there
are some reasons that lead us to support this proposal.

First, the FLIP offers an equivalent solution for the new SinkV2, which
means the migration from SinkFunction to SinkV2 for this feature is
predictable and acceptable. The concern I raised above has been resolved.

Second, since SinkFunction is still marked as @Public [1], it should
be fine to add new features to it (following the rules), especially if
the requirement is urgent. Similar to what [2] describes for API
graduation, it should also take two release cycles (8 months in the ideal
case, possibly longer) to go from @Public to @Deprecated and then to
removal. Additionally, SinkFunction is a core interface whose deletion
will trigger many further downstream deletions, so the duration will
increase to 16 months (again, in the ideal case) or even longer, e.g. 2
years.

Third, SinkV2 is still marked as @PublicEvolving, which means a few more
months (8 months?) in addition before we can start deprecating
SinkFunction. It is not reasonable to say no features should be added to
SinkFunction during the upcoming 2 or 3 years.

After thinking about all these aspects, I support this FLIP, so +1.

This discussion leads us to another issue: we should graduate SinkV2
and deprecate and remove SinkFunction asap. The longer we keep
SinkFunction in the code base, the bigger the effort will be for
anything that depends on or affects the sink.

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Biao,
>
> While I'd rather not add new features to (to-be) deprecated features, I
> would be +0 for this.
>
> Best regards,
>
> Martijn
>
> Op do 12 jan. 2023 om 08:42 schreef Biao Liu <mm...@gmail.com>:
>
> > Hi Martijn,
> >
> > Thanks for your feedback!
> >
> > Yes, we propose to support speculative execution for SinkFunction.
> > 1. From the perspective of compatibility, SinkFunction is the original
> > Sink interface. There are lots of implementations based on SinkFunction,
> > not only in the official Flink codebase but also in users' private
> > codebases. It's a more serious issue than for Sink V1. Of course we hope
> > users will migrate their legacy implementations to the new interface.
> > However, migration is always hard.
> > 2. From the perspective of cost, we don't need to do much extra work to
> > support speculative execution for SinkFunction. All we need to do is
> > check whether the SinkFunction implementation implements
> > SupportsConcurrentExecutionAttempts. The other parts of the work are the
> > same as for Sink V2.
> >
> > To summarize, it's cheap to support speculative execution for
> > SinkFunction, and it may allow more existing scenarios to run with
> > speculative execution.
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <ma...@apache.org>
> > wrote:
> >
> > > Hi Biao,
> > >
> > > Apologies for jumping in late. My only question is about SinkFunction:
> > > does this imply that we want to add support for this to the
> > > SinkFunction? If so, I would not be in favour of that since we would
> > > like to deprecate (I actually thought that was already the case) the
> > > SinkFunction in favour of SinkV2.
> > >
> > > Besides that, I have no other comments.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <be...@gmail.com> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for the explanation.
> > > >
> > > > +1 for the proposal.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > Lijie Wang <wa...@gmail.com> wrote on Wed, Jan 4, 2023 at 12:11:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Thanks for the explanation of how SinkV2  knows the right subtask
> > > > > attempt. I have no more questions, +1 for the proposal.
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > Biao Liu <mm...@gmail.com> wrote on Wed, Dec 28, 2022 at 17:22:
> > > > >
> > > > > > Thanks for all your feedback!
> > > > > >
> > > > > > To @Yuxia,
> > > > > >
> > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > generates a new attempt. Does it make a difference in isolating
> > > > > > > the data produced?
> > > > > >
> > > > > > Yes, there is a difference from the task failover scenario. The
> > > > > > attempt number is more necessary for speculative execution than for
> > > > > > failover, because there can be only one subtask instance running at
> > > > > > a time in the failover scenario.
> > > > > >
> > > > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > > > scenario, the temporary directory to store produced data can be
> > > > > > something like "$root_dir/task-$taskNumber/". In the initialization
> > > > > > phase, the subtask deletes and re-creates the temporary directory.
> > > > > >
> > > > > > However, in the speculative execution scenario this does not work,
> > > > > > because several attempts of the same subtask might be running at the
> > > > > > same time. They might delete, re-create and write the same temporary
> > > > > > directory concurrently. The correct temporary directory should be
> > > > > > like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's
> > > > > > necessary to expose the attempt number to the Sink implementation to
> > > > > > do the data isolation.
> > > > > >
> > > > > > To @Lijie,
> > > > > >
> > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > thing?
> > > > > >
> > > > > > Actually, yes.
> > > > > >
> > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > committer know which one is the right subtask attempt?
> > > > > >
> > > > > > Yes, we/users should do it in the committer.
> > > > > >
> > > > > > In the current design, the Committer of Sink V2 should get the
> > > > > > "which one is the right subtask attempt" information from the
> > > > > > "committable data" produced by SinkWriter. Let's take the FileSink
> > > > > > as an example: the "committable data" sent to the Committer
> > > > > > contains the full path of the files produced by SinkWriter. Users
> > > > > > could also pass the attempt number through "committable data" from
> > > > > > SinkWriter to Committer.
> > > > > >
> > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked
> > > > > > data of Sink V2" section of the FLIP document, we discussed some of
> > > > > > the reasons why we didn't provide an API like the one for
> > > > > > OutputFormat.
> > > > > >
> > > > > > To @Jing Zhang,
> > > > > >
> > > > > > > I have a question about this: Speculative execution of Committer
> > > > > > > will be disabled.
> > > > > > >
> > > > > > > I agree with your point and I saw the similar requirements to
> > > > > > > disable speculative execution for specified operators.
> > > > > > >
> > > > > > > However the requirement is not supported currently. I think
> > > > > > > there should be some place to describe how to support it.
> > > > > >
> > > > > > In this FLIP design, speculative execution of the Sink V2 Committer
> > > > > > will be disabled by Flink. It's not optional, and users cannot
> > > > > > change it.
> > > > > > And as you said, "disable speculative execution for specified
> > > > > > operators" is not supported in the FLIP, because it's a bit out of
> > > > > > the scope of "Sink Supports Speculative Execution For Batch Job". I
> > > > > > think it's better to start another FLIP to discuss it.
> > > > > > "Fine-grained control of enabling speculative execution for
> > > > > > operators" could be the title of that FLIP. There we can discuss
> > > > > > how to enable or disable speculative execution for specified
> > > > > > operators, including the Committer and pre/post-committer of
> > > > > > Sink V2.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <be...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Thanks for driving this FLIP. Supporting speculative execution for
> > > > > > > sinks is meaningful and important.
> > > > > > >
> > > > > > > I have a question about this: Speculative execution of Committer
> > > > > > > will be disabled.
> > > > > > >
> > > > > > > I agree with your point, and I have seen similar requirements to
> > > > > > > disable speculative execution for specified operators.
> > > > > > >
> > > > > > > However, this is not supported currently. I think there should be
> > > > > > > some place describing how to support it.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jing Zhang
> > > > > > >
> > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wa...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for driving this FLIP.
> > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > > > > > > OutputFormat, so that it knows which subtask attempt was marked as
> > > > > > > > finished by the JM and can commit the right data.
> > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > thing? Should we/users do it in the committer? If yes, how does the
> > > > > > > > committer know which one is the right subtask attempt?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > >
> > > > > > > > > Hi, Biao.
> > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > After a quick look at this FLIP, I have a question about "expose
> > > > > > > > > the attempt number which can be used to isolate data produced by
> > > > > > > > > speculative executions with the same subtask id".
> > > > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > > > the produced data?
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Yuxia
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
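The per-attempt isolation discussed in the replies above (a staging directory per subtask attempt, plus a committable that tells the Committer exactly which files to commit) can be sketched in plain Java. This is a hypothetical illustration, not Flink's Sink V2 API: `StagedFiles`, `AttemptStagingWriter`, `PathCommitter` and `AttemptIsolationDemo` are made-up names, the directory layout follows the `$root_dir/task-$taskNumber/attempt-$attemptNumber` pattern from this thread, and the sketch assumes that only the committable of the attempt the JobManager marked as finished is handed to the committer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical committable: which attempt produced the data and where it was staged. */
final class StagedFiles {
    final int subtaskIndex;
    final int attemptNumber;
    final List<String> paths;

    StagedFiles(int subtaskIndex, int attemptNumber, List<String> paths) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.paths = List.copyOf(paths); // immutable snapshot of the staged paths
    }
}

/** Hypothetical writer: each attempt stages its files in its own directory. */
final class AttemptStagingWriter {
    private final String rootDir;
    private final int subtaskIndex;
    private final int attemptNumber;
    private final List<String> written = new ArrayList<>();

    AttemptStagingWriter(String rootDir, int subtaskIndex, int attemptNumber) {
        this.rootDir = rootDir;
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
    }

    /** "$root_dir/task-$taskNumber/attempt-$attemptNumber": unique per attempt. */
    String stagingDir() {
        return rootDir + "/task-" + subtaskIndex + "/attempt-" + attemptNumber;
    }

    /** A real writer would write bytes; the sketch only records the staged path. */
    void write(String fileName) {
        written.add(stagingDir() + "/" + fileName);
    }

    /** The committable carries the full paths plus the attempt number. */
    StagedFiles prepareCommit() {
        return new StagedFiles(subtaskIndex, attemptNumber, written);
    }
}

/** Hypothetical committer: commits exactly the paths named in the committables it receives. */
final class PathCommitter {
    private final List<String> committed = new ArrayList<>();

    void commit(List<StagedFiles> committables) {
        for (StagedFiles c : committables) {
            // A real committer would move the staged files to their final location.
            committed.addAll(c.paths);
        }
    }

    List<String> committed() {
        return committed;
    }
}

final class AttemptIsolationDemo {
    public static void main(String[] args) {
        AttemptStagingWriter original = new AttemptStagingWriter("/out", 0, 0);
        AttemptStagingWriter speculative = new AttemptStagingWriter("/out", 0, 1);
        original.write("part-0");
        speculative.write("part-0");

        // Assumption: attempt 1 finished first, so only its committable is delivered.
        PathCommitter committer = new PathCommitter();
        committer.commit(List.of(speculative.prepareCommit()));
        System.out.println(committer.committed()); // prints [/out/task-0/attempt-1/part-0]
    }
}
```

Because the two attempts of subtask 0 stage their files in different directories, neither can delete or overwrite the other's output. Cleaning up the files left behind by the losing attempt is deliberately not shown; the FLIP lists a dedicated cleanup API for Sink V2 under its rejected alternatives.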

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Martijn Visser <ma...@apache.org>.
Hi Biao,

While I would rather not add new features to (to-be) deprecated APIs, I
would be +0 for this.

Best regards,

Martijn

On Thu, 12 Jan 2023 at 08:42, Biao Liu <mm...@gmail.com> wrote:

> Hi Martijn,
>
> Thanks for your feedback!
>
> Yes, we propose to support speculative execution for SinkFunction.
> 1. From the perspective of compatibility, SinkFunction is the original
> Sink API. There are lots of implementations based on SinkFunction, not only
> in the official Flink codebase but also in users' private codebases. This
> is a more serious issue than for Sink V1. Of course we hope users will
> migrate their legacy implementations to the new interface, but migration
> is always hard.
> 2. From the perspective of cost, we don't need to do much extra work to
> support speculative execution for SinkFunction. All we need to do is check
> whether the SinkFunction implementation implements
> SupportsConcurrentExecutionAttempts. The rest of the work is the same as
> for Sink V2.
>
> To summarize, supporting speculative execution for SinkFunction is cheap,
> and it may allow more existing scenarios to run with speculative execution.
>
> Thanks,
> Biao /'bɪ.aʊ/

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Biao Liu <mm...@gmail.com>.
Hi Martijn,

Thanks for your feedback!

Yes, we propose to support speculative execution for SinkFunction.
1. From the perspective of compatibility, SinkFunction is the original
Sink API. There are lots of implementations based on SinkFunction, not only
in the official Flink codebase but also in users' private codebases. This
is a more serious issue than for Sink V1. Of course we hope users will
migrate their legacy implementations to the new interface, but migration
is always hard.
2. From the perspective of cost, we don't need to do much extra work to
support speculative execution for SinkFunction. All we need to do is check
whether the SinkFunction implementation implements
SupportsConcurrentExecutionAttempts. The rest of the work is the same as
for Sink V2.

To summarize, supporting speculative execution for SinkFunction is cheap,
and it may allow more existing scenarios to run with speculative execution.
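The SinkFunction check mentioned in point 2 amounts to a type test against a marker interface. A minimal, self-contained sketch follows; `SupportsConcurrentExecutionAttempts` is declared locally as a stand-in for Flink's marker interface of the same name (assumed here to declare no methods), while `LegacySink`, `ConsoleSink`, `AttemptAwareSink` and `SpeculationCheck` are hypothetical names rather than Flink classes.

```java
/** Local stand-in for Flink's marker interface of the same name (assumed to declare no methods). */
interface SupportsConcurrentExecutionAttempts {}

/** Hypothetical, simplified stand-in for a legacy SinkFunction. */
interface LegacySink<T> {
    void invoke(T value) throws Exception;
}

/** Did not opt in: two concurrent attempts would print every record twice. */
final class ConsoleSink implements LegacySink<String> {
    @Override
    public void invoke(String value) {
        System.out.println(value);
    }
}

/** Opted in: its author declares that concurrent attempts are safe, e.g. via per-attempt output. */
final class AttemptAwareSink implements LegacySink<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // Would write to an attempt-specific location (omitted in this sketch).
    }
}

final class SpeculationCheck {
    /** Speculative attempts are allowed only for sinks that implement the marker interface. */
    static boolean allowsSpeculativeExecution(Object sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        System.out.println(allowsSpeculativeExecution(new ConsoleSink()));      // false
        System.out.println(allowsSpeculativeExecution(new AttemptAwareSink())); // true
    }
}
```

In this sketch, a sink that does not opt in is never treated as safe for speculative attempts, so existing implementations are unaffected until their authors explicitly declare support.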

Thanks,
Biao /'bɪ.aʊ/



On Wed, 11 Jan 2023 at 21:22, Martijn Visser <ma...@apache.org> wrote:

> Hi Biao,
>
> Apologies for jumping in late. My only question is about SinkFunction:
> does this imply that we want to add support for this to the SinkFunction?
> If so, I would not be in favour of that, since we would like to deprecate
> the SinkFunction in favour of Sink V2 (I actually thought that was already
> the case).
>
> Besides that, I have no other comments.
>
> Best regards,
>
> Martijn

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Martijn Visser <ma...@apache.org>.
Hi Biao,

Apologies for jumping in late. My only question is about SinkFunction:
does this imply that we want to add support for this to the SinkFunction?
If so, I would not be in favour of that, since we would like to deprecate
the SinkFunction in favour of Sink V2 (I actually thought that was already
the case).

Besides that, I have no other comments.

Best regards,

Martijn

On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <be...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for the explanation.
>
> +1 for the proposal.
>
> Best,
> Jing Zhang

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Jing Zhang <be...@gmail.com>.
Hi Biao,

Thanks for the explanation.

+1 for the proposal.

Best,
Jing Zhang

On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wa...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for the explanation of how SinkV2 knows the right subtask
> attempt. I have no more questions. +1 for the proposal.
>
> Best,
> Lijie
>

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Lijie Wang <wa...@gmail.com>.
Hi Biao,

Thanks for the explanation of how SinkV2 knows the right subtask
attempt. I have no more questions. +1 for the proposal.

Best,
Lijie

On Wed, 28 Dec 2022 at 17:22, Biao Liu <mm...@gmail.com> wrote:

> Thanks for all your feedback!
>
> To @Yuxia,
>
> > What is the sink expected to do to isolate data produced by speculative
> > executions? IIUC, if the task fails over, it also generates a new attempt.
> > Does that make a difference in isolating the produced data?
>
>
> Yes, there is a difference from the task failover scenario. The attempt
> number matters much more for speculative execution than for failover,
> because in the failover scenario only one instance of a subtask can be
> running at any given time.
>
> Let's take FileSystemOutputFormat as an example. In the failover scenario,
> the temporary directory that stores the produced data can be something like
> "$root_dir/task-$taskNumber/". During initialization, the subtask deletes
> and re-creates this temporary directory.
>
> However, this does not work in the speculative execution scenario, because
> several attempts of the same subtask might be running at the same time.
> They might delete, re-create, and write to the same temporary directory
> concurrently. The temporary directory should instead be something like
> "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to
> expose the attempt number to the Sink implementation to isolate the data.
>
>
> To @Lijie,
>
> > I have a question about this: does SinkV2 need to do the same thing?
>
>
> Actually, yes.
>
> > Should we/users do it in the committer? If yes, how does the committer know
> > which one is the right subtask attempt?
>
>
> Yes, we/users should do it in the committer.
>
> In the current design, the Committer of Sink V2 should get the "which one
> is the right subtask attempt" information from the "committable data"
> produced by the SinkWriter. Take FileSink as an example: the "committable
> data" sent to the Committer contains the full paths of the files produced
> by the SinkWriter. Users could also pass the attempt number from the
> SinkWriter to the Committer through the "committable data".
>
> In the "Rejected Alternatives -> Introduce a way to clean leaked data of
> Sink V2" section of the FLIP document, we discussed some of the reasons why
> we didn't provide an API like the one for OutputFormat.
>
> To @Jing Zhang,
>
> > I have a question about this: "Speculative execution of Committer will be
> > disabled."
> >
> > I agree with your point, and I have seen similar requirements to disable
> > speculative execution for specific operators.
> >
> > However, this requirement is not currently supported. I think there should
> > be some place that describes how to support it.
>
>
> In this FLIP's design, speculative execution of the Sink V2 Committer will
> always be disabled by Flink. It's not optional, and users cannot change it.
> And, as you said, "disable speculative execution for specified operators"
> is not supported by this FLIP, because it is somewhat out of the scope of
> "Sink Supports Speculative Execution For Batch Job". I think it's better to
> start another FLIP to discuss it; "Fine-grained control of enabling
> speculative execution for operators" could be its title. There we can
> discuss how to enable or disable speculative execution for specific
> operators, including the Committer and pre/post-committer of Sink V2.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Biao Liu <mm...@gmail.com>.
Thanks for all your feedback!

To @Yuxia,

> What is the sink expected to do to isolate data produced by speculative
> executions? IIUC, if the task fails over, it also generates a new attempt.
> Does that make a difference in isolating the produced data?


Yes, there is a difference from the task failover scenario. The attempt
number matters much more for speculative execution than for failover,
because in the failover scenario only one instance of a subtask can be
running at any given time.

Let's take FileSystemOutputFormat as an example. In the failover scenario,
the temporary directory that stores the produced data can be something like
"$root_dir/task-$taskNumber/". During initialization, the subtask deletes
and re-creates this temporary directory.

However, this does not work in the speculative execution scenario, because
several attempts of the same subtask might be running at the same time.
They might delete, re-create, and write to the same temporary directory
concurrently. The temporary directory should instead be something like
"$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to
expose the attempt number to the Sink implementation to isolate the data.
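A minimal plain-Java sketch of the two layouts described above, assuming a local file system. The names `AttemptDirs`, `failoverDir`, `speculativeDir` and `initAttempt` are hypothetical and are not part of Flink or of FileSystemOutputFormat; only the directory shapes come from the discussion, and `initAttempt` merely mimics the "delete and re-create" initialization step.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper contrasting the two temporary-directory layouts.
final class AttemptDirs {
    private AttemptDirs() {}

    // Failover layout: $root_dir/task-$taskNumber, shared by every attempt of the subtask.
    static Path failoverDir(Path root, int taskNumber) {
        return root.resolve("task-" + taskNumber);
    }

    // Speculative layout: $root_dir/task-$taskNumber/attempt-$attemptNumber.
    static Path speculativeDir(Path root, int taskNumber, int attemptNumber) {
        return failoverDir(root, taskNumber).resolve("attempt-" + attemptNumber);
    }

    // Mimics the initialization step: delete the directory if it exists, then re-create it.
    static Path initAttempt(Path dir) {
        try {
            if (Files.exists(dir)) {
                List<Path> paths;
                try (Stream<Path> walk = Files.walk(dir)) {
                    // Reverse order so that children are deleted before their parents.
                    paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                }
                for (Path p : paths) {
                    Files.delete(p);
                }
            }
            return Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the shared layout, a second attempt that initializes while the first is still writing removes the first attempt's files; with the per-attempt layout, each attempt only ever deletes its own directory.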


To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?


Actually, yes.

> Should we/users do it in the committer? If yes, how does the committer know
> which one is the right subtask attempt?


Yes, we/users should do it in the committer.

In the current design, the Committer of Sink V2 should get the "which one
is the right subtask attempt" information from the "committable data"
produced by the SinkWriter. Take FileSink as an example: the "committable
data" sent to the Committer contains the full paths of the files produced
by the SinkWriter. Users could also pass the attempt number from the
SinkWriter to the Committer through the "committable data".
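The sketch below illustrates, in plain Java rather than against Flink's actual `SinkWriter`/`Committer` interfaces, what carrying the attempt number and file path in the committable data could look like. `AttemptCommittable`, `FileCommitSketch` and `commitAll` are hypothetical names. It assumes, as described above, that the Committer only receives committables produced by the attempt that finished, so the path in each committable already identifies the right attempt's files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical committable: what a writer attempt hands over to the committer.
final class AttemptCommittable {
    final int subtaskIndex;
    final int attemptNumber; // carried along so the committer can log or validate it
    final Path pendingFile;  // full path of the file written by that attempt

    AttemptCommittable(int subtaskIndex, int attemptNumber, Path pendingFile) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.pendingFile = pendingFile;
    }
}

final class FileCommitSketch {
    private FileCommitSketch() {}

    // Moves each referenced pending file into the target directory. Files written by
    // other attempts are never referenced by a committable, so they are left untouched.
    static void commitAll(List<AttemptCommittable> committables, Path targetDir) {
        try {
            Files.createDirectories(targetDir);
            for (AttemptCommittable c : committables) {
                Path target = targetDir.resolve("part-" + c.subtaskIndex);
                Files.move(c.pendingFile, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Files from the attempts that did not finish stay in their own attempt directories; this is presumably the kind of leaked data the "Rejected Alternatives" section refers to.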

In the "Rejected Alternatives -> Introduce a way to clean leaked data of
Sink V2" section of the FLIP document, we discussed some of the reasons why
we didn't provide an API like the one for OutputFormat.

To @Jing Zhang,

> I have a question about this: "Speculative execution of Committer will be
> disabled."
>
> I agree with your point, and I have seen similar requirements to disable
> speculative execution for specific operators.
>
> However, this requirement is not currently supported. I think there should
> be some place that describes how to support it.


In this FLIP's design, speculative execution of the Sink V2 Committer will
always be disabled by Flink. It's not optional, and users cannot change it.
And, as you said, "disable speculative execution for specified operators"
is not supported by this FLIP, because it is somewhat out of the scope of
"Sink Supports Speculative Execution For Batch Job". I think it's better to
start another FLIP to discuss it; "Fine-grained control of enabling
speculative execution for operators" could be its title. There we can
discuss how to enable or disable speculative execution for specific
operators, including the Committer and pre/post-committer of Sink V2.

What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Wed, 28 Dec 2022 at 11:30, Jing Zhang <be...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for driving this FLIP. Supporting speculative execution of sinks is
> meaningful and important.
>
> I have a question about this: "Speculative execution of Committer will be
> disabled."
>
> I agree with your point, and I have seen similar requirements to disable
> speculative execution for specific operators.
>
> However, this requirement is not currently supported. I think there should
> be some place that describes how to support it.
>
> Best,
> Jing Zhang

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Jing Zhang <be...@gmail.com>.
Hi Biao,

Thanks for driving this FLIP. Supporting speculative execution of sinks is
meaningful and important.

I have a question about this: "Speculative execution of Committer will be
disabled."

I agree with your point, and I have seen similar requirements to disable
speculative execution for specific operators.

However, this requirement is not currently supported. I think there should
be some place that describes how to support it.

Best,
Jing Zhang

On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wa...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for driving this FLIP.
> This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so that an
> OutputFormat can know which subtask attempt was marked as finished by the
> JM and commit the right data.
> I have a question about this: does SinkV2 need to do the same thing? Should
> we/users do it in the committer? If yes, how does the committer know which
> one is the right subtask attempt?
>
> Best,
> Lijie

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Lijie Wang <wa...@gmail.com>.
Hi Biao,

Thanks for driving this FLIP.
This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so that an
OutputFormat can know which subtask attempt was marked as finished by the
JM and commit the right data.
I have a question about this: does SinkV2 need to do the same thing? Should
we/users do it in the committer? If yes, how does the committer know which
one is the right subtask attempt?
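For the OutputFormat path, a global finalization step could use the finished-attempt information roughly as sketched below. `FinishedAttempts` is a hypothetical local stand-in that only mirrors the `int getFinishedAttempt(int subtaskIndex)` method named above, plus an assumed `getParallelism()`; it is not the Flink interface itself. The directory layout follows the `$root_dir/task-$taskNumber/attempt-$attemptNumber` scheme from Biao's reply in this thread.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the finalization context discussed in the thread.
interface FinishedAttempts {
    int getParallelism();                     // assumed: number of subtasks
    int getFinishedAttempt(int subtaskIndex); // attempt the JM marked as finished
}

final class FinalizeSketch {
    private FinalizeSketch() {}

    // Returns, for every subtask, the temporary directory of its finished attempt.
    // Only these directories would be committed; the others belong to attempts that lost.
    static List<Path> directoriesToCommit(Path root, FinishedAttempts ctx) {
        List<Path> dirs = new ArrayList<>();
        for (int i = 0; i < ctx.getParallelism(); i++) {
            int attempt = ctx.getFinishedAttempt(i);
            dirs.add(root.resolve("task-" + i).resolve("attempt-" + attempt));
        }
        return dirs;
    }
}
```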

Best,
Lijie

On Tue, 27 Dec 2022 at 10:01, yuxia <lu...@alumni.sjtu.edu.cn> wrote:

> Hi, Biao.
> Thanks for driving this FLIP.
> After a quick look at this FLIP, I have a question about "expose the attempt
> number which can be used to isolate data produced by speculative executions
> with the same subtask id".
> What is the sink expected to do to isolate data produced by speculative
> executions? IIUC, if the task fails over, it also generates a new attempt.
> Does that make a difference in isolating the produced data?
>
> Best regards,
> Yuxia

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Biao.
Thanks for driving this FLIP.
After a quick look at this FLIP, I have a question about "expose the attempt number which can be used to isolate data produced by speculative executions with the same subtask id".
What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does that make a difference in isolating the produced data?

Best regards,
Yuxia

----- Original Message -----
From: "Biao Liu" <mm...@gmail.com>
To: "dev" <de...@flink.apache.org>
Sent: Thursday, 22 Dec 2022, 8:16:21 PM
Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi everyone,

I would like to start a discussion about making Sink support speculative
execution for batch jobs. This proposal is a follow-up to "FLIP-168:
Speculative Execution For Batch Job"[1]. Speculative execution is very
valuable for batch jobs, and supporting speculative execution of sinks
would make it more complete. Please find more details in the FLIP document
[2].

Looking forward to your feedback.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job

Thanks,
Biao /'bɪ.aʊ/

Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Posted by Zhu Zhu <re...@gmail.com>.
Hi Biao,

Thanks for creating this FLIP!
Supporting speculative execution of sinks is important. Also, in production
we see sinks chained with other operators, e.g. sources in simple ETL
jobs, and currently such tasks cannot be executed speculatively because
sinks are not supported.
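The chaining constraint described above can be modeled as a simple rule: a chained task can run speculative attempts only if every operator in its chain tolerates concurrent attempts, so one unsupported sink disqualifies an otherwise eligible source. This is a toy model with hypothetical names (`OperatorInfo`, `ChainEligibility`, `canRunSpeculatively`), not Flink's scheduler code.

```java
import java.util.List;

// Toy model of an operator inside a chained task (hypothetical type).
final class OperatorInfo {
    final String name;
    final boolean supportsConcurrentAttempts;

    OperatorInfo(String name, boolean supportsConcurrentAttempts) {
        this.name = name;
        this.supportsConcurrentAttempts = supportsConcurrentAttempts;
    }
}

final class ChainEligibility {
    private ChainEligibility() {}

    // A chained task may run speculative attempts only if every operator in it allows that.
    static boolean canRunSpeculatively(List<OperatorInfo> chain) {
        return chain.stream().allMatch(op -> op.supportsConcurrentAttempts);
    }
}
```

Under this model, making sinks support speculative execution is what makes simple source-to-sink ETL tasks eligible.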

+1 for the proposal.

Thanks,
Zhu

On Thu, 22 Dec 2022 at 20:16, Biao Liu <mm...@gmail.com> wrote:
>
> Hi everyone,
>
> I would like to start a discussion about making Sink support speculative
> execution for batch jobs. This proposal is a follow-up to "FLIP-168:
> Speculative Execution For Batch Job"[1]. Speculative execution is very
> valuable for batch jobs, and supporting speculative execution of sinks
> would make it more complete. Please find more details in the FLIP document
> [2].
>
> Looking forward to your feedback.
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
>
> Thanks,
> Biao /'bɪ.aʊ/