Posted to dev@kyuubi.apache.org by Paul Lam <pa...@gmail.com> on 2022/03/18 09:06:08 UTC

[DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Hi team,

As we aim to make the Flink engine production-ready [1], Flink savepoint/checkpoint management 
is a crucial but currently missing part, which we should prioritize. Therefore, I'm starting this thread to
discuss the implementation of Flink savepoint/checkpoint management.

There’re mainly three questions we need to think about:
1. how to trigger a savepoint?
2. how to find the available savepoints/checkpoints for a job?
3. how to specify a savepoint/checkpoint for restore?

# 1. how to trigger a savepoint
Apart from the automatic checkpoint, Flink allows users to manually trigger a savepoint, 
either while the job is running or when stopping it. To support that, there are two prerequisites.

1) support savepoint/cancel operations
A savepoint operation is not SQL, so it can't be passed to the Flink engine like a normal statement;
we may need a new operation type.

Cancel query operation is supported currently, but it's not exposed to Beeline in a production-
ready way. Beeline requires a SIGINT (CTRL + C) to trigger a query cancel [2], but this doesn't 
work for async queries, which are very common in streaming scenarios. Thus we may
need to extend Beeline to support a cancel command.

To sum up, I think we might need to add a `savepoint` operation in Kyuubi Server, and expose
savepoint and cancel operations in Beeline (maybe JDBC as well if possible).
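
To make this concrete, the Beeline-facing usage might look like the purely
hypothetical commands below (the command names and the job ID placeholder are
illustrative only, up for discussion):

    beeline> !savepoint <job_id>   -- hypothetical: trigger a savepoint, print its path
    beeline> !cancel <job_id>      -- hypothetical: cancel the running async query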

2) expose Flink Job ID
To track an async query, we need an ID. It could be the operation ID of Kyuubi or the job ID of Flink
(maybe the Flink cluster ID as well). Since Kyuubi doesn't persist metadata, we may lose track of the
Flink jobs after a restart. So I propose to expose Flink job IDs to users, letting users bookkeep
the IDs manually, and to support built-in job management after Kyuubi has metadata persistence.

The user’s workflow should look like this (a sketch follows the list):
1. execute a Flink query which creates a Flink job and returns the Job ID
2. trigger a savepoint using the Job ID which returns a savepoint path
3. cancel the query using the Job ID (or just cancel-with-savepoint)
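
For illustration, here is a minimal Java sketch of this flow built on Flink's
public APIs (the statement and the savepoint directory are placeholders, and
error handling is omitted):

    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;

    public class SavepointWorkflowSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // 1. execute a query; an INSERT statement submits a Flink job
            TableResult result = tableEnv.executeSql(
                    "INSERT INTO sink_table SELECT * FROM source_table");
            JobClient jobClient = result.getJobClient()
                    .orElseThrow(() -> new IllegalStateException("no job submitted"));
            String jobId = jobClient.getJobID().toString(); // the ID to expose to users

            // 2. trigger a savepoint for the running job; the future
            //    completes with the savepoint path
            String savepointPath =
                    jobClient.triggerSavepoint("hdfs:///flink/savepoints").get();

            // 3. stop the job, taking a final savepoint in the same step
            //    (cancel-with-savepoint)
            jobClient.stopWithSavepoint(false, "hdfs:///flink/savepoints").get();
        }
    }

This is statement-level code rather than the final Kyuubi operation design, but
the same JobClient calls would back the proposed savepoint/cancel operations.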

# 2. how to find available savepoints/checkpoints for a job
In some cases, a Flink job crashes and we need to find an available savepoint or checkpoint for 
restoring the job. This can be done via the Flink history server or by searching the 
checkpoint/savepoint directories.
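
For reference, Flink's default on-disk layout makes the manual search feasible:
each retained checkpoint sits in its own numbered directory, e.g.

    <state.checkpoints.dir>/<job-id>/chk-<n>/_metadata

so picking the newest chk-<n> that still contains a _metadata file yields a
restorable checkpoint, while a savepoint is a self-contained directory at the
path returned when it was triggered.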

I think it’s good to support automatically searching for the available savepoints/checkpoints, but 
at the early phase, it’s okay to let users do it manually. It doesn’t block the Flink engine from
being production-ready.

# 3. how to specify a savepoint/checkpoint for restore
This question is relatively simple: add a new configuration option for users to set the savepoint/
checkpoint path for restore before we provide the automatic search, and optionally set the path 
automatically after that.
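
Flink already exposes such an option (execution.savepoint.path, also settable
in the SQL client via SET), so the new Kyuubi option mostly needs to be wired
through to it. A minimal Java sketch, with a placeholder path:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RestoreFromSavepointSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // point the next job submission at a savepoint/checkpoint directory
            tableEnv.getConfig().getConfiguration().setString(
                    "execution.savepoint.path",
                    "hdfs:///flink/savepoints/savepoint-123abc");

            // the job submitted by this statement restores from that path
            tableEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
        }
    }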

1. https://github.com/apache/incubator-kyuubi/issues/2100
2. https://cwiki.apache.org/confluence/display/hive/hiveserver2+clients#HiveServer2Clients-CancellingtheQuery

Best,
Paul Lam


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Paul Lam <pa...@gmail.com>.
Hi team,

Sorry for the late follow-up. It took me some time to do some research.

TL;DR: It’s good to express savepoints in SQL statements. We should join efforts 
with the Flink community to discuss SQL syntax for savepoint statements. There are
mainly two styles of SQL syntax to discuss: ANSI-SQL and command-like. The 
rest is implementation details, such as how to return the query ID.

We had an offline discussion on DingTalk last week, and I believe we’ve reached 
a consensus on some issues.

As pointed out in the previous mails, we should consider
1. how to trigger a savepoint?
2. how to find the available savepoints/checkpoints for a job?
3. how to specify a savepoint/checkpoint for restore?

However, 3 is already supported by the Flink SQL client, leaving 2 questions. As we 
discussed previously, the most straightforward solution is to extend Flink’s SQL 
parser to support savepoint commands. In this way, we treat savepoint
commands as normal SQL statements. So we could split the topic into SQL 
syntax and implementation.

WRT SQL syntax, to follow the upstream-first philosophy, we’d better align 
these efforts with the Flink community. So I think we should draft a proposal and 
start a discussion in the Flink community to determine a solution, then we could
implement it in Kyuubi first and push it back to Flink (I’m planning to start a 
discussion in the Flink community this week).

We have two solutions (thanks to Cheng):

1) ANSI SQL

   `CALL trigger_savepoint($query_id)`
   `CALL show_savepoint($query_id)`

pros: 
- no syntax conflict
- respect ANSI SQL

cons:
- CALL is not used in Flink SQL yet
- not sure if it’s viable to return savepoint paths, because stored procedures 
  should return row counts in normal cases

2)  Custom command

  `TRIGGER SAVEPOINT $query_id`
  `SHOW SAVEPOINT $query_id`

pros:
- simple syntax, easy to understand

cons:
- need to introduce new reserved keywords TRIGGER/SAVEPOINT 
- not ANSI-SQL compatible


WRT implementations, first we need a query ID, namely the Flink job ID,
which we could acquire through TableResult with a few adjustments 
to ExecuteStatement in the Flink engine. 
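
A rough sketch of that adjustment (Flink public API; the surrounding
ExecuteStatement plumbing is omitted):

    // inside ExecuteStatement, after running the statement
    TableResult tableResult = tableEnv.executeSql(statement);
    String queryId = tableResult.getJobClient()           // Optional<JobClient>
            .map(client -> client.getJobID().toString())  // Flink job ID
            .orElse(null);                                // e.g. a DDL submits no job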

There are 2 approaches to return the query ID to the clients. 

1) TGetQueryIdReq/Resp 
The clients need to request the query ID when a query is finished. 
Given that the original semantics of the Req is to return all query IDs in the session [1], 
we may need to change it to “the ID of the latest query”, or else it would be difficult 
for users to figure out which ID is the right one.

2) Return it in the result set 
This approach is straightforward. Flink returns -1 as the affected row count, 
which is not very useful. We can simply replace that with the query ID.

Please tell me what you think. Thanks a lot!

[1] https://github.com/apache/hive/blob/bf84d8a1f715d7457037192d97676aeffa35d571/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java#L1761

Best,
Paul Lam


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Paul Lam <pa...@gmail.com>.
Agreed! Thanks, Cheng!

Best,
Paul Lam

> On 2022-03-30 23:55, Cheng Pan <pa...@gmail.com> wrote:
> 
>> Personally speaking, I think 3) is most likely to be adopted by the Flink community, since TRIGGER/SAVEPOINT are already reserved keywords in Flink SQL [1]. Should we propose only 3) to the Flink community?
> 
> I respect your opinion and experience in the Flink community, and if there
> are other voices in the Flink community, we can offer other options.
> 
> I think the Kyuubi community can get a lot of benefits by following
> the upstream-first philosophy.
> 
> Thanks,
> Cheng Pan


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Cheng Pan <pa...@gmail.com>.
> Personally speaking, I think 3) is most likely to be adopted by the Flink community, since TRIGGER/SAVEPOINT are already reserved keywords in Flink SQL [1]. Should we propose only 3) to the Flink community?

I respect your opinion and experience in the Flink community, and if there
are other voices in the Flink community, we can offer other options.

I think the Kyuubi community can get a lot of benefits by following
the upstream-first philosophy.

Thanks,
Cheng Pan


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Paul Lam <pa...@gmail.com>.
Hi Cheng,

Thanks a lot for your input! IMHO, the whole design is pretty clear now. 

I’ll try to re-summarize the plan below; please correct me if I’m wrong.

1. Discuss savepoint SQL syntax with Flink community

There are 3 types of syntax:

1) ANSI SQL style with extended object

SHOW SAVEPOINTS <query_id>
CREATE SAVEPOINT <query_id>
DROP SAVEPOINT <savepoint_id>


2) ANSI SQL style with stored procedure

CALL trigger_savepoint(…)
CALL show_savepoints(…)
CALL drop_savepoint(...)

3) Command-like style 

TRIGGER SAVEPOINT <query_id>
SHOW SAVEPOINTS <query_id>
REMOVE SAVEPOINT <savepoint_path>

If we reach an agreement with the Flink community on one of them, we adopt 
that syntax. There might be some duplicated effort in the short term,
but eventually we will converge and reuse most if not all of the functionality 
that Flink provides.

Personally speaking, I think 3) is most likely to be adopted by the Flink 
community, since TRIGGER/SAVEPOINT are already reserved 
keywords in Flink SQL [1]. Should we propose only 3) to the Flink 
community?

2. Draft a KIP about savepoint management

TODO list as far as we can see:
1) Support retrieving query ID in Flink engine
2) Introduce a SQL layer to support new SQL syntax if needed (compatible 
    with Flink SQL)
3) Support savepoint related operations in Flink engine
4) Extend Beeline to support query ID

Best,
Paul Lam



Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Cheng Pan <pa...@gmail.com>.
Thanks Paul, I agree that we’ve reached a consensus at the high level: 1)
use SQL to manipulate savepoints, 2) follow the upstream-first
philosophy in SQL syntax and RPC protocol to achieve the best
compatibility and user experience.

On the specific details, let me add some comments.

> 1) ANSI SQL
>   `CALL trigger_savepoint($query_id)`
>   `CALL show_savepoint($query_id)`

We could be more flexible about the concept of ANSI SQL-like syntax.

For instance, we have

SHOW TABLES [LIKE ...]
ALTER TABLE <table_name> SET xxx
ALTER TABLE ADD ...
DROP TABLE <table_name>
SELECT xxx FROM <table_name>
DESC <table_name>

We can extend SQL in the same style for savepoints, e.g.

SHOW SAVEPOINTS <query_id>
CREATE SAVEPOINT <query_id>
DROP SAVEPOINT <query_id>
SELECT ... FROM <system.savepoint_table_name> WHERE ...
DESC <query_id>

One example is DistSQL [1].

The command style, by contrast, introduces new SQL action keywords, e.g.

OPTIMIZE <table_name>, VACUUM <table_name>, KILL <query_id>, KILL
QUERY <query_id>

Usually, different engines/databases may have different syntax for the
same behavior, or different behavior for the same syntax. Unless the
syntax has been adopted by the upstream, I prefer to use

CALL <procedure_name>(arg1, arg2, ...)

to avoid conflicts, and switch to the official syntax once the
upstream introduces the new syntax.

> There are 2 approaches to return the query ID to the clients.
>
> 1) TGetQueryIdReq/Resp
> The clients need to request the query ID when a query is finished.
> Given that the original semantics of the Req is to return all query IDs in the session [1],
> we may need to change it to “the ID of the latest query”, or else it would be difficult
> for users to figure out which ID is the right one.
>
> 2) Return it in the result set
> This approach is straightforward. Flink returns -1 as the affected row count,
> which is not very useful. We can simply replace that with the query ID.

Having a look at TGetQueryIdReq/Resp, I think we can simplify the procedure to
the following (a client-side sketch follows the list):

1. client sends an ExecuteStatement request
2. server returns an OpHandle to the client immediately
3. client sends TGetQueryIdReq(OpHandle) to ask for the QueryId
periodically until it gets a legal result.
4. server returns the corresponding TGetQueryIdResp(QueryId) if
available, otherwise returns a predefined QueryId constant, e.g.
'UNDEFINED_QUERY_ID', if the statement has not been accepted by the engine
(there is no queryId for the stmt yet)
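
A minimal client-side sketch of that loop against the hive-service-rpc 3.1.x
Thrift classes ('UNDEFINED_QUERY_ID' is the proposed convention, not an
existing constant, and the retry policy is illustrative only):

    import org.apache.hive.service.rpc.thrift.TCLIService;
    import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
    import org.apache.hive.service.rpc.thrift.TGetQueryIdResp;
    import org.apache.hive.service.rpc.thrift.TOperationHandle;
    import org.apache.thrift.TException;

    public class QueryIdPollingSketch {
        static String awaitQueryId(TCLIService.Client client, TOperationHandle opHandle)
                throws TException, InterruptedException {
            while (true) {
                TGetQueryIdResp resp = client.GetQueryId(new TGetQueryIdReq(opHandle));
                String queryId = resp.getQueryId();
                // the server answers with the predefined constant until the
                // engine has accepted the statement and a job ID exists
                if (queryId != null && !"UNDEFINED_QUERY_ID".equals(queryId)) {
                    return queryId;
                }
                Thread.sleep(100); // fixed-interval polling keeps the sketch simple
            }
        }
    }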

[1] https://github.com/apache/shardingsphere/releases/tag/5.1.0

Thanks,
Cheng Pan



Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Vino Yang <ya...@gmail.com>.
Hi Paul,

Big +1 for the proposal.

You can summarize all of this into a design document. And drive this feature!

Best,
Vino


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Paul Lam <pa...@gmail.com>.
Hi Kent,

Thanks for your pointer! 

TGetQueryIdReq/Resp looks very promising. 

Best,
Paul Lam

> On 2022-03-21 12:20, Kent Yao <ya...@apache.org> wrote:


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Kent Yao <ya...@apache.org>.
Hi Paul,


We have upgraded hive-service-rpc to 3.1.x. Are the new interfaces TGetQueryIdReq/Resp suitable for the Flink Job ID?

Kent Yao


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Paul Lam <pa...@gmail.com>.
Hi Cheng,

Thanks a lot for your input!

It'd be great to introduce a SQL layer. That gives Kyuubi more flexibility
to implement its own functionality. +1 for the SQL layer.

For syntax, I don't have a strong preference, command-like or SQL-like both
sound good to me.

Best,
Paul Lam


Re: [DISCUSS][Flink Engine] Flink Savepoint/Checkpoint Management

Posted by Cheng Pan <pa...@gmail.com>.
Thanks Paul for bringing up this discussion. I've added some of my thoughts;
please correct me if I'm wrong, since I'm not very familiar with Flink.

> A savepoint operation is not SQL, so it can't be passed to the Flink engine like a normal statement; we may need a new operation type.

Technically, it's easy to implement, since we have already found a
way to extend the protocol while keeping it compatible; an
example is the LaunchEngine operation.
But I'm wondering if we can introduce a SQL layer to handle it.
From the previous discussion [1], Kyuubi definitely requires a SQL
layer, and with the SQL layer, we can even make savepoints
queryable in SQL-like syntax!

> Beeline requires a SIGINT (CTRL + C) to trigger a query cancel [2], but this doesn't work for async queries, which are very common in streaming scenarios. Thus we may need to extend Beeline to support a cancel command.

LGTM. We can implement a `!cancel` command in Beeline; besides, we can
introduce a SQL-like syntax to achieve it, e.g. `CALL
cancel(query_id='12345')`.

[1] https://github.com/apache/incubator-kyuubi/pull/2048#issuecomment-1059980949

Thanks,
Cheng Pan
