Posted to dev@flink.apache.org by Mang Zhang <zh...@163.com> on 2023/04/10 03:02:46 UTC

[DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Hi, everyone

I'd like to start a discussion about FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement [1].

The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it is not atomic. The table is created before the job runs, and if the job execution fails or is cancelled, the table is not dropped.

So I want Flink to support atomic CTAS, where the table is created only when the job succeeds. This will improve the user experience.

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement

--

Best regards,
Mang Zhang

Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi, Yuxia
Thank you for your reply.
We can identify whether a CatalogTable supports atomic CTAS by checking its type in DynamicTableFactory/DynamicTableSink, like the following:
boolean isAtomicCtas = context.getCatalogTable().getOrigin() instanceof TwoPhaseCatalogTable;
I've also updated the FLIP.
This is my PoC commit: https://github.com/Tartarus0zm/flink/commit/ca82b6a816491df5a251b410f4c614436402d2dc
Looking forward to more feedback
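
To make the check above more concrete, here is a minimal, self-contained sketch of how a sink factory might pick its write target. It does not use the real Flink classes: CatalogTable, TwoPhaseCatalogTable and ResolvedTable below are simplified stand-ins, and stagingTableName() and SinkTargetResolver are hypothetical names introduced only for illustration.

```java
// Simplified stand-ins for the types named in this thread; NOT the real Flink API.
interface CatalogTable {}

/** Marks a table that was created through the two-phase (atomic CTAS) path. */
interface TwoPhaseCatalogTable extends CatalogTable {
    // Hypothetical helper: the temporary table the sink should write to.
    String stagingTableName();
}

/** Mimics a resolved table that wraps the original catalog table. */
final class ResolvedTable {
    private final CatalogTable origin;

    ResolvedTable(CatalogTable origin) {
        this.origin = origin;
    }

    CatalogTable getOrigin() {
        return origin;
    }
}

final class SinkTargetResolver {
    /** Returns the name of the table the sink should write to. */
    static String resolveTarget(ResolvedTable table, String finalTableName) {
        CatalogTable origin = table.getOrigin();
        if (origin instanceof TwoPhaseCatalogTable) {
            // Atomic CTAS: write to the staging table defined by the two-phase table.
            return ((TwoPhaseCatalogTable) origin).stagingTableName();
        }
        // Normal case: write directly to the final table.
        return finalTableName;
    }
}
```

With this shape, the sink needs no extra flag: the type of the origin table alone tells it which case it is in.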










--

Best regards,
Mang Zhang





At 2023-04-14 19:46:08, "yuxia" <lu...@alumni.sjtu.edu.cn> wrote:
>Hi, Mang.
>+1 for completing the support for atomicity of CTAS. This is very useful in batch scenarios and for integrating with data lakes that support transactions.
>
>I just have one question. IIUC, the DynamicTableSink will need to know whether it is handling the normal case or atomic CTAS, as well as the necessary context.
>Take the JDBC catalog as an example: if it is CTAS with atomicity support, the JDBC DynamicTableSink will write to the temporary table defined in the TwoPhaseCatalogTable, which is different from the normal case.
>
>How can the DynamicTableSink get this information? Could you give some explanation or an example in this FLIP?
>
>
>Best regards,
>Yuxia
>
>----- Original Message -----
>From: "zhangmang1" <zh...@163.com>
>To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <li...@gmail.com>
>Sent: Friday, April 14, 2023, 2:50:40 PM
>Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
>
>Hi, Lincoln and Ron
>
>
>Thank you for your reply.
>The naming suggestions look good to me; they will keep future extensions with new features more uniform. I have updated the FLIP.
>
>Regarding atomic CTAS support for Hive: Hive has rich usage scenarios, which can be divided into three: 1. writing Hive tables 2. writing Hive tables with speculative execution 3. writing Hive tables with small file merging
>
>The main purpose of FLIP-305 is to implement support for CTAS atomicity in the Flink framework,
>so my PoC only verifies the first scenario, writing to a Hive table. We can subsequently split out sub-tasks to support the other two scenarios.
>
>--
>
>Best regards,
>Mang Zhang
>
>
>
>
>
>At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
>>Hi, Mang
>>
>>+1 for completing the support for atomicity of CTAS, this is very useful in
>>batch scenarios.
>>
>>I have two questions:
>>1. naming wise:
>>  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>>`Catalog#twoPhaseCreateTable` (and we may add
>>twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>>  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>>`TwoPhaseCatalogTable`?
>>  c) for `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
>>in the method name may suggest to users that it relies on transaction
>>support (which is not strictly the case), so I suggest changing it to
>>`begin`
>>2. Has this design been validated by any relevant PoC on Hive or other
>>catalogs?
>>
>>Best,
>>Lincoln Lee
>>
>>
>>On Thu, Apr 13, 2023 at 10:17, liu ron <ro...@gmail.com> wrote:
>>
>>> Hi, Mang
>>> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
>>> is a continuation of FLIP-218, which is valuable for CTAS.
>>> I just have one question, in the Motivation part of FLIP-218, we mentioned
>>> three levels of atomicity semantics, can this current design do the same as
>>> Spark's DataSource V2, which can guarantee both atomicity and isolation,
>>> for example, can it be done by writing to Hive tables using CTAS?
>>>
>>> Best,
>>> Ron
>>>

Re:Re: Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Jing,


Currently, we cannot determine in the planner whether the source is bounded or unbounded.
So when designing the API, we use the execution mode to help determine whether atomicity can be supported.
Thank you very much for your reply!




--

Best regards,
Mang Zhang





At 2023-04-28 21:51:02, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>Hi Mang,
>
>Boundedness and execution modes are two orthogonal concepts. Atomic
>CTAS will only be supported for bounded data, which means it does not
>depend on the execution mode. I was wondering if it is possible to only
>provide (or call) twoPhaseCreateTable for bounded data (in both streaming
>and batch mode) and let unbounded data use the non-atomic CTAS? In this
>way, we could avoid the selector argument code smell.
>
>Best regards,
>Jing
>
>On Tue, Apr 25, 2023 at 10:04 AM Mang Zhang <zh...@163.com> wrote:
>
>> Hi Jing,
>> Yes, atomic CTAS will only be supported for bounded data, but the
>> execution mode can be streaming or batch.
>> I introduced the isStreamingMode parameter in the twoPhaseCreateTable API
>> to make it easier for users to provide different levels of atomicity
>> implementation depending on the capabilities of the backend service.
>> For example, in the case of data synchronization, it is common to run the
>> job in streaming mode, but users also expect the data to become visible
>> only after the synchronization is complete.
>> In Flink CDC's data synchronization scenario, the user must first write to
>> a temporary table and then manually rename it to the final table, which is
>> unfriendly to the user.
>> Developers providing twoPhaseCreateTable capability in Catalog can decide
>> whether to support atomicity based on the execution mode, or they can
>> choose to provide lightweight atomicity support in Stream mode, such as
>> automatically renaming the table name for the user.
>>
>>
>>
>> --
>>
>> Best regards,
>>
>> Mang Zhang
>>
>>
>>
>> At 2023-04-24 15:41:31, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>> >Hi Mang,
>> >
>> >
>> >
>> >Thanks for clarifying it. I am trying to understand your thoughts. Do you
>> >actually mean the boundedness[1] instead of the execution modes[2]? I.e.
>> >the atomic CTAS will be only supported for bounded data.
>> >
>> >
>> >
>> >Best regards,
>> >
>> >Jing
>> >
>> >
>> >
>> >[1] https://flink.apache.org/what-is-flink/flink-architecture/
>> >
>> >[2]
>> >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>> >
>> >On Wed, Apr 19, 2023 at 9:14 AM Mang Zhang <zh...@163.com> wrote:
>> >
>> >> hi, Jing
>> >>
>> >> Thank you for your reply.
>> >>
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> Yes, when I was implementing the FLIP-218 solution, I encountered problems with Catalog/CatalogTable serialization and deserialization; for example, after deserialization the CatalogTable could not be converted to a Hive Table. Also, Catalog serialization is a heavy operation that may not actually be necessary, since we only need to create the table.
>> >> Therefore, the TwoPhaseCatalogTable approach is proposed, which also facilitates the subsequent implementation of data lake support, ReplaceTable and other features.
>> >>
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >> >smell) we should commonly avoid in the public interface. According to the
>> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >> >support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference by
>> >> >using true/false isStreamingMode, sometimes they are quite different -
>> >> >atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >>
>> >> Here is what I think about this issue: atomic CTAS should be the default
>> >> behavior, and we should fall back to non-atomic CTAS only if atomicity
>> >> is completely unattainable. Atomic CTAS will bring a better experience to users.
>> >> Flink is already a unified stream and batch engine. At our company, Kwai, many
>> >> users also use Flink for batch data processing, but still run their jobs
>> >> in streaming mode.
>> >> The boundary between stream and batch is gradually blurring, and streaming
>> >> mode jobs may also FINISH, so I added the isStreamingMode parameter. It
>> >> allows different atomicity implementations in batch and streaming modes:
>> >> it is used not only to determine whether atomicity is supported, but also
>> >> to help select different TwoPhaseCatalogTable implementations that provide
>> >> different levels of atomicity.
>> >>
>> >> Looking forward to more feedback.
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >> Best regards,
>> >>
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>> >> >Hi Mang,
>> >> >
>> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
>> >> >driving it. I have two questions and would like to know your thoughts,
>> >> >thanks:
>> >> >
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >> >smell) we should commonly avoid in the public interface. According to the
>> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >> >support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference by
>> >> >using true/false isStreamingMode, sometimes they are quite different -
>> >> >atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> >
>> >> >Best regards,
>> >> >Jing
>> >> >

Re: Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Mang,

Boundedness and execution modes are two orthogonal concepts. Atomic
CTAS will only be supported for bounded data, which means it does not
depend on the execution mode. I was wondering if it is possible to only
provide (or call) twoPhaseCreateTable for bounded data (in both streaming
and batch mode) and let unbounded data use the non-atomic CTAS? In this
way, we could avoid the selector argument code smell.
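
The idea above could be sketched roughly as follows. This is only an illustration under assumptions, not the FLIP's API: it assumes the caller already knows the boundedness of the input, and apart from twoPhaseCreateTable every name here (Boundedness, CtasDispatcher, createForCtas, the simplified Catalog) is hypothetical.

```java
import java.util.Optional;

// Simplified stand-ins; NOT the real Flink interfaces.
enum Boundedness { BOUNDED, UNBOUNDED }

interface TwoPhaseCatalogTable {}

interface Catalog {
    /** Non-atomic path: the table is created before the job runs. */
    void createTable(String name);

    /**
     * Atomic path. Note that there is no execution-mode flag here.
     * Returns empty if the catalog cannot stage table creation.
     */
    default Optional<TwoPhaseCatalogTable> twoPhaseCreateTable(String name) {
        return Optional.empty();
    }
}

final class CtasDispatcher {
    /** Returns true if the atomic path was taken, false if we fell back. */
    static boolean createForCtas(Catalog catalog, String name, Boundedness input) {
        if (input == Boundedness.BOUNDED) {
            Optional<TwoPhaseCatalogTable> staged = catalog.twoPhaseCreateTable(name);
            if (staged.isPresent()) {
                // A real caller would keep the staged table so that it can be
                // committed or aborted once the job ends.
                return true;
            }
        }
        // Unbounded input, or no two-phase support: non-atomic CTAS.
        catalog.createTable(name);
        return false;
    }
}
```

In this shape the decision between atomic and non-atomic CTAS is made by the caller, so the catalog method needs no boolean selector.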

Best regards,
Jing


Re:Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Jing,
Yes, atomic CTAS will only be supported for bounded data, but the execution mode can be streaming or batch.
I introduced the isStreamingMode parameter in the twoPhaseCreateTable API to make it easier for users to provide different levels of atomicity implementation depending on the capabilities of the backend service.
For example, in the case of data synchronization, it is common to run the job in streaming mode, but users also expect the data to become visible only after the synchronization is complete.
In Flink CDC's data synchronization scenario, the user must first write to a temporary table and then manually rename it to the final table, which is unfriendly to the user.
Developers providing twoPhaseCreateTable capability in Catalog can decide whether to support atomicity based on the execution mode, or they can choose to provide lightweight atomicity support in Stream mode, such as automatically renaming the table name for the user.
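
As a rough illustration of the point above, a catalog could return a different TwoPhaseCatalogTable implementation depending on isStreamingMode. The sketch below is self-contained and uses an in-memory stand-in for the backend. Only twoPhaseCreateTable, isStreamingMode, TwoPhaseCatalogTable and begin come from this discussion; commit, abort, InMemoryBackend, SketchCatalog and the two implementations are assumptions made for the example.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical lifecycle; NOT the real Flink interface.
interface TwoPhaseCatalogTable {
    void begin();   // called before the job starts
    void commit();  // called after the job finishes successfully
    void abort();   // called after the job fails or is cancelled
}

/** In-memory stand-in for a backend that can create, rename and drop tables. */
final class InMemoryBackend {
    final Set<String> tables = new TreeSet<>();
}

/** Lightweight atomicity: write to a temporary table and rename it on success. */
final class RenameOnCommitTable implements TwoPhaseCatalogTable {
    private final InMemoryBackend backend;
    private final String finalName;
    private final String tempName;

    RenameOnCommitTable(InMemoryBackend backend, String finalName) {
        this.backend = backend;
        this.finalName = finalName;
        this.tempName = finalName + "__tmp";
    }

    @Override public void begin() { backend.tables.add(tempName); }

    @Override public void commit() {
        // The "rename": the temporary table disappears, the final one appears.
        backend.tables.remove(tempName);
        backend.tables.add(finalName);
    }

    @Override public void abort() { backend.tables.remove(tempName); }
}

/** Creates the final table only on commit; data is assumed to be staged elsewhere. */
final class CreateOnCommitTable implements TwoPhaseCatalogTable {
    private final InMemoryBackend backend;
    private final String finalName;

    CreateOnCommitTable(InMemoryBackend backend, String finalName) {
        this.backend = backend;
        this.finalName = finalName;
    }

    @Override public void begin() { /* nothing is visible before commit */ }
    @Override public void commit() { backend.tables.add(finalName); }
    @Override public void abort() { /* nothing to clean up in this sketch */ }
}

final class SketchCatalog {
    private final InMemoryBackend backend;

    SketchCatalog(InMemoryBackend backend) { this.backend = backend; }

    /** Picks an implementation based on the execution mode. */
    TwoPhaseCatalogTable twoPhaseCreateTable(String name, boolean isStreamingMode) {
        return isStreamingMode
                ? new RenameOnCommitTable(backend, name)
                : new CreateOnCommitTable(backend, name);
    }
}
```

In both variants the final table name becomes visible only after commit, and an aborted job leaves nothing behind.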









--

Best regards,
Mang Zhang





At 2023-04-24 15:41:31, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>Hi Mang,
>
>
>
>Thanks for clarifying it. I am trying to understand your thoughts. Do you
>actually mean the boundedness[1] instead of the execution modes[2]? I.e.
>the atomic CTAS will be only supported for bounded data.
>
>
>
>Best regards,
>
>Jing
>
>
>
>[1] https://flink.apache.org/what-is-flink/flink-architecture/
>
>[2]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>
>On Wed, Apr 19, 2023 at 9:14 AM Mang Zhang <zh...@163.com> wrote:
>
>> hi, Jing
>>
>> Thank you for your reply.
>>
>> >1. It looks like you found another way to design the atomic CTAS with new
>> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >described in FLIP-218. Did I understand correctly?
>> Yes. When I was implementing the FLIP-218 solution, I ran into problems with Catalog/CatalogTable serialization and deserialization; for example, after deserialization the CatalogTable could not be converted to a Hive Table. Catalog serialization is also a heavy operation that may not actually be necessary, since we only need to create the table.
>> Therefore, the TwoPhaseCatalogTable approach is proposed, which also makes it easier to implement later features such as data lake support and ReplaceTable.
>>
>> >2. I am a little bit confused about the isStreamingMode parameter of
>> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >smell) we should commonly avoid in the public interface. According to the
>> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >support atomic or not. With this selector argument, there will be two
>> >different logics built within one method and it is hard to follow without
>> >reading the code or the doc carefully(another concern is to keep the doc
>> >and code alway be consistent) i.e. sometimes there will be no difference by
>> >using true/false isStreamingMode, sometimes they are quite different -
>> >atomic vs. non-atomic. Another question is, before we call
>> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >we could just follow FLIP-218 instead of (twistedly) calling
>> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >anything here?
>>
>> Here's what I think about this issue: atomic CTAS should be the default
>> behavior, falling back to non-atomic CTAS only if atomicity is completely
>> unattainable. Atomic CTAS gives users a better experience.
>> Flink is already a unified stream and batch engine. At our company, Kwai, many
>> users also use Flink for batch data processing but still run their jobs
>> in streaming mode.
>> The boundary between stream and batch is gradually blurring, and streaming-mode
>> jobs may also FINISH, so I added the isStreamingMode parameter to allow
>> different atomicity implementations in batch and streaming modes.
>> It serves not only to determine whether atomicity is supported, but also to
>> help select different TwoPhaseCatalogTable implementations that provide
>> different levels of atomicity.
>>
>> Looking forward to more feedback.
>>
>>
>>
>> --
>>
>> Best regards,
>>
>> Mang Zhang
>>
>>
>>
>> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>> >Hi Mang,
>> >
>> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
>> >driving it. I have two questions and would like to know your thoughts,
>> >thanks:
>> >
>> >1. It looks like you found another way to design the atomic CTAS with new
>> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>> >described in FLIP-218. Did I understand correctly?
>> >2. I am a little bit confused about the isStreamingMode parameter of
>> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >smell) we should commonly avoid in the public interface. According to the
>> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >support atomic or not. With this selector argument, there will be two
>> >different logics built within one method and it is hard to follow without
>> >reading the code or the doc carefully(another concern is to keep the doc
>> >and code alway be consistent) i.e. sometimes there will be no difference by
>> >using true/false isStreamingMode, sometimes they are quite different -
>> >atomic vs. non-atomic. Another question is, before we call
>> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >we could just follow FLIP-218 instead of (twistedly) calling
>> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >anything here?
>> >
>> >Best regards,
>> >Jing
>> >
>> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
>> >
>> >> Hi, Mang.
>> >> +1 for completing the support for atomicity of CTAS, this is very useful
>> >> in batch scenarios and for integrating with data lakes that support
>> >> transactions.
>> >>
>> >> I just have one question: IIUC, the DynamicTableSink will need to know
>> >> whether it is handling the normal case or atomic CTAS, as well as the
>> >> necessary context.
>> >> Take the JDBC catalog as an example: if CTAS with atomicity is supported, the
>> >> JDBC DynamicTableSink will write to the temp table defined in the
>> >> TwoPhaseCatalogTable, which is different from the normal case.
>> >>
>> >> How can the DynamicTableSink get it? Could you give some explanation
>> >> or an example in this FLIP?
>> >>
>> >>
>> >> Best regards,
>> >> Yuxia
>> >>
>> >> ----- Original Message -----
>> >> From: "zhangmang1" <zh...@163.com>
>> >> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
>> >> "lincoln 86xy" <li...@gmail.com>
>> >> Sent: Friday, April 14, 2023, 2:50:40 PM
>> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> SELECT(CTAS) statement
>> >>
>> >> Hi, Lincoln and Ron
>> >>
>> >>
>> >> Thank you for your reply.
>> >> The naming suggestions look good to me; they keep future extensions
>> >> more uniform. I have updated the FLIP.
>> >>
>> >>
>> >> Regarding atomic CTAS support for Hive: Hive has many usage scenarios, which
>> >> can be divided into three: 1. writing Hive tables 2. writing Hive
>> >> tables with speculative execution 3. writing Hive tables with small-file
>> >> merging
>> >>
>> >>
>> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
>> >> the Flink framework,
>> >> so my PoC only verifies the first scenario of writing to a Hive table,
>> >> and we can subsequently split out sub-tasks to support the other two
>> >> scenarios.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >> Best regards,
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
>> >> >Hi, Mang
>> >> >
>> >> >+1 for completing the support for atomicity of CTAS, this is very useful
>> >> in
>> >> >batch scenarios.
>> >> >
>> >> >I have two questions:
>> >> >1. naming wise:
>> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>> >> >`Catalog#twoPhaseCreateTable` (and we may add
>> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>> >> >`TwoPhaseCatalogTable`?
>> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
>> >> >in the method name, which may remind users of the relevance of transaction
>> >> >support (however, it is not strictly so), so I suggest changing it to
>> >> >`begin`
>> >> >2. Has this design been validated by any relevant Poc on hive or other
>> >> >catalogs?
>> >> >
>> >> >Best,
>> >> >Lincoln Lee
>> >> >
>> >> >
>> >> >liu ron <ro...@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
>> >> >
>> >> >> Hi, Mang
>> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
>> >> FLIP
>> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
>> >> >> I just have one question, in the Motivation part of FLIP-218, we
>> >> mentioned
>> >> >> three levels of atomicity semantics, can this current design do the
>> >> same as
>> >> >> Spark's DataSource V2, which can guarantee both atomicity and isolation,
>> >> >> for example, can it be done by writing to Hive tables using CTAS?
>> >> >>
>> >> >> Best,
>> >> >> Ron
>> >> >>
>> >> >> Mang Zhang <zh...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
>> >> >>
>> >> >> > Hi, everyone
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
>> >> CREATE
>> >> >> > TABLE AS SELECT(CTAS) statement [1].
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > CREATE TABLE AS SELECT(CTAS) statement has been support, but it's not
>> >> >> > atomic. It will create the table first before job running. If the job
>> >> >> > execution fails, or is cancelled, the table will not be dropped.
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > So I want Flink to support atomic CTAS, where only the table is
>> >> created
>> >> >> > when the Job succeeds. Improve user experience.
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > Looking forward to your feedback.
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > [1]
>> >> >> >
>> >> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > --
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Mang Zhang
>> >> >>
>> >>
>>
>>

Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Mang,



Thanks for clarifying it. I am trying to understand your thoughts. Do you
actually mean the boundedness[1] instead of the execution modes[2]? I.e.
the atomic CTAS will be only supported for bounded data.



Best regards,

Jing



[1] https://flink.apache.org/what-is-flink/flink-architecture/

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Wed, Apr 19, 2023 at 9:14 AM Mang Zhang <zh...@163.com> wrote:

> hi, Jing
>
> Thank you for your reply.
>
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> Yes. When I was implementing the FLIP-218 solution, I ran into problems with Catalog/CatalogTable serialization and deserialization; for example, after deserialization the CatalogTable could not be converted to a Hive Table. Catalog serialization is also a heavy operation that may not actually be necessary, since we only need to create the table.
> Therefore, the TwoPhaseCatalogTable approach is proposed, which also makes it easier to implement later features such as data lake support and ReplaceTable.
>
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not. With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully(another concern is to keep the doc
> >and code alway be consistent) i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
>
> Here's what I think about this issue: atomic CTAS should be the default
> behavior, falling back to non-atomic CTAS only if atomicity is completely
> unattainable. Atomic CTAS gives users a better experience.
> Flink is already a unified stream and batch engine. At our company, Kwai, many
> users also use Flink for batch data processing but still run their jobs
> in streaming mode.
> The boundary between stream and batch is gradually blurring, and streaming-mode
> jobs may also FINISH, so I added the isStreamingMode parameter to allow
> different atomicity implementations in batch and streaming modes.
> It serves not only to determine whether atomicity is supported, but also to
> help select different TwoPhaseCatalogTable implementations that provide
> different levels of atomicity.
>
> Looking forward to more feedback.
>
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
> >Hi Mang,
> >
> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >driving it. I have two questions and would like to know your thoughts,
> >thanks:
> >
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >smell) we should commonly avoid in the public interface. According to the
> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >support atomic or not. With this selector argument, there will be two
> >different logics built within one method and it is hard to follow without
> >reading the code or the doc carefully(another concern is to keep the doc
> >and code alway be consistent) i.e. sometimes there will be no difference by
> >using true/false isStreamingMode, sometimes they are quite different -
> >atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (twistedly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
> >
> >Best regards,
> >Jing
> >
> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> >
> >> Hi, Mang.
> >> +1 for completing the support for atomicity of CTAS, this is very useful
> >> in batch scenarios and for integrating with data lakes that support
> >> transactions.
> >>
> >> I just have one question: IIUC, the DynamicTableSink will need to know
> >> whether it is handling the normal case or atomic CTAS, as well as the
> >> necessary context.
> >> Take the JDBC catalog as an example: if CTAS with atomicity is supported, the
> >> JDBC DynamicTableSink will write to the temp table defined in the
> >> TwoPhaseCatalogTable, which is different from the normal case.
> >>
> >> How can the DynamicTableSink get it? Could you give some explanation
> >> or an example in this FLIP?
> >>
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> ----- Original Message -----
> >> From: "zhangmang1" <zh...@163.com>
> >> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
> >> "lincoln 86xy" <li...@gmail.com>
> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> SELECT(CTAS) statement
> >>
> >> Hi, Lincoln and Ron
> >>
> >>
> >> Thank you for your reply.
> >> The naming suggestions look good to me; they keep future extensions
> >> more uniform. I have updated the FLIP.
> >>
> >>
> >> Regarding atomic CTAS support for Hive: Hive has many usage scenarios, which
> >> can be divided into three: 1. writing Hive tables 2. writing Hive
> >> tables with speculative execution 3. writing Hive tables with small-file
> >> merging
> >>
> >>
> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
> >> the Flink framework,
> >> so my PoC only verifies the first scenario of writing to a Hive table,
> >> and we can subsequently split out sub-tasks to support the other two
> >> scenarios.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >>
> >>
> >>
> >>
> >> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >+1 for completing the support for atomicity of CTAS, this is very useful
> >> in
> >> >batch scenarios.
> >> >
> >> >I have two questions:
> >> >1. naming wise:
> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >`TwoPhaseCatalogTable`?
> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
> >> >in the method name, which may remind users of the relevance of transaction
> >> >support (however, it is not strictly so), so I suggest changing it to
> >> >`begin`
> >> >2. Has this design been validated by any relevant Poc on hive or other
> >> >catalogs?
> >> >
> >> >Best,
> >> >Lincoln Lee
> >> >
> >> >
> >> >liu ron <ro...@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
> >> >
> >> >> Hi, Mang
> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
> >> FLIP
> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> I just have one question, in the Motivation part of FLIP-218, we
> >> mentioned
> >> >> three levels of atomicity semantics, can this current design do the
> >> same as
> >> >> Spark's DataSource V2, which can guarantee both atomicity and isolation,
> >> >> for example, can it be done by writing to Hive tables using CTAS?
> >> >>
> >> >> Best,
> >> >> Ron
> >> >>
> >> >> Mang Zhang <zh...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
> >> >>
> >> >> > Hi, everyone
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> CREATE
> >> >> > TABLE AS SELECT(CTAS) statement [1].
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > CREATE TABLE AS SELECT(CTAS) statement has been support, but it's not
> >> >> > atomic. It will create the table first before job running. If the job
> >> >> > execution fails, or is cancelled, the table will not be dropped.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > So I want Flink to support atomic CTAS, where only the table is
> >> created
> >> >> > when the Job succeeds. Improve user experience.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > Looking forward to your feedback.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > [1]
> >> >> >
> >> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> >
> >> >> > Best regards,
> >> >> > Mang Zhang
> >> >>
> >>
>
>

Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by liu ron <ro...@gmail.com>.
Hi, Mang

Thanks for your update, the FLIP looks good to me now.

Best,
Ron
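
For readers following the quoted exchange below, the rule it converges on (atomic CTAS is used only when the sink implements SupportsStaging and the user has enabled the atomicity option) can be sketched in Java as follows. This is a minimal illustration, not the Flink implementation: every type below is a hypothetical stand-in, and the thread does not say what should happen when the option is enabled but the sink cannot stage, so the sketch simply reports "not atomic" in that case.

```java
// Hypothetical stand-ins; NOT the real Flink DynamicTableSink / SupportsStaging types.
interface SinkSketch {}

/** Marker playing the role of the SupportsStaging ability discussed in the thread. */
interface SupportsStagingSketch {}

final class PlainSink implements SinkSketch {}

final class StagingSink implements SinkSketch, SupportsStagingSketch {}

final class CtasPlannerSketch {
    private CtasPlannerSketch() {}

    /**
     * Returns true only if BOTH conditions hold: the user enabled the atomicity
     * option (atomicityEnabled) and the sink declares the staging ability.
     */
    static boolean useAtomicCtas(SinkSketch sink, boolean atomicityEnabled) {
        return atomicityEnabled && sink instanceof SupportsStagingSketch;
    }
}
```

With this shape, a job that upgrades to a version with staging-capable sinks keeps its existing non-atomic behavior until the user explicitly turns the option on, which is the compatibility concern raised in the quoted discussion.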

Mang Zhang <zh...@163.com> wrote on Fri, Jun 9, 2023 at 12:08:

> Hi Ron,
> Thanks for your reply!
> After our offline discussion: at present, there may be many Flink jobs
> using the non-atomic CTAS function, especially streaming jobs.
> If we infer whether atomic CTAS is supported only from whether the
> DynamicTableSink implements the SupportsStaging interface,
> then Flink's behavior will change after the user upgrades to a new
> version, which is not production friendly.
> To keep Flink's behavior consistent and to give the user
> maximum flexibility,
> even when the DynamicTableSink implements the SupportsStaging interface, users
> can still choose the non-atomic implementation according to business needs.
>
> I have updated FLIP-305[1].
>
> Looking forward to more feedback, if there is no other feedback, I will
> launch a vote next Monday(2023-06-12).
> Thanks again!
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-06-09 10:23:21, "liu ron" <ro...@gmail.com> wrote:
> >Hi, Mang
> >
> >In FLIP-218, we discussed that atomicity is not needed in streaming
> >mode, so we implemented the initial version without support for
> >atomicity. In addition, we introduce the option
> >"table.ctas.atomicity-enabled" to enable the atomic ability. According to
> >your FLIP-305 description, once the DynamicTableSink implements the
> >SupportsStaging interface, atomicity is the default behavior in both
> >stream mode and batch mode, and the user can't change it. I think this is
> >not feasible for streaming mode; atomicity should be controllable by the
> >user. So I think the FLIP should clarify how the option and the
> >SupportsStaging interface combine to determine the atomicity behavior:
> >atomicity is enabled only when the DynamicTableSink implements
> >SupportsStaging and the option is enabled. WDYT?
> >
> >Best,
> >Ron
> >
> >Jark Wu <im...@gmail.com> wrote on Thu, Jun 8, 2023 at 16:30:
> >
> >> Thank you for the great work, Mang! The updated proposal looks good to me.
> >>
> >> Best,
> >> Jark
> >>
> >> > On Jun 8, 2023, at 11:49, Jingsong Li <ji...@gmail.com> wrote:
> >> >
> >> > Thanks Mang for updating!
> >> >
> >> > Looks good to me!
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zh...@163.com> wrote:
> >> >>
> >> >> Hi Jingsong,
> >> >>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >> On this issue, introducing the executable commit/abort logic on
> >> >> CatalogTable is indeed a bit strange.
> >> >> After an offline discussion with yuxia, I reworked the FLIP-305[1] proposal.
> >> >> The new solution is similar to the implementation of SupportsOverwrite:
> >> >> it introduces the SupportsStaging interface and infers whether the
> >> >> DynamicTableSink supports atomic CTAS from whether it implements the
> >> >> SupportsStaging interface,
> >> >> and if so, it gets the StagedTable object from the DynamicTableSink.
> >> >>
> >> >> For more implementation details, please see the FLIP-305 document.
> >> >>
> >> >> This is my poc commits
> >> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >> >>
> >> >>
> >> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>
> >> >>
> >> >> --
> >> >>
> >> >> Best regards,
> >> >>
> >> >> Mang Zhang
> >> >>
> >> >>
> >> >>
> >> >> At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
> >> >>> Hi Mang,
> >> >>>
> >> >>> Thanks for starting this FLIP.
> >> >>>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >>>
> >> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >> >>>
> >> >>> Best,
> >> >>> Jingsong
> >> >>>
> >> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
> >> >>>>
> >> >>>> Hi Ron,
> >> >>>>
> >> >>>>
> >> >>>> First of all, thank you for your reply!
> >> >>>> After our offline communication, I understand that your point mainly
> >> >>>> concerns the compilePlan scenario, but compilePlanSql currently supports
> >> >>>> only INSERT statements and throws an exception for anything else:
> >> >>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
> >> >>>> But it's a good point that I will seriously consider.
> >> >>>> Non-atomic CTAS can be supported relatively easily,
> >> >>>> but atomic CTAS needs more adaptation work, so I'm going to leave it
> >> >>>> as is and follow up with a separate issue to implement CTAS support for
> >> >>>> compilePlanSql.
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> --
> >> >>>>
> >> >>>> Best regards,
> >> >>>> Mang Zhang
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
> >> >>>>> Hi, Mang
> >> >>>>>
> >> >>>>> I have a question about the implementation details. For the
> >> atomicity case,
> >> >>>>> since the target table is not created before the JobGraph is
> >> generated, but
> >> >>>>> then the target table is required to exist when optimizing plan to
> >> generate
> >> >>>>> the JobGraph. So how do you solve this problem?
> >> >>>>>
> >> >>>>> Best,
> >> >>>>> Ron
> >> >>>>>
> >> >>>>> yuxia <lu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:
> >> >>>>>
> >> >>>>>> Sharing some insights about the newly proposed TwoPhaseCatalogTable
> >> >>>>>> after an offline discussion with Mang.
> >> >>>>>> The main reason is that the TwoPhaseCatalogTable enables
> >> >>>>>> external connectors to implement their own logic for commit / abort.
> >> >>>>>> In FLIP-218, for atomic CTAS, the Catalog just drops the table
> >> >>>>>> when the job fails. That is not ideal because it is too generic to
> >> >>>>>> work well.
> >> >>>>>> For example, some connectors need to clean up temporary files in the
> >> >>>>>> abort method, and the actual connector knows the specific logic for
> >> >>>>>> aborting.
> >> >>>>>>
> >> >>>>>> Best regards,
> >> >>>>>> Yuxia
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> From: "zhangmang1" <zh...@163.com>
> >> >>>>>> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
> >> >>>>>> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
> >> >>>>>> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
> >> >>>>>> Sent: Wednesday, April 19, 2023, 3:13:36 PM
> >> >>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >>>>>> SELECT(CTAS) statement
> >> >>>>>>
> >> >>>>>> hi, Jing
> >> >>>>>> Thank you for your reply.
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
> >> with new
> >> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
> >> serializable
> >> >>>>>> as
> >> >>>>>>> described in FLIP-218. Did I understand correctly?
> >> >>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems
> >> >>>>>> with Catalog/CatalogTable serialization and deserialization; for example,
> >> >>>>>> after deserialization the CatalogTable could not be converted to a Hive
> >> >>>>>> Table. Catalog serialization is also a heavy operation that may not
> >> >>>>>> actually be necessary, since we only need to create the table.
> >> >>>>>> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
> >> >>>>>> makes it easier to implement later features such as data lake support
> >> >>>>>> and ReplaceTable.
> >> >>>>>>
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
> >> of
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
> >> argument(code
> >> >>>>>>> smell) we should commonly avoid in the public interface. According
> >> to the
> >> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
> >> whether to
> >> >>>>>>> support atomic or not. With this selector argument, there will be
> >> two
> >> >>>>>>> different logics built within one method and it is hard to follow
> >> without
> >> >>>>>>> reading the code or the doc carefully(another concern is to keep
> >> the doc
> >> >>>>>>> and code alway be consistent) i.e. sometimes there will be no
> >> difference
> >> >>>>>> by
> >> >>>>>>> using true/false isStreamingMode, sometimes they are quite
> >> different -
> >> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >>>>>>> isStreamingMode. In case only non-atomic is supported for
> >> streaming mode,
> >> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
> >> I miss
> >> >>>>>>> anything here?
> >> >>>>>> Here's what I think about this issue: atomic CTAS should be the default
> >> >>>>>> behavior, falling back to non-atomic CTAS only if atomicity is completely
> >> >>>>>> unattainable. Atomic CTAS gives users a better experience.
> >> >>>>>> Flink is already a unified stream and batch engine. At our company, Kwai,
> >> >>>>>> many users also use Flink for batch data processing but still run their
> >> >>>>>> jobs in streaming mode.
> >> >>>>>> The boundary between stream and batch is gradually blurring, and
> >> >>>>>> streaming-mode jobs may also FINISH, so I added the isStreamingMode
> >> >>>>>> parameter to allow different atomicity implementations in batch and
> >> >>>>>> streaming modes.
> >> >>>>>> It serves not only to determine whether atomicity is supported, but also
> >> >>>>>> to help select different TwoPhaseCatalogTable implementations that
> >> >>>>>> provide different levels of atomicity.
> >> >>>>>>
> >> >>>>>> Looking forward to more feedback.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> Best regards,
> >> >>>>>> Mang Zhang
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID>
> >> wrote:
> >> >>>>>>> Hi Mang,
> >> >>>>>>>
> >> >>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks
> >> for
> >> >>>>>>> driving it. I have two questions and would like to know your
> >> thoughts,
> >> >>>>>>> thanks:
> >> >>>>>>>
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
> >> with new
> >> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
> >> serializable
> >> >>>>>> as
> >> >>>>>>> described in FLIP-218. Did I understand correctly?
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
> >> of
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
> >> argument(code
> >> >>>>>>> smell) we should commonly avoid in the public interface. According
> >> to the
> >> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
> >> whether to
> >> >>>>>>> support atomic or not. With this selector argument, there will be
> >> two
> >> >>>>>>> different logics built within one method and it is hard to follow
> >> without
> >> >>>>>>> reading the code or the doc carefully(another concern is to keep
> >> the doc
> >> >>>>>>> and code alway be consistent) i.e. sometimes there will be no
> >> difference
> >> >>>>>> by
> >> >>>>>>> using true/false isStreamingMode, sometimes they are quite
> >> different -
> >> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >>>>>>> isStreamingMode. In case only non-atomic is supported for
> >> streaming mode,
> >> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
> >> I miss
> >> >>>>>>> anything here?
> >> >>>>>>>
> >> >>>>>>> Best regards,
> >> >>>>>>> Jing
> >> >>>>>>>
> >> >>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyuxia@alumni.sjtu.edu.cn
> >> >
> >> >>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi, Mang.
> >> >>>>>>>> +1 for completing the support for atomicity of CTAS. This is very useful
> >> >>>>>>>> in batch scenarios and for integrating with data lakes that support
> >> >>>>>>>> transactions.
> >> >>>>>>>>
> >> >>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
> >> >>>>>>>> whether it is handling the normal case or atomic CTAS, along with the
> >> >>>>>>>> necessary context.
> >> >>>>>>>> Take the JDBC catalog as an example: with atomic CTAS, the
> >> >>>>>>>> JDBC DynamicTableSink will write to the temp table defined in the
> >> >>>>>>>> TwoPhaseCatalogTable, which is different from the normal case.
> >> >>>>>>>>
> >> >>>>>>>> How can the DynamicTableSink get it? Could you give some explanation
> >> >>>>>>>> or an example in this FLIP?
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Yuxia
> >> >>>>>>>>
> >> >>>>>>>> ----- Original Message -----
> >> >>>>>>>> From: "zhangmang1" <zh...@163.com>
> >> >>>>>>>> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ron9.liu@gmail.com>,
> >> >>>>>>>> "lincoln 86xy" <li...@gmail.com>
> >> >>>>>>>> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> >>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >>>>>>>> SELECT(CTAS) statement
> >> >>>>>>>>
> >> >>>>>>>> Hi, Lincoln and Ron
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Thank you for your reply.
> >> >>>>>>>> On the naming wise I think OK, the future expansion of new
> >> features more
> >> >>>>>>>> uniform. I have updated the FLIP.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> About Hive support atomicity CTAS, Hive is rich in usage
> >> scenarios and
> >> >>>>>> can
> >> >>>>>>>> be divided into three scenarios: 1. writing Hive tables 2.
> >> writing Hive
> >> >>>>>>>> tables with speculative execution 3. writing Hive table with
> >> small file
> >> >>>>>>>> merge
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS
> >> atomicity
> >> >>>>>> in
> >> >>>>>>>> the Flink framework,
> >> >>>>>>>> so I only poc to verify the first scenario of writing to the Hive
> >> table,
> >> >>>>>>>> and we can subsequently split the sub-task to support the other
> >> two
> >> >>>>>>>> scenarios.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>>
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Mang Zhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com>
> >> wrote:
> >> >>>>>>>>> Hi, Mang
> >> >>>>>>>>>
> >> >>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very
> >> >>>>>> useful
> >> >>>>>>>> in
> >> >>>>>>>>> batch scenarios.
> >> >>>>>>>>>
> >> >>>>>>>>> I have two questions:
> >> >>>>>>>>> 1. naming wise:
> >> >>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
> >> >>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >>>>>>>>> `TwoPhaseCatalogTable`?
> >> >>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
> >> >>>>>> 'transaction'
> >> >>>>>>>>> in the method name, which may remind users of the relevance of
> >> >>>>>> transaction
> >> >>>>>>>>> support (however, it is not strictly so), so I suggest changing
> >> it to
> >> >>>>>>>>> `begin`
> >> >>>>>>>>> 2. Has this design been validated by any relevant Poc on hive or
> >> other
> >> >>>>>>>>> catalogs?
> >> >>>>>>>>>
> >> >>>>>>>>> Best,
> >> >>>>>>>>> Lincoln Lee
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> liu ron <ro...@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi, Mang
> >> >>>>>>>>>> Atomicity is very important for CTAS, especially for batch
> >> jobs. This
> >> >>>>>>>> FLIP
> >> >>>>>>>>>> is a continuation of FLIP-218, which is valuable for CTAS.
> >> >>>>>>>>>> I just have one question, in the Motivation part of FLIP-218, we
> >> >>>>>>>> mentioned
> >> >>>>>>>>>> three levels of atomicity semantics, can this current design do
> >> the
> >> >>>>>>>> same as
> >> >>>>>>>>>> Spark's DataSource V2, which can guarantee both atomicity and
> >> >>>>>> isolation,
> >> >>>>>>>>>> for example, can it be done by writing to Hive tables using
> >> CTAS?
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Ron
> >> >>>>>>>>>>
> >> >>>>>>>>>> Mang Zhang <zh...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Hi, everyone
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic
> >> for
> >> >>>>>>>> CREATE
> >> >>>>>>>>>>> TABLE AS SELECT(CTAS) statement [1].
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> CREATE TABLE AS SELECT(CTAS) statement has been support, but
> >> it's
> >> >>>>>> not
> >> >>>>>>>>>>> atomic. It will create the table first before job running. If
> >> the
> >> >>>>>> job
> >> >>>>>>>>>>> execution fails, or is cancelled, the table will not be
> >> dropped.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> So I want Flink to support atomic CTAS, where only the table is
> >> >>>>>>>> created
> >> >>>>>>>>>>> when the Job succeeds. Improve user experience.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Looking forward to your feedback.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> [1]
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> --
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best regards,
> >> >>>>>>>>>>> Mang Zhang
> >> >>>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>
> >> >>>>>>
> >>
> >>
>
>

Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Ron,
Thanks for your reply!
After our offline discussion: many Flink jobs may already be using non-atomic CTAS, especially streaming jobs.
If we infer whether atomic CTAS is supported only from whether the DynamicTableSink implements the SupportsStaging interface,
then Flink's behavior will change after users upgrade to a new version, which is not production friendly.
To keep Flink's behavior consistent and to give users maximum flexibility,
even if the DynamicTableSink implements the SupportsStaging interface, users can still choose the non-atomic implementation according to their business needs.


I have updated FLIP-305[1].


Looking forward to more feedback. If there is none, I will start a vote next Monday (2023-06-12).
Thanks again!






[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement




--

Best regards,
Mang Zhang





At 2023-06-09 10:23:21, "liu ron" <ro...@gmail.com> wrote:
>Hi, Mang
>
>In FLIP-218, we discussed that atomicity is not needed in streaming
>mode, so the initial implementation does not support
>atomicity. In addition, we introduced the option
>"table.ctas.atomicity-enabled" to enable atomicity. According to
>your FLIP-305 description, once the DynamicTableSink implements the
>SupportsStaging interface, atomicity is the default behavior in both
>streaming and batch mode, and the user can't change it. I think this is
>not feasible for streaming mode; atomicity should be controllable by the
>user. So I think the FLIP should clarify how the option and the
>SupportsStaging interface combine to determine the atomicity behavior:
>atomicity is enabled only when the DynamicTableSink implements
>SupportsStaging and the option is enabled. WDYT?
>
>Best,
>Ron
>
>Jark Wu <im...@gmail.com> wrote on Thu, Jun 8, 2023 at 16:30:
>
>> Thank you for the great work, Mang! The updated proposal looks good to me.
>>
>> Best,
>> Jark
>>
>> > On Jun 8, 2023, at 11:49, Jingsong Li <ji...@gmail.com> wrote:
>> >
>> > Thanks Mang for updating!
>> >
>> > Looks good to me!
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zh...@163.com> wrote:
>> >>
>> >> Hi Jingsong,
>> >>
>> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>> >>> Flink design places execution in the TableFactory or directly in the
>> >>> Catalog, so introducing an executable table makes me feel a bit
>> >>> strange. (Spark is this style, but Flink may not be)
>> >> On this issue, it is indeed a bit strange to introduce the executable
>> >> commit/abort logic on CatalogTable.
>> >> After an offline discussion with yuxia, I revised the FLIP-305[1] design.
>> >> The new solution is similar to the implementation of SupportsOverwrite:
>> >> it introduces the SupportsStaging interface and infers whether the
>> >> DynamicTableSink supports atomic CTAS from whether it implements the
>> >> SupportsStaging interface.
>> >> If so, it will get the StagedTable object from the DynamicTableSink.
>> >>
>> >> For more implementation details, please see the FLIP-305 document.
>> >>
>> >> This is my poc commits
>> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>> >>
>> >>
>> >> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >>
>> >>
>> >> --
>> >>
>> >> Best regards,
>> >>
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >> At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
>> >>> Hi Mang,
>> >>>
>> >>> Thanks for starting this FLIP.
>> >>>
>> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>> >>> Flink design places execution in the TableFactory or directly in the
>> >>> Catalog, so introducing an executable table makes me feel a bit
>> >>> strange. (Spark is this style, but Flink may not be)
>> >>>
>> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>> >>>
>> >>> Best,
>> >>> Jingsong
>> >>>
>> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
>> >>>>
>> >>>> Hi Ron,
>> >>>>
>> >>>>
>> >>>> First of all, thank you for your reply!
>> >>>> After our offline communication, what you said is mainly in the
>> compilePlan scenario, but currently compilePlanSql does not support non
>> INSERT statements, otherwise it will throw an exception.
>> >>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL
>> statement of type INSERT
>> >>>> But it's a good point that I will seriously consider.
>> >>>> Non-atomic CTAS can be supported relatively easily;
>> >>>> But atomic CTAS needs more adaptation work, so I'm going to leave it
>> as is and follow up with a separate issue to implement CTAS support for
>> compilePlanSql.
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>>
>> >>>> Best regards,
>> >>>> Mang Zhang
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
>> >>>>> Hi, Mang
>> >>>>>
>> >>>>> I have a question about the implementation details. For the
>> atomicity case,
>> >>>>> since the target table is not created before the JobGraph is
>> generated, but
>> >>>>> then the target table is required to exist when optimizing plan to
>> generate
>> >>>>> the JobGraph. So how do you solve this problem?
>> >>>>>
>> >>>>> Best,
>> >>>>> Ron
>> >>>>>
>> >>>>> yuxia <lu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:
>> >>>>>
>> >>>>>> Share some insights about the new TwoPhaseCatalogTable proposed
>> after
>> >>>>>> offline discussion with Mang.
>> >>>>>> The main or important reason is that the TwoPhaseCatalogTable
>> enables
>> >>>>>> external connectors to implement theirs own logic for commit /
>> abort.
>> >>>>>> In FLIP-218, for atomic CTAS, the Catalog will then just drop the
>> table
>> >>>>>> when the job fail. It's not ideal for it's too generic to work well.
>> >>>>>> For example, some connectors will need to clean some temporary
>> files in
>> >>>>>> abort method. And the actual connector can know the specific logic
>> for
>> >>>>>> aborting.
>> >>>>>>
>> >>>>>> Best regards,
>> >>>>>> Yuxia
>> >>>>>>
>> >>>>>>
>> >>>>>> From: "zhangmang1" <zh...@163.com>
>> >>>>>> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
>> >>>>>> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
>> >>>>>> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
>> >>>>>> Sent: Wednesday, April 19, 2023, 3:13:36 PM
>> >>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >>>>>> SELECT(CTAS) statement
>> >>>>>>
>> >>>>>> hi, Jing
>> >>>>>> Thank you for your reply.
>> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
>> with new
>> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
>> serializable
>> >>>>>> as
>> >>>>>>> described in FLIP-218. Did I understand correctly?
>> >>>>>> Yes, when I was implementing the FLIP-218 solution, I encountered
>> problems
>> >>>>>> with Catalog/CatalogTable serialization deserialization, for
>> example, after
>> >>>>>> deserialization CatalogTable could not be converted to Hive Table.
>> Also,
>> >>>>>> Catalog serialization is still a heavy operation, but it may not
>> actually
>> >>>>>> be necessary, we just need Create Table.
>> >>>>>> Therefore, the TwoPhaseCatalogTable program is proposed, which also
>> >>>>>> facilitates the implementation of the subsequent data lake,
>> ReplaceTable
>> >>>>>> and other functions.
>> >>>>>>
>> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
>> of
>> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
>> argument(code
>> >>>>>>> smell) we should commonly avoid in the public interface. According
>> to the
>> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
>> whether to
>> >>>>>>> support atomic or not. With this selector argument, there will be
>> two
>> >>>>>>> different logics built within one method and it is hard to follow
>> without
>> >>>>>>> reading the code or the doc carefully(another concern is to keep
>> the doc
>> >>>>>>> and code alway be consistent) i.e. sometimes there will be no
>> difference
>> >>>>>> by
>> >>>>>>> using true/false isStreamingMode, sometimes they are quite
>> different -
>> >>>>>>> atomic vs. non-atomic. Another question is, before we call
>> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >>>>>>> isStreamingMode. In case only non-atomic is supported for
>> streaming mode,
>> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
>> I miss
>> >>>>>>> anything here?
>> >>>>>> Here's what I think about this issue, atomic CTAS wants to be the
>> default
>> >>>>>> behavior and only fall back to non-atomic CTAS if it's completely
>> >>>>>> unattainable. Atomic CTAS will bring a better experience to users.
>> >>>>>> Flink is already a stream batch unified engine, In our company
>> kwai, many
>> >>>>>> users are also using flink to do batch data processing, but still
>> running
>> >>>>>> in Stream mode.
>> >>>>>> The boundary between stream and batch is gradually blurred, stream
>> mode
>> >>>>>> jobs may also FINISH, so I added the isStreamingMode parameter, this
>> >>>>>> provides different atomicity implementations in Batch and Stream
>> modes.
>> >>>>>> Not only to determine if atomicity is supported, but also to help
>> select
>> >>>>>> different TwoPhaseCatalogTable implementations to provide different
>> levels
>> >>>>>> of atomicity!
>> >>>>>>
>> >>>>>> Looking forward to more feedback.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> Best regards,
>> >>>>>> Mang Zhang
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID>
>> wrote:
>> >>>>>>> Hi Mang,
>> >>>>>>>
>> >>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks
>> for
>> >>>>>>> driving it. I have two questions and would like to know your
>> thoughts,
>> >>>>>>> thanks:
>> >>>>>>>
>> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
>> with new
>> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
>> serializable
>> >>>>>> as
>> >>>>>>> described in FLIP-218. Did I understand correctly?
>> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
>> of
>> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
>> argument(code
>> >>>>>>> smell) we should commonly avoid in the public interface. According
>> to the
>> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
>> whether to
>> >>>>>>> support atomic or not. With this selector argument, there will be
>> two
>> >>>>>>> different logics built within one method and it is hard to follow
>> without
>> >>>>>>> reading the code or the doc carefully(another concern is to keep
>> the doc
>> >>>>>>> and code alway be consistent) i.e. sometimes there will be no
>> difference
>> >>>>>> by
>> >>>>>>> using true/false isStreamingMode, sometimes they are quite
>> different -
>> >>>>>>> atomic vs. non-atomic. Another question is, before we call
>> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >>>>>>> isStreamingMode. In case only non-atomic is supported for
>> streaming mode,
>> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
>> I miss
>> >>>>>>> anything here?
>> >>>>>>>
>> >>>>>>> Best regards,
>> >>>>>>> Jing
>> >>>>>>>
>> >>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyuxia@alumni.sjtu.edu.cn
>> >
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi, Mang.
>> >>>>>>>> +1 for completing the support for atomicity of CTAS. This is very useful
>> >>>>>>>> in batch scenarios and for integrating with data lakes that support
>> >>>>>>>> transactions.
>> >>>>>>>>
>> >>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
>> >>>>>>>> whether it is handling the normal case or atomic CTAS, along with the
>> >>>>>>>> necessary context.
>> >>>>>>>> Take the JDBC catalog as an example: with atomic CTAS, the
>> >>>>>>>> JDBC DynamicTableSink will write to the temp table defined in the
>> >>>>>>>> TwoPhaseCatalogTable, which is different from the normal case.
>> >>>>>>>>
>> >>>>>>>> How can the DynamicTableSink get it? Could you give some explanation
>> >>>>>>>> or an example in this FLIP?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Best regards,
>> >>>>>>>> Yuxia
>> >>>>>>>>
>> >>>>>>>> ----- Original Message -----
>> >>>>>>>> From: "zhangmang1" <zh...@163.com>
>> >>>>>>>> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ron9.liu@gmail.com>,
>> >>>>>>>> "lincoln 86xy" <li...@gmail.com>
>> >>>>>>>> Sent: Friday, April 14, 2023, 2:50:40 PM
>> >>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >>>>>>>> SELECT(CTAS) statement
>> >>>>>>>>
>> >>>>>>>> Hi, Lincoln and Ron
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Thank you for your reply.
>> >>>>>>>> On the naming wise I think OK, the future expansion of new
>> features more
>> >>>>>>>> uniform. I have updated the FLIP.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> About Hive support atomicity CTAS, Hive is rich in usage
>> scenarios and
>> >>>>>> can
>> >>>>>>>> be divided into three scenarios: 1. writing Hive tables 2.
>> writing Hive
>> >>>>>>>> tables with speculative execution 3. writing Hive table with
>> small file
>> >>>>>>>> merge
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS
>> atomicity
>> >>>>>> in
>> >>>>>>>> the Flink framework,
>> >>>>>>>> so I only poc to verify the first scenario of writing to the Hive
>> table,
>> >>>>>>>> and we can subsequently split the sub-task to support the other
>> two
>> >>>>>>>> scenarios.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>>
>> >>>>>>>> Best regards,
>> >>>>>>>> Mang Zhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com>
>> wrote:
>> >>>>>>>>> Hi, Mang
>> >>>>>>>>>
>> >>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very
>> >>>>>> useful
>> >>>>>>>> in
>> >>>>>>>>> batch scenarios.
>> >>>>>>>>>
>> >>>>>>>>> I have two questions:
>> >>>>>>>>> 1. naming wise:
>> >>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>> >>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
>> >>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>> >>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>> >>>>>>>>> `TwoPhaseCatalogTable`?
>> >>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
>> >>>>>> 'transaction'
>> >>>>>>>>> in the method name, which may remind users of the relevance of
>> >>>>>> transaction
>> >>>>>>>>> support (however, it is not strictly so), so I suggest changing
>> it to
>> >>>>>>>>> `begin`
>> >>>>>>>>> 2. Has this design been validated by any relevant Poc on hive or
>> other
>> >>>>>>>>> catalogs?
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Lincoln Lee
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> liu ron <ro...@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
>> >>>>>>>>>
>> >>>>>>>>>> Hi, Mang
>> >>>>>>>>>> Atomicity is very important for CTAS, especially for batch
>> jobs. This
>> >>>>>>>> FLIP
>> >>>>>>>>>> is a continuation of FLIP-218, which is valuable for CTAS.
>> >>>>>>>>>> I just have one question, in the Motivation part of FLIP-218, we
>> >>>>>>>> mentioned
>> >>>>>>>>>> three levels of atomicity semantics, can this current design do
>> the
>> >>>>>>>> same as
>> >>>>>>>>>> Spark's DataSource V2, which can guarantee both atomicity and
>> >>>>>> isolation,
>> >>>>>>>>>> for example, can it be done by writing to Hive tables using
>> CTAS?
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Ron
>> >>>>>>>>>>
>> >>>>>>>>>> Mang Zhang <zh...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi, everyone
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic
>> for
>> >>>>>>>> CREATE
>> >>>>>>>>>>> TABLE AS SELECT(CTAS) statement [1].
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> CREATE TABLE AS SELECT(CTAS) statement has been support, but
>> it's
>> >>>>>> not
>> >>>>>>>>>>> atomic. It will create the table first before job running. If
>> the
>> >>>>>> job
>> >>>>>>>>>>> execution fails, or is cancelled, the table will not be
>> dropped.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> So I want Flink to support atomic CTAS, where only the table is
>> >>>>>>>> created
>> >>>>>>>>>>> when the Job succeeds. Improve user experience.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Looking forward to your feedback.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> [1]
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> --
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best regards,
>> >>>>>>>>>>> Mang Zhang
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>>
>>
>>

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by liu ron <ro...@gmail.com>.
Hi, Mang

In FLIP-218, we discussed that atomicity is not needed in streaming
mode, so the initial implementation does not support
atomicity. In addition, we introduced the option
"table.ctas.atomicity-enabled" to enable atomicity. According to
your FLIP-305 description, once the DynamicTableSink implements the
SupportsStaging interface, atomicity is the default behavior in both
streaming and batch mode, and the user can't change it. I think this is
not feasible for streaming mode; atomicity should be controllable by the
user. So I think the FLIP should clarify how the option and the
SupportsStaging interface combine to determine the atomicity behavior:
atomicity is enabled only when the DynamicTableSink implements
SupportsStaging and the option is enabled. WDYT?
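
To make the rule concrete, here is a minimal sketch in Java. It is only an illustration: DynamicTableSink and SupportsStaging below are empty stand-ins, not the real Flink classes; PlainSink, StagingSink and useAtomicCtas are hypothetical names; and treating a missing option as "false" is my assumption, not something the FLIP states.

```java
import java.util.Map;

public class AtomicCtasDecision {

    /** Stand-in for Flink's DynamicTableSink (hypothetical, for illustration only). */
    interface DynamicTableSink {}

    /** Stand-in for the SupportsStaging ability discussed in FLIP-305. */
    interface SupportsStaging {}

    /** A sink without staging support (hypothetical). */
    static final class PlainSink implements DynamicTableSink {}

    /** A sink that declares staging support (hypothetical). */
    static final class StagingSink implements DynamicTableSink, SupportsStaging {}

    /** Option name as quoted in this thread. */
    static final String ATOMICITY_OPTION = "table.ctas.atomicity-enabled";

    /**
     * Atomic CTAS is used only when BOTH conditions hold:
     * the user enabled the option AND the sink implements SupportsStaging.
     * Assumption: an unset option is treated as disabled.
     */
    static boolean useAtomicCtas(DynamicTableSink sink, Map<String, String> config) {
        boolean optionEnabled =
                Boolean.parseBoolean(config.getOrDefault(ATOMICITY_OPTION, "false"));
        return optionEnabled && sink instanceof SupportsStaging;
    }

    public static void main(String[] args) {
        Map<String, String> enabled = Map.of(ATOMICITY_OPTION, "true");
        Map<String, String> unset = Map.of();

        // Print all four combinations of sink capability and option value.
        System.out.println("staging sink, option enabled: " + useAtomicCtas(new StagingSink(), enabled));
        System.out.println("staging sink, option unset:   " + useAtomicCtas(new StagingSink(), unset));
        System.out.println("plain sink, option enabled:   " + useAtomicCtas(new PlainSink(), enabled));
        System.out.println("plain sink, option unset:     " + useAtomicCtas(new PlainSink(), unset));
    }
}
```

With this shape, existing jobs keep the non-atomic behavior after an upgrade unless the user explicitly turns the option on, even when the connector gains staging support.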

Best,
Ron

Jark Wu <im...@gmail.com> wrote on Thu, Jun 8, 2023 at 16:30:

> Thank you for the great work, Mang! The updated proposal looks good to me.
>
> Best,
> Jark
>
> > On Jun 8, 2023, at 11:49, Jingsong Li <ji...@gmail.com> wrote:
> >
> > Thanks Mang for updating!
> >
> > Looks good to me!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zh...@163.com> wrote:
> >>
> >> Hi Jingsong,
> >>
> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >>> Flink design places execution in the TableFactory or directly in the
> >>> Catalog, so introducing an executable table makes me feel a bit
> >>> strange. (Spark is this style, but Flink may not be)
> >> On this issue, it is indeed a bit strange to introduce the executable
> >> commit/abort logic on CatalogTable.
> >> After an offline discussion with yuxia, I revised the FLIP-305[1] design.
> >> The new solution is similar to the implementation of SupportsOverwrite:
> >> it introduces the SupportsStaging interface and infers whether the
> >> DynamicTableSink supports atomic CTAS from whether it implements the
> >> SupportsStaging interface.
> >> If so, it will get the StagedTable object from the DynamicTableSink.
> >>
> >> For more implementation details, please see the FLIP-305 document.
> >>
> >> This is my poc commits
> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >>
> >>
> >> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >>
> >>
> >> --
> >>
> >> Best regards,
> >>
> >> Mang Zhang
> >>
> >>
> >>
> >> At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
> >>> Hi Mang,
> >>>
> >>> Thanks for starting this FLIP.
> >>>
> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >>> Flink design places execution in the TableFactory or directly in the
> >>> Catalog, so introducing an executable table makes me feel a bit
> >>> strange. (Spark is this style, but Flink may not be)
> >>>
> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
> >>>>
> >>>> Hi Ron,
> >>>>
> >>>>
> >>>> First of all, thank you for your reply!
> >>>> After our offline communication, what you said is mainly in the
> compilePlan scenario, but currently compilePlanSql does not support non
> INSERT statements, otherwise it will throw an exception.
> >>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL
> statement of type INSERT
> >>>> But it's a good point that I will seriously consider.
> >>>> Non-atomic CTAS can be supported relatively easily;
> >>>> But atomic CTAS needs more adaptation work, so I'm going to leave it
> as is and follow up with a separate issue to implement CTAS support for
> compilePlanSql.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>> Best regards,
> >>>> Mang Zhang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
> >>>>> Hi, Mang
> >>>>>
> >>>>> I have a question about the implementation details. For the
> atomicity case,
> >>>>> since the target table is not created before the JobGraph is
> generated, but
> >>>>> then the target table is required to exist when optimizing plan to
> generate
> >>>>> the JobGraph. So how do you solve this problem?
> >>>>>
> >>>>> Best,
> >>>>> Ron
> >>>>>
> >>>>> yuxia <lu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:
> >>>>>
> >>>>>> Share some insights about the new TwoPhaseCatalogTable proposed
> after
> >>>>>> offline discussion with Mang.
> >>>>>> The main or important reason is that the TwoPhaseCatalogTable
> enables
> >>>>>> external connectors to implement theirs own logic for commit /
> abort.
> >>>>>> In FLIP-218, for atomic CTAS, the Catalog will then just drop the
> table
> >>>>>> when the job fail. It's not ideal for it's too generic to work well.
> >>>>>> For example, some connectors will need to clean some temporary
> files in
> >>>>>> abort method. And the actual connector can know the specific logic
> for
> >>>>>> aborting.
> >>>>>>
> >>>>>> Best regards,
> >>>>>> Yuxia
> >>>>>>
> >>>>>>
> >>>>>> From: "zhangmang1" <zh...@163.com>
> >>>>>> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
> >>>>>> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
> >>>>>> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
> >>>>>> Sent: Wednesday, April 19, 2023, 3:13:36 PM
> >>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >>>>>> SELECT(CTAS) statement
> >>>>>>
> >>>>>> hi, Jing
> >>>>>> Thank you for your reply.
> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
> with new
> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
> serializable
> >>>>>> as
> >>>>>>> described in FLIP-218. Did I understand correctly?
> >>>>>> Yes, when I was implementing the FLIP-218 solution, I encountered
> problems
> >>>>>> with Catalog/CatalogTable serialization deserialization, for
> example, after
> >>>>>> deserialization CatalogTable could not be converted to Hive Table.
> Also,
> >>>>>> Catalog serialization is still a heavy operation, but it may not
> actually
> >>>>>> be necessary, we just need Create Table.
> >>>>>> Therefore, the TwoPhaseCatalogTable program is proposed, which also
> >>>>>> facilitates the implementation of the subsequent data lake,
> ReplaceTable
> >>>>>> and other functions.
> >>>>>>
> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
> of
> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
> argument(code
> >>>>>>> smell) we should commonly avoid in the public interface. According
> to the
> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
> whether to
> >>>>>>> support atomic or not. With this selector argument, there will be
> two
> >>>>>>> different logics built within one method and it is hard to follow
> without
> >>>>>>> reading the code or the doc carefully(another concern is to keep
> the doc
> >>>>>>> and code alway be consistent) i.e. sometimes there will be no
> difference
> >>>>>> by
> >>>>>>> using true/false isStreamingMode, sometimes they are quite
> different -
> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >>>>>>> isStreamingMode. In case only non-atomic is supported for
> streaming mode,
> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
> I miss
> >>>>>>> anything here?
> >>>>>> Here's what I think about this issue, atomic CTAS wants to be the
> default
> >>>>>> behavior and only fall back to non-atomic CTAS if it's completely
> >>>>>> unattainable. Atomic CTAS will bring a better experience to users.
> >>>>>> Flink is already a stream batch unified engine, In our company
> kwai, many
> >>>>>> users are also using flink to do batch data processing, but still
> running
> >>>>>> in Stream mode.
> >>>>>> The boundary between stream and batch is gradually blurred, stream
> mode
> >>>>>> jobs may also FINISH, so I added the isStreamingMode parameter, this
> >>>>>> provides different atomicity implementations in Batch and Stream
> modes.
> >>>>>> Not only to determine if atomicity is supported, but also to help
> select
> >>>>>> different TwoPhaseCatalogTable implementations to provide different
> levels
> >>>>>> of atomicity!
> >>>>>>
> >>>>>> Looking forward to more feedback.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best regards,
> >>>>>> Mang Zhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID>
> wrote:
> >>>>>>> Hi Mang,
> >>>>>>>
> >>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks
> for
> >>>>>>> driving it. I have two questions and would like to know your
> thoughts,
> >>>>>>> thanks:
> >>>>>>>
> >>>>>>> 1. It looks like you found another way to design the atomic CTAS
> with new
> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog
> serializable
> >>>>>> as
> >>>>>>> described in FLIP-218. Did I understand correctly?
> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter
> of
> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector
> argument(code
> >>>>>>> smell) we should commonly avoid in the public interface. According
> to the
> >>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine
> whether to
> >>>>>>> support atomic or not. With this selector argument, there will be
> two
> >>>>>>> different logics built within one method and it is hard to follow
> without
> >>>>>>> reading the code or the doc carefully(another concern is to keep
> the doc
> >>>>>>> and code always consistent), i.e. sometimes there will be no
> difference
> >>>>>> by
> >>>>>>> using true/false isStreamingMode, sometimes they are quite
> different -
> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >>>>>>> isStreamingMode. In case only non-atomic is supported for
> streaming mode,
> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did
> I miss
> >>>>>>> anything here?
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>> Jing
> >>>>>>>
> >>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyuxia@alumni.sjtu.edu.cn
> >
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi, Mang.
> >>>>>>>> +1 for completing the atomicity support for CTAS. This is very useful
> >>>>>>>> in batch scenarios and when integrating with data lakes that support
> >>>>>>>> transactions.
> >>>>>>>>
> >>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
> >>>>>>>> whether it is handling the normal case or atomic CTAS, and it will
> >>>>>>>> also need the necessary context.
> >>>>>>>> Take the JDBC catalog as an example: for CTAS with atomicity support,
> >>>>>>>> the JDBC DynamicTableSink will write to the temporary table defined in
> >>>>>>>> the TwoPhaseCatalogTable, which differs from the normal case.
> >>>>>>>>
> >>>>>>>> How can the DynamicTableSink get this information? Could you give some
> >>>>>>>> explanation or an example in this FLIP?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Yuxia
> >>>>>>>>
> >>>>>>>> ----- Original Message -----
> >>>>>>>> From: "zhangmang1" <zh...@163.com>
> >>>>>>>> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ron9.liu@gmail.com>,
> >>>>>>>> "lincoln 86xy" <li...@gmail.com>
> >>>>>>>> Sent: Friday, April 14, 2023 2:50:40 PM
> >>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >>>>>>>> SELECT(CTAS) statement
> >>>>>>>>
> >>>>>>>> Hi, Lincoln and Ron
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thank you for your reply.
> >>>>>>>> The naming suggestions look good to me; they keep future extensions
> >>>>>>>> more uniform. I have updated the FLIP.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Regarding atomic CTAS support for Hive: Hive has many usage scenarios,
> >>>>>>>> which can be divided into three cases: 1. writing Hive tables;
> >>>>>>>> 2. writing Hive tables with speculative execution; 3. writing Hive
> >>>>>>>> tables with small-file merging.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> The main purpose of FLIP-305 is to implement CTAS atomicity support in
> >>>>>>>> the Flink framework, so my PoC only verifies the first case, writing
> >>>>>>>> to a Hive table. We can split out sub-tasks later to support the other
> >>>>>>>> two cases.
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Mang Zhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com>
> wrote:
> >>>>>>>>> Hi, Mang
> >>>>>>>>>
> >>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very
> >>>>>> useful
> >>>>>>>> in
> >>>>>>>>> batch scenarios.
> >>>>>>>>>
> >>>>>>>>> I have two questions:
> >>>>>>>>> 1. naming wise:
> >>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
> >>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >>>>>>>>> `TwoPhaseCatalogTable`?
> >>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
> >>>>>> 'transaction'
> >>>>>>>>> in the method name, which may remind users of the relevance of
> >>>>>> transaction
> >>>>>>>>> support (however, it is not strictly so), so I suggest changing
> it to
> >>>>>>>>> `begin`
> >>>>>>>>> 2. Has this design been validated by a PoC on Hive or other catalogs?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Lincoln Lee
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 13, 2023 at 10:17 AM liu ron <ro...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi, Mang
> >>>>>>>>>> Atomicity is very important for CTAS, especially for batch
> jobs. This
> >>>>>>>> FLIP
> >>>>>>>>>> is a continuation of FLIP-218, which is valuable for CTAS.
> >>>>>>>>>> I just have one question, in the Motivation part of FLIP-218, we
> >>>>>>>> mentioned
> >>>>>>>>>> three levels of atomicity semantics, can this current design do
> the
> >>>>>>>> same as
> >>>>>>>>>> Spark's DataSource V2, which can guarantee both atomicity and
> >>>>>> isolation,
> >>>>>>>>>> for example, can it be done by writing to Hive tables using
> CTAS?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Ron
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zh...@163.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi, everyone
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic
> for
> >>>>>>>> CREATE
> >>>>>>>>>>> TABLE AS SELECT(CTAS) statement [1].
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported,
> >>>>>>>>>>> but it is not atomic. The table is created before the job runs,
> >>>>>>>>>>> and if the job fails or is cancelled, the table is not dropped.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created
> >>>>>>>>>>> only when the job succeeds. This improves the user experience.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Mang Zhang
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
>
>

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jark Wu <im...@gmail.com>.
Thank you for the great work, Mang! The updated proposal looks good to me. 

Best,
Jark

> On Jun 8, 2023, at 11:49 AM, Jingsong Li <ji...@gmail.com> wrote:
> 
> Thanks Mang for updating!
> 
> Looks good to me!
> 
> Best,
> Jingsong
> 
> On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zh...@163.com> wrote:
>> 
>> Hi Jingsong,
>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>> On this issue, I agree that placing the executable commit/abort logic on CatalogTable is indeed a bit strange.
>> After an offline discussion with yuxia, I revised the FLIP-305 [1] design.
>> The new solution is similar to the implementation of SupportsOverwrite:
>> it introduces the SupportsStaging interface and infers whether a DynamicTableSink supports atomic CTAS from whether it implements SupportsStaging.
>> If it does, the StagedTable object is obtained from the DynamicTableSink.
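
A minimal Java sketch of the ability-interface pattern described above. The types here are simplified local stand-ins, not the Flink interfaces: the `applyStaging()` method name, `DemoSink`, `PlainSink`, `StagingSink`, and `CtasRunner` are assumptions made for illustration. It shows how atomic CTAS support could be inferred from whether the sink implements `SupportsStaging`, and how the staged table would then be driven through begin, then commit or abort.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the staged table; not the Flink interface.
interface StagedTable {
    void begin();   // before the job starts
    void commit();  // after the job finishes successfully
    void abort();   // after the job fails or is cancelled
}

// Stand-in for DynamicTableSink.
interface DemoSink {}

// Ability interface: a sink that implements it opts in to atomic CTAS.
interface SupportsStaging {
    StagedTable applyStaging(); // hypothetical method name
}

// A sink without the ability; CTAS stays non-atomic for it.
class PlainSink implements DemoSink {}

// A sink with the ability; it records the lifecycle calls it receives.
class StagingSink implements DemoSink, SupportsStaging {
    final List<String> calls = new ArrayList<>();

    @Override
    public StagedTable applyStaging() {
        return new StagedTable() {
            @Override public void begin()  { calls.add("begin"); }
            @Override public void commit() { calls.add("commit"); }
            @Override public void abort()  { calls.add("abort"); }
        };
    }
}

class CtasRunner {
    // Atomic CTAS is inferred from the sink type alone.
    static boolean supportsAtomicCtas(DemoSink sink) {
        return sink instanceof SupportsStaging;
    }

    static void run(DemoSink sink, Runnable job) {
        if (!supportsAtomicCtas(sink)) {
            job.run(); // non-atomic path
            return;
        }
        StagedTable staged = ((SupportsStaging) sink).applyStaging();
        staged.begin();
        try {
            job.run();
        } catch (RuntimeException e) {
            staged.abort(); // job failed: discard the staged output
            throw e;
        }
        staged.commit(); // job succeeded: publish the table
    }
}
```

With this shape, a sink that does not implement the ability interface needs no changes and simply keeps the non-atomic behavior.
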
>> 
>> For more implementation details, please see the FLIP-305 document.
>> 
>> This is my PoC commit: https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>> 
>> 
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> 
>> 
>> --
>> 
>> Best regards,
>> 
>> Mang Zhang
>> 
>> 
>> 
>> At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
>>> Hi Mang,
>>> 
>>> Thanks for starting this FLIP.
>>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>>> 
>>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>>> 
>>> Best,
>>> Jingsong
>>> 
>>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
>>>> 
>>>> Hi Ron,
>>>> 
>>>> 
>>>> First of all, thank you for your reply!
>>>> After our offline discussion, I understand that your question mainly concerns the compilePlan scenario. Currently compilePlanSql only supports INSERT statements; any other statement throws an exception:
>>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
>>>> Still, it is a good point, and I will consider it seriously.
>>>> Non-atomic CTAS can be supported relatively easily,
>>>> but atomic CTAS needs more adaptation work, so I will leave compilePlanSql as is for now and open a separate issue to implement CTAS support for it.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> 
>>>> Best regards,
>>>> Mang Zhang
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
>>>>> Hi, Mang
>>>>> 
>>>>> I have a question about the implementation details. In the atomic case,
>>>>> the target table is not created before the JobGraph is generated, yet the
>>>>> target table is required to exist when the plan is optimized to generate
>>>>> the JobGraph. How do you solve this problem?
>>>>> 
>>>>> Best,
>>>>> Ron
>>>>> 
>>>>> On Thu, Apr 20, 2023 at 9:35 AM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
>>>>> 
>>>>>> Let me share some insights about the newly proposed TwoPhaseCatalogTable
>>>>>> after an offline discussion with Mang.
>>>>>> The main reason for it is that the TwoPhaseCatalogTable enables external
>>>>>> connectors to implement their own logic for commit / abort.
>>>>>> In FLIP-218, for atomic CTAS, the Catalog just drops the table when the
>>>>>> job fails. That is not ideal because it is too generic to work well.
>>>>>> For example, some connectors need to clean up temporary files in the
>>>>>> abort method, and only the actual connector knows the specific logic for
>>>>>> aborting.
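
To make the point about connector-specific abort logic concrete, here is a minimal Java sketch. `FileStagedTable` is a hypothetical file-based connector, not code from FLIP-305 or the PoC: it writes to a staging directory, publishes it with a rename on commit, and deletes its temporary files on abort, which a generic "drop the table" step in the Catalog could not do on its behalf.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical file-based staged table; all names are illustrative only.
class FileStagedTable {
    private final Path finalDir;
    private final Path stagingDir;

    FileStagedTable(Path finalDir) {
        this.finalDir = finalDir;
        this.stagingDir = finalDir.resolveSibling(finalDir.getFileName() + ".staging");
    }

    // Create the staging location before the job writes anything.
    void begin() {
        try {
            Files.createDirectories(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called by the job: data goes to the staging directory only.
    void write(String fileName, String content) {
        try {
            Files.write(stagingDir.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Publish the table by renaming the staging directory.
    void commit() {
        try {
            Files.move(stagingDir, finalDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Connector-specific cleanup: remove all temporary files.
    void abort() {
        if (!Files.exists(stagingDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            // Delete children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch assumes the staging and final directories live on the same file system, so that the rename in `commit()` does not have to copy files.
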
>>>>>> 
>>>>>> Best regards,
>>>>>> Yuxia
>>>>>> 
>>>>>> 
>>>>>> From: "zhangmang1" <zh...@163.com>
>>>>>> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
>>>>>> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
>>>>>> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
>>>>>> Sent: Wednesday, April 19, 2023 3:13:36 PM
>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>>>>>> SELECT(CTAS) statement
>>>>>> 
>>>>>> hi, Jing
>>>>>> Thank you for your reply.
>>>>>>> 1. It looks like you found another way to design the atomic CTAS with new
>>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog serializable
>>>>>> as
>>>>>>> described in FLIP-218. Did I understand correctly?
>>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems
>>>>>> with Catalog/CatalogTable serialization and deserialization; for example,
>>>>>> after deserialization the CatalogTable could not be converted to a Hive
>>>>>> Table. Catalog serialization is also a heavy operation that may not
>>>>>> actually be necessary, since we only need to create the table.
>>>>>> Therefore, I propose the TwoPhaseCatalogTable approach, which also makes
>>>>>> it easier to implement later features such as data lake support and
>>>>>> ReplaceTable.
>>>>>> 
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>>>>>>> smell) we should commonly avoid in the public interface. According to the
>>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>>>>>>> support atomic or not. With this selector argument, there will be two
>>>>>>> different logics built within one method and it is hard to follow without
>>>>>>> reading the code or the doc carefully(another concern is to keep the doc
>>>>>>> and code always consistent), i.e. sometimes there will be no difference
>>>>>> by
>>>>>>> using true/false isStreamingMode, sometimes they are quite different -
>>>>>>> atomic vs. non-atomic. Another question is, before we call
>>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
>>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>>>>>>> anything here?
>>>>>> Here is my view: atomic CTAS should be the default behavior, and we should
>>>>>> only fall back to non-atomic CTAS when atomicity is completely
>>>>>> unattainable. Atomic CTAS gives users a better experience.
>>>>>> Flink is already a unified stream and batch engine. At our company, Kwai,
>>>>>> many users process batch data with Flink but still run their jobs in
>>>>>> streaming mode.
>>>>>> The boundary between stream and batch is gradually blurring, and
>>>>>> streaming-mode jobs may also FINISH, so I added the isStreamingMode
>>>>>> parameter. It allows different atomicity implementations in batch and
>>>>>> streaming modes: it is used not only to determine whether atomicity is
>>>>>> supported, but also to select different TwoPhaseCatalogTable
>>>>>> implementations that provide different levels of atomicity.
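
A minimal Java sketch of the behavior discussed above, under stated assumptions. The proposed `Catalog#twoPhaseCreateTable(...)` takes more arguments than shown here; the `Optional` return type, `DemoCatalog`, `TwoPhaseTable`, and the two table classes are hypothetical. It illustrates how one `isStreamingMode` flag both decides whether atomicity is available and selects the implementation, which is the dual role questioned in the quoted message.

```java
import java.util.Optional;

// Stand-in for TwoPhaseCatalogTable; not the FLIP-305 interface.
interface TwoPhaseTable {
    String mode();
}

// Hypothetical implementation used for batch-mode jobs.
class BatchTwoPhaseTable implements TwoPhaseTable {
    @Override public String mode() { return "batch"; }
}

// Hypothetical implementation used for streaming-mode jobs.
class StreamingTwoPhaseTable implements TwoPhaseTable {
    @Override public String mode() { return "streaming"; }
}

class DemoCatalog {
    private final boolean streamingAtomicSupported;

    DemoCatalog(boolean streamingAtomicSupported) {
        this.streamingAtomicSupported = streamingAtomicSupported;
    }

    // An empty result means: fall back to non-atomic CTAS.
    Optional<TwoPhaseTable> twoPhaseCreateTable(boolean isStreamingMode) {
        if (!isStreamingMode) {
            return Optional.<TwoPhaseTable>of(new BatchTwoPhaseTable());
        }
        if (streamingAtomicSupported) {
            return Optional.<TwoPhaseTable>of(new StreamingTwoPhaseTable());
        }
        return Optional.empty();
    }
}
```

A caller cannot tell from the signature alone whether passing `true` changes the implementation, disables atomicity, or does nothing, so that behavior would have to be documented per catalog.
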
>>>>>> 
>>>>>> Looking forward to more feedback.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Best regards,
>>>>>> Mang Zhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>>>>>>> Hi Mang,
>>>>>>> 
>>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks for
>>>>>>> driving it. I have two questions and would like to know your thoughts,
>>>>>>> thanks:
>>>>>>> 
>>>>>>> 1. It looks like you found another way to design the atomic CTAS with new
>>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog serializable
>>>>>> as
>>>>>>> described in FLIP-218. Did I understand correctly?
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>>>>>>> smell) we should commonly avoid in the public interface. According to the
>>>>>>> FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>>>>>>> support atomic or not. With this selector argument, there will be two
>>>>>>> different logics built within one method and it is hard to follow without
>>>>>>> reading the code or the doc carefully(another concern is to keep the doc
>>>>>>> and code always consistent), i.e. sometimes there will be no difference
>>>>>> by
>>>>>>> using true/false isStreamingMode, sometimes they are quite different -
>>>>>>> atomic vs. non-atomic. Another question is, before we call
>>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
>>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>>>>>>> anything here?
>>>>>>> 
>>>>>>> Best regards,
>>>>>>> Jing
>>>>>>> 
>>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi, Mang.
>>>>>>>> +1 for completing the atomicity support for CTAS. This is very useful
>>>>>>>> in batch scenarios and when integrating with data lakes that support
>>>>>>>> transactions.
>>>>>>>> 
>>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
>>>>>>>> whether it is handling the normal case or atomic CTAS, and it will also
>>>>>>>> need the necessary context.
>>>>>>>> Take the JDBC catalog as an example: for CTAS with atomicity support,
>>>>>>>> the JDBC DynamicTableSink will write to the temporary table defined in
>>>>>>>> the TwoPhaseCatalogTable, which differs from the normal case.
>>>>>>>> 
>>>>>>>> How can the DynamicTableSink get this information? Could you give some
>>>>>>>> explanation or an example in this FLIP?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Yuxia
>>>>>>>> 
>>>>>>>> ----- Original Message -----
>>>>>>>> From: "zhangmang1" <zh...@163.com>
>>>>>>>> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
>>>>>>>> "lincoln 86xy" <li...@gmail.com>
>>>>>>>> Sent: Friday, April 14, 2023 2:50:40 PM
>>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>>>>>>>> SELECT(CTAS) statement
>>>>>>>> 
>>>>>>>> Hi, Lincoln and Ron
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thank you for your reply.
>>>>>>>> The naming suggestions look good to me; they keep future extensions more
>>>>>>>> uniform. I have updated the FLIP.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Regarding atomic CTAS support for Hive: Hive has many usage scenarios,
>>>>>>>> which can be divided into three cases: 1. writing Hive tables;
>>>>>>>> 2. writing Hive tables with speculative execution; 3. writing Hive
>>>>>>>> tables with small-file merging.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> The main purpose of FLIP-305 is to implement CTAS atomicity support in
>>>>>>>> the Flink framework, so my PoC only verifies the first case, writing to
>>>>>>>> a Hive table. We can split out sub-tasks later to support the other two
>>>>>>>> cases.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Mang Zhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
>>>>>>>>> Hi, Mang
>>>>>>>>> 
>>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very
>>>>>> useful
>>>>>>>> in
>>>>>>>>> batch scenarios.
>>>>>>>>> 
>>>>>>>>> I have two questions:
>>>>>>>>> 1. naming wise:
>>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
>>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>>>>>>>>> `TwoPhaseCatalogTable`?
>>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
>>>>>> 'transaction'
>>>>>>>>> in the method name, which may remind users of the relevance of
>>>>>> transaction
>>>>>>>>> support (however, it is not strictly so), so I suggest changing it to
>>>>>>>>> `begin`
>>>>>>>>> 2. Has this design been validated by a PoC on Hive or other
>>>>>>>>> catalogs?
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Lincoln Lee
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Apr 13, 2023 at 10:17 AM liu ron <ro...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi, Mang
>>>>>>>>>> Atomicity is very important for CTAS, especially for batch jobs. This
>>>>>>>> FLIP
>>>>>>>>>> is a continuation of FLIP-218, which is valuable for CTAS.
>>>>>>>>>> I just have one question, in the Motivation part of FLIP-218, we
>>>>>>>> mentioned
>>>>>>>>>> three levels of atomicity semantics, can this current design do the
>>>>>>>> same as
>>>>>>>>>> Spark's DataSource V2, which can guarantee both atomicity and
>>>>>> isolation,
>>>>>>>>>> for example, can it be done by writing to Hive tables using CTAS?
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>> 
>>>>>>>>>> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zh...@163.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi, everyone
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic for
>>>>>>>> CREATE
>>>>>>>>>>> TABLE AS SELECT(CTAS) statement [1].
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but
>>>>>>>>>>> it is not atomic. The table is created before the job runs, and if
>>>>>>>>>>> the job fails or is cancelled, the table is not dropped.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created
>>>>>>>>>>> only when the job succeeds. This improves the user experience.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Mang Zhang
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 


Re: Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Mang for updating!

Looks good to me!

Best,
Jingsong

On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zh...@163.com> wrote:
>
> Hi Jingsong,
>
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> On this issue, I agree that placing the executable commit/abort logic on CatalogTable is indeed a bit strange.
> After an offline discussion with yuxia, I revised the FLIP-305 [1] design.
> The new solution is similar to the implementation of SupportsOverwrite:
> it introduces the SupportsStaging interface and infers whether a DynamicTableSink supports atomic CTAS from whether it implements SupportsStaging.
> If it does, the StagedTable object is obtained from the DynamicTableSink.
>
> For more implementation details, please see the FLIP-305 document.
>
> This is my PoC commit: https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
> >Hi Mang,
> >
> >Thanks for starting this FLIP.
> >
> >I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >Flink design places execution in the TableFactory or directly in the
> >Catalog, so introducing an executable table makes me feel a bit
> >strange. (Spark is this style, but Flink may not be)
> >
> >And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >
> >Best,
> >Jingsong
> >
> >On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
> >>
> >> Hi Ron,
> >>
> >>
> >> First of all, thank you for your reply!
> >> After our offline discussion, I understand that your question mainly concerns the compilePlan scenario. Currently compilePlanSql only supports INSERT statements; any other statement throws an exception:
> >> >Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
> >> Still, it is a good point, and I will consider it seriously.
> >> Non-atomic CTAS can be supported relatively easily,
> >> but atomic CTAS needs more adaptation work, so I will leave compilePlanSql as is for now and open a separate issue to implement CTAS support for it.
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >>
> >>
> >>
> >>
> >> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >I have a question about the implementation details. In the atomic case,
> >> >the target table is not created before the JobGraph is generated, yet the
> >> >target table is required to exist when the plan is optimized to generate
> >> >the JobGraph. How do you solve this problem?
> >> >
> >> >Best,
> >> >Ron
> >> >
> >> >On Thu, Apr 20, 2023 at 9:35 AM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> >> >
> >> >> Let me share some insights about the newly proposed TwoPhaseCatalogTable
> >> >> after an offline discussion with Mang.
> >> >> The main reason for it is that the TwoPhaseCatalogTable enables external
> >> >> connectors to implement their own logic for commit / abort.
> >> >> In FLIP-218, for atomic CTAS, the Catalog just drops the table when the
> >> >> job fails. That is not ideal because it is too generic to work well.
> >> >> For example, some connectors need to clean up temporary files in the
> >> >> abort method, and only the actual connector knows the specific logic for
> >> >> aborting.
> >> >>
> >> >> Best regards,
> >> >> Yuxia
> >> >>
> >> >>
> >> >> From: "zhangmang1" <zh...@163.com>
> >> >> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
> >> >> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
> >> >> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
> >> >> Sent: Wednesday, April 19, 2023 3:13:36 PM
> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >> SELECT(CTAS) statement
> >> >>
> >> >> hi, Jing
> >> >> Thank you for your reply.
> >> >> >1. It looks like you found another way to design the atomic CTAS with new
> >> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
> >> >> as
> >> >> >described in FLIP-218. Did I understand correctly?
> >> >> Yes. When I was implementing the FLIP-218 solution, I ran into problems
> >> >> with Catalog/CatalogTable serialization and deserialization; for example,
> >> >> after deserialization the CatalogTable could not be converted to a Hive
> >> >> Table. Catalog serialization is also a heavy operation that may not
> >> >> actually be necessary, since we only need to create the table.
> >> >> Therefore, I propose the TwoPhaseCatalogTable approach, which also makes
> >> >> it easier to implement later features such as data lake support and
> >> >> ReplaceTable.
> >> >>
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >> >> >smell) we should commonly avoid in the public interface. According to the
> >> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >> >> >support atomic or not. With this selector argument, there will be two
> >> >> >different logics built within one method and it is hard to follow without
> >> >> >reading the code or the doc carefully(another concern is to keep the doc
> >> >> >and code always consistent), i.e. sometimes there will be no difference
> >> >> by
> >> >> >using true/false isStreamingMode, sometimes they are quite different -
> >> >> >atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >> >anything here?
> >> >> Here is my view: atomic CTAS should be the default behavior, and we should
> >> >> only fall back to non-atomic CTAS when atomicity is completely
> >> >> unattainable. Atomic CTAS gives users a better experience.
> >> >> Flink is already a unified stream and batch engine. At our company, Kwai,
> >> >> many users process batch data with Flink but still run their jobs in
> >> >> streaming mode.
> >> >> The boundary between stream and batch is gradually blurring, and
> >> >> streaming-mode jobs may also FINISH, so I added the isStreamingMode
> >> >> parameter. It allows different atomicity implementations in batch and
> >> >> streaming modes: it is used not only to determine whether atomicity is
> >> >> supported, but also to select different TwoPhaseCatalogTable
> >> >> implementations that provide different levels of atomicity.
> >> >>
> >> >> Looking forward to more feedback.
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Best regards,
> >> >> Mang Zhang
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
> >> >> >Hi Mang,
> >> >> >
> >> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >> >> >driving it. I have two questions and would like to know your thoughts,
> >> >> >thanks:
> >> >> >
> >> >> >1. It looks like you found another way to design the atomic CTAS with new
> >> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
> >> >> as
> >> >> >described in FLIP-218. Did I understand correctly?
> >> >> >2. I am a little bit confused about the isStreamingMode parameter of
> >> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
> >> >> >smell) we should commonly avoid in the public interface. According to the
> >> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
> >> >> >support atomic or not. With this selector argument, there will be two
> >> >> >different logics built within one method and it is hard to follow without
> >> >> >reading the code or the doc carefully(another concern is to keep the doc
> >> >> >and code always consistent), i.e. sometimes there will be no difference
> >> >> by
> >> >> >using true/false isStreamingMode, sometimes they are quite different -
> >> >> >atomic vs. non-atomic. Another question is, before we call
> >> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >> >we could just follow FLIP-218 instead of (twistedly) calling
> >> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >> >anything here?
> >> >> >
> >> >> >Best regards,
> >> >> >Jing
> >> >> >
> >> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn>
> >> >> wrote:
> >> >> >
> >> >> >> Hi, Mang.
> >> >> >> +1 for completing the atomicity support for CTAS. This is very useful
> >> >> >> in batch scenarios and when integrating with data lakes that support
> >> >> >> transactions.
> >> >> >>
> >> >> >> I just have one question. IIUC, the DynamicTableSink will need to know
> >> >> >> whether it is handling the normal case or atomic CTAS, and it will also
> >> >> >> need the necessary context.
> >> >> >> Take the JDBC catalog as an example: for CTAS with atomicity support,
> >> >> >> the JDBC DynamicTableSink will write to the temporary table defined in
> >> >> >> the TwoPhaseCatalogTable, which differs from the normal case.
> >> >> >>
> >> >> >> How can the DynamicTableSink get this information? Could you give some
> >> >> >> explanation or an example in this FLIP?
> >> >> >>
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Yuxia
> >> >> >>
> >> >> >> ----- Original Message -----
> >> >> >> From: "zhangmang1" <zh...@163.com>
> >> >> >> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
> >> >> >> "lincoln 86xy" <li...@gmail.com>
> >> >> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> >> >> SELECT(CTAS) statement
> >> >> >>
> >> >> >> Hi, Lincoln and Ron
> >> >> >>
> >> >> >>
> >> >> >> Thank you for your reply.
> >> >> >> The naming suggestions look good to me; they make future feature
> >> >> >> extensions more uniform. I have updated the FLIP.
> >> >> >>
> >> >> >>
> >> >> >> Regarding atomic CTAS support for Hive: Hive is rich in usage scenarios, which
> >> >> can
> >> >> >> be divided into three: 1. writing Hive tables 2. writing Hive
> >> >> >> tables with speculative execution 3. writing Hive tables with small file
> >> >> >> merge
> >> >> >>
> >> >> >>
> >> >> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity
> >> >> in
> >> >> >> the Flink framework,
> >> >> >> so my PoC only verifies the first scenario of writing to a Hive table,
> >> >> >> and we can subsequently split out sub-tasks to support the other two
> >> >> >> scenarios.
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >>
> >> >> >> Best regards,
> >> >> >> Mang Zhang
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
> >> >> >> >Hi, Mang
> >> >> >> >
> >> >> >> >+1 for completing the support for atomicity of CTAS, this is very
> >> >> useful
> >> >> >> in
> >> >> >> >batch scenarios.
> >> >> >> >
> >> >> >> >I have two questions:
> >> >> >> >1. naming wise:
> >> >> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >> >> >`TwoPhaseCatalogTable`?
> >> >> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
> >> >> 'transaction'
> >> >> >> >in the method name, which may remind users of the relevance of
> >> >> transaction
> >> >> >> >support (however, it is not strictly so), so I suggest changing it to
> >> >> >> >`begin`
> >> >> >> >2. Has this design been validated by any relevant Poc on hive or other
> >> >> >> >catalogs?
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Lincoln Lee
> >> >> >> >
> >> >> >> >
> >> >> >> >On Thu, Apr 13, 2023 at 10:17 AM liu ron <ro...@gmail.com> wrote:
> >> >> >> >
> >> >> >> >> Hi, Mang
> >> >> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
> >> >> >> FLIP
> >> >> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> >> >> I just have one question, in the Motivation part of FLIP-218, we
> >> >> >> mentioned
> >> >> >> >> three levels of atomicity semantics, can this current design do the
> >> >> >> same as
> >> >> >> >> Spark's DataSource V2, which can guarantee both atomicity and
> >> >> isolation,
> >> >> >> >> for example, can it be done by writing to Hive tables using CTAS?
> >> >> >> >>
> >> >> >> >> Best,
> >> >> >> >> Ron
> >> >> >> >>
> >> >> >> >> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zh...@163.com> wrote:
> >> >> >> >>
> >> >> >> >> > Hi, everyone
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> >> >> CREATE
> >> >> >> >> > TABLE AS SELECT(CTAS) statement [1].
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > The CREATE TABLE AS SELECT(CTAS) statement is already supported, but it's
> >> >> not
> >> >> >> >> > atomic. It creates the table before the job runs. If the
> >> >> job
> >> >> >> >> > execution fails, or is cancelled, the table will not be dropped.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > So I want Flink to support atomic CTAS, where the table is only
> >> >> >> created
> >> >> >> >> > when the job succeeds, which improves the user experience.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > Looking forward to your feedback.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > [1]
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > --
> >> >> >> >> >
> >> >> >> >> > Best regards,
> >> >> >> >> > Mang Zhang
> >> >> >> >>
> >> >> >>
> >> >>
> >> >>

Re:Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Jingsong,


>I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>Flink design places execution in the TableFactory or directly in the
>Catalog, so introducing an executable table makes me feel a bit
>strange. (Spark is this style, but Flink may not be)
I agree that putting executable commit/abort logic on a CatalogTable is a bit strange.
After an offline discussion with yuxia, I revised the design in FLIP-305 [1].
The new solution is similar to the implementation of SupportsOverwrite:
it introduces a SupportsStaging interface, and Flink infers whether a DynamicTableSink supports atomic CTAS based on whether it implements SupportsStaging.
If it does, Flink gets the StagedTable object from the DynamicTableSink.
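
To make the SupportsStaging idea above concrete, here is a minimal, self-contained sketch. It does not use Flink classes: the interfaces StagedTable, TableSink and SupportsStaging below, and all of their method names, are simplified hypothetical stand-ins, not the signatures from the FLIP. It only illustrates the pattern: detect the ability with instanceof, then call begin before the job and commit or abort depending on the outcome.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class StagingSketch {

    /** Hypothetical staged-table handle with lifecycle hooks. */
    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    /** Hypothetical stand-in for DynamicTableSink. */
    interface TableSink {}

    /** Hypothetical ability interface, in the spirit of SupportsOverwrite. */
    interface SupportsStaging {
        StagedTable applyStaging();
    }

    /** A sink that opts in to atomic CTAS and records the hooks it receives. */
    static final class StagingSink implements TableSink, SupportsStaging {
        final List<String> events = new ArrayList<>();

        @Override
        public StagedTable applyStaging() {
            return new StagedTable() {
                @Override public void begin() { events.add("begin"); }
                @Override public void commit() { events.add("commit"); }
                @Override public void abort() { events.add("abort"); }
            };
        }
    }

    /** A sink without the ability; CTAS stays non-atomic for it. */
    static final class PlainSink implements TableSink {}

    /** Returns the staged table only if the sink implements the ability. */
    static Optional<StagedTable> tryStage(TableSink sink) {
        if (sink instanceof SupportsStaging) {
            return Optional.of(((SupportsStaging) sink).applyStaging());
        }
        return Optional.empty();
    }

    /** Runs the job; returns true if the atomic (staged) path was taken. */
    static boolean runCtas(TableSink sink, Runnable job) {
        Optional<StagedTable> staged = tryStage(sink);
        if (!staged.isPresent()) {
            job.run(); // non-atomic fallback
            return false;
        }
        StagedTable table = staged.get();
        table.begin();
        try {
            job.run();
        } catch (RuntimeException e) {
            table.abort(); // job failed: undo the staged work
            throw e;
        }
        table.commit(); // job succeeded: publish the table
        return true;
    }

    public static void main(String[] args) {
        StagingSink sink = new StagingSink();
        System.out.println(runCtas(sink, () -> {}) + " " + sink.events);
        System.out.println(runCtas(new PlainSink(), () -> {}));
    }
}
```

The point of the ability-interface style is that the Catalog and CatalogTable stay purely descriptive, while a sink that cannot stage simply does not implement the interface and keeps the existing non-atomic behavior.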


For more implementation details, please see the FLIP-305 document. 


This is my PoC commit: https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa






[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement




--

Best regards,
Mang Zhang





At 2023-05-12 13:02:14, "Jingsong Li" <ji...@gmail.com> wrote:
>Hi Mang,
>
>Thanks for starting this FLIP.
>
>I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>Flink design places execution in the TableFactory or directly in the
>Catalog, so introducing an executable table makes me feel a bit
>strange. (Spark is this style, but Flink may not be)
>
>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>
>Best,
>Jingsong
>
>On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
>>
>> Hi Ron,
>>
>>
>> First of all, thank you for your reply!
>> After our offline communication, I understand that your concern is mainly about the compilePlan scenario. Currently, compilePlanSql does not support non-INSERT statements and throws an exception for them:
>> >Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
>> It's a good point that I will seriously consider.
>> Non-atomic CTAS can be supported relatively easily,
>> but atomic CTAS needs more adaptation work, so I'm going to leave it as is and follow up with a separate issue to implement CTAS support for compilePlanSql.
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best regards,
>> Mang Zhang
>>
>>
>>
>>
>>
>> At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
>> >Hi, Mang
>> >
>> >I have a question about the implementation details. In the atomic case,
>> >the target table is not created before the JobGraph is generated, yet
>> >the target table is required to exist when optimizing the plan to generate
>> >the JobGraph. How do you solve this problem?
>> >
>> >Best,
>> >Ron
>> >
>> >On Thu, Apr 20, 2023 at 9:35 AM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
>> >
>> >> Sharing some insights about the new TwoPhaseCatalogTable proposed after
>> >> an offline discussion with Mang.
>> >> The main reason is that the TwoPhaseCatalogTable enables
>> >> external connectors to implement their own logic for commit / abort.
>> >> In FLIP-218, for atomic CTAS, the Catalog just drops the table
>> >> when the job fails. That is not ideal because it is too generic to work well.
>> >> For example, some connectors need to clean up temporary files in the
>> >> abort method, and only the actual connector knows the specific logic for
>> >> aborting.
>> >>
>> >> Best regards,
>> >> Yuxia
>> >>
>> >>
>> >> From: "zhangmang1" <zh...@163.com>
>> >> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
>> >> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
>> >> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
>> >> Sent: Wednesday, April 19, 2023, 3:13:36 PM
>> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> SELECT(CTAS) statement
>> >>
>> >> hi, Jing
>> >> Thank you for your reply.
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
>> >> as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> Yes. When I was implementing the FLIP-218 solution, I encountered problems
>> >> with Catalog/CatalogTable serialization and deserialization. For example, after
>> >> deserialization the CatalogTable could not be converted to a Hive Table. Also,
>> >> Catalog serialization is a heavy operation that may not actually
>> >> be necessary, since we only need to create the table.
>> >> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
>> >> facilitates the subsequent implementation of data lake support, ReplaceTable,
>> >> and other functions.
>> >>
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >> >smell) we should commonly avoid in the public interface. According to the
>> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >> >support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method and it is hard to follow without
>> >> >reading the code or the doc carefully(another concern is to keep the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> by
>> >> >using true/false isStreamingMode, sometimes they are quite different -
>> >> >atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> Here's what I think about this issue: atomic CTAS should be the default
>> >> behavior, falling back to non-atomic CTAS only if atomicity is completely
>> >> unattainable. Atomic CTAS will bring a better experience to users.
>> >> Flink is already a unified stream and batch engine. In our company, Kwai, many
>> >> users are also using Flink to do batch data processing, but still running
>> >> in streaming mode.
>> >> The boundary between stream and batch is gradually blurring, and streaming mode
>> >> jobs may also finish, so I added the isStreamingMode parameter, which
>> >> provides different atomicity implementations in batch and streaming modes.
>> >> It is used not only to determine whether atomicity is supported, but also to help select
>> >> different TwoPhaseCatalogTable implementations to provide different levels
>> >> of atomicity.
>> >>
>> >> Looking forward to more feedback.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Best regards,
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >>
>> >> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
>> >> >Hi Mang,
>> >> >
>> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
>> >> >driving it. I have two questions and would like to know your thoughts,
>> >> >thanks:
>> >> >
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
>> >> as
>> >> >described in FLIP-218. Did I understand correctly?
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>> >> >smell) we should commonly avoid in the public interface. According to the
>> >> >FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>> >> >support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method and it is hard to follow without
>> >> >reading the code or the doc carefully(another concern is to keep the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> by
>> >> >using true/false isStreamingMode, sometimes they are quite different -
>> >> >atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> >
>> >> >Best regards,
>> >> >Jing
>> >> >
>> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn>
>> >> wrote:
>> >> >
>> >> >> Hi, Mang.
>> >> >> +1 for completing the support for atomicity of CTAS, this is very useful
>> >> >> in batch scenarios and for integrating with data lakes that support
>> >> >> transactions.
>> >> >>
>> >> >> I just have one question. IIUC, the DynamicTableSink will need to know
>> >> >> whether it's the normal case or atomic CTAS, as well as the necessary
>> >> >> context.
>> >> >> Take the JDBC catalog as an example: if it's CTAS with atomicity support,
>> >> the
>> >> >> JDBC DynamicTableSink will write to the temp table defined in the
>> >> >> TwoPhaseCatalogTable, which is different from the normal case.
>> >> >>
>> >> >> How can the DynamicTableSink get it? Could you give some
>> >> explanation
>> >> >> or example in this FLIP?
>> >> >>
>> >> >>
>> >> >> Best regards,
>> >> >> Yuxia
>> >> >>
>> >> >> ----- Original Message -----
>> >> >> From: "zhangmang1" <zh...@163.com>
>> >> >> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
>> >> >> "lincoln 86xy" <li...@gmail.com>
>> >> >> Sent: Friday, April 14, 2023, 2:50:40 PM
>> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> >> SELECT(CTAS) statement
>> >> >>
>> >> >> Hi, Lincoln and Ron
>> >> >>
>> >> >>
>> >> >> Thank you for your reply.
>> >> >> The naming suggestions look good to me; they make future feature
>> >> >> extensions more uniform. I have updated the FLIP.
>> >> >>
>> >> >>
>> >> >> Regarding atomic CTAS support for Hive: Hive is rich in usage scenarios, which
>> >> can
>> >> >> be divided into three: 1. writing Hive tables 2. writing Hive
>> >> >> tables with speculative execution 3. writing Hive tables with small file
>> >> >> merge
>> >> >>
>> >> >>
>> >> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity
>> >> in
>> >> >> the Flink framework,
>> >> >> so my PoC only verifies the first scenario of writing to a Hive table,
>> >> >> and we can subsequently split out sub-tasks to support the other two
>> >> >> scenarios.
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >>
>> >> >> Best regards,
>> >> >> Mang Zhang
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
>> >> >> >Hi, Mang
>> >> >> >
>> >> >> >+1 for completing the support for atomicity of CTAS, this is very
>> >> useful
>> >> >> in
>> >> >> >batch scenarios.
>> >> >> >
>> >> >> >I have two questions:
>> >> >> >1. naming wise:
>> >> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>> >> >> >`Catalog#twoPhaseCreateTable` (and we may add
>> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>> >> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>> >> >> >`TwoPhaseCatalogTable`?
>> >> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
>> >> 'transaction'
>> >> >> >in the method name, which may remind users of the relevance of
>> >> transaction
>> >> >> >support (however, it is not strictly so), so I suggest changing it to
>> >> >> >`begin`
>> >> >> >2. Has this design been validated by any relevant Poc on hive or other
>> >> >> >catalogs?
>> >> >> >
>> >> >> >Best,
>> >> >> >Lincoln Lee
>> >> >> >
>> >> >> >
>> >> >> >On Thu, Apr 13, 2023 at 10:17 AM liu ron <ro...@gmail.com> wrote:
>> >> >> >
>> >> >> >> Hi, Mang
>> >> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
>> >> >> FLIP
>> >> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
>> >> >> >> I just have one question, in the Motivation part of FLIP-218, we
>> >> >> mentioned
>> >> >> >> three levels of atomicity semantics, can this current design do the
>> >> >> same as
>> >> >> >> Spark's DataSource V2, which can guarantee both atomicity and
>> >> isolation,
>> >> >> >> for example, can it be done by writing to Hive tables using CTAS?
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Ron
>> >> >> >>
>> >> >> >> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zh...@163.com> wrote:
>> >> >> >>
>> >> >> >> > Hi, everyone
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
>> >> >> CREATE
>> >> >> >> > TABLE AS SELECT(CTAS) statement [1].
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > The CREATE TABLE AS SELECT(CTAS) statement is already supported, but it's
>> >> not
>> >> >> >> > atomic. It creates the table before the job runs. If the
>> >> job
>> >> >> >> > execution fails, or is cancelled, the table will not be dropped.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > So I want Flink to support atomic CTAS, where the table is only
>> >> >> created
>> >> >> >> > when the job succeeds, which improves the user experience.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > Looking forward to your feedback.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > [1]
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > --
>> >> >> >> >
>> >> >> >> > Best regards,
>> >> >> >> > Mang Zhang
>> >> >> >>
>> >> >>
>> >>
>> >>

Re:Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Jingsong,


Thank you for your reply!
We introduced `TwoPhaseCatalogTable` for two reasons:
1. The `TwoPhaseCatalogTable` of each data source can carry out more operations. Going through the Catalog only allows simple create table and drop table, which is not flexible enough. Examples include deleting a temporary directory, or using rename table in a relational database to implement atomic semantics in Flink.
2. It facilitates subsequent extensions, such as support for replace table and extended data lake storage support.
>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
Regarding naming, we initially used `StagedCatalogTable`, but after offline discussions with yuxia and Lincoln, we noted that Flink already has TwoPhaseCommittingSink/TwoPhaseCommitSinkFunction, so we changed it to `TwoPhaseCatalogTable` to keep the naming consistent.
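
As an illustration of the first reason (connector-specific work such as deleting a temporary directory, or publishing by rename), here is a small self-contained sketch using only java.nio. The class StagedDirectoryTable, its begin/commit/abort methods, and the directory names are hypothetical and are not part of the FLIP or of any Flink connector. It assumes the staging directory and the target live on the same file system, so that the rename can be atomic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

final class RenameOnCommitSketch {

    /** Hypothetical two-phase table backed by a directory. */
    static final class StagedDirectoryTable {
        private final Path target;
        private Path staging;

        StagedDirectoryTable(Path target) {
            this.target = target;
        }

        /** Creates a hidden staging directory next to the target. */
        Path begin() throws IOException {
            staging = Files.createTempDirectory(
                    target.toAbsolutePath().getParent(), ".staging-");
            return staging;
        }

        /** Publishes the data by renaming the staging directory to the target. */
        void commit() throws IOException {
            Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
        }

        /** Deletes the staged files; the target is never created. */
        void abort() throws IOException {
            try (Stream<Path> paths = Files.walk(staging)) {
                // Reverse order deletes children before their parent directory.
                paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
    }

    /** Simulates one CTAS run; returns whether the target directory exists afterwards. */
    static boolean runCtas(boolean failJob) {
        try {
            Path warehouse = Files.createTempDirectory("warehouse-");
            Path target = warehouse.resolve("orders_copy");
            StagedDirectoryTable table = new StagedDirectoryTable(target);
            Path staging = table.begin();
            Files.write(staging.resolve("part-0"),
                    "1,apple\n".getBytes(StandardCharsets.UTF_8));
            if (failJob) {
                table.abort();
            } else {
                table.commit();
            }
            return Files.exists(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("after success: " + runCtas(false));
        System.out.println("after failure: " + runCtas(true));
    }
}
```

A generic "drop table on failure" in the Catalog could not know about the staging directory; only the connector-side object does, which is the flexibility argued for above.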







--

Best regards,
Mang Zhang






Re: Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jingsong Li <ji...@gmail.com>.
Hi Mang,

Thanks for starting this FLIP.

I have some doubts about `TwoPhaseCatalogTable`. Generally, Flink's
design places execution in the TableFactory or directly in the
Catalog, so introducing an executable table feels a bit strange to
me. (Spark follows this style, but Flink may not.)

As for the `TwoPhase` prefix, maybe `StagedXXX`, as in Spark, would be a better name?
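
To make the lifecycle under discussion concrete, here is a minimal sketch of a table object that carries its own begin / commit / abort hooks. It is not the FLIP's actual interface: `StagedTable`, `InMemoryStagedTable`, `AtomicCtasSketch`, and `runCtas` are hypothetical names chosen for illustration, and only the begin / commit / abort steps come from this thread. The sketch assumes the framework calls begin before the job starts, commit when the job finishes, and abort when it fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the table object discussed in this thread.
interface StagedTable {
    void begin();   // prepare staging resources before the job starts
    void commit();  // publish the table once the job has finished
    void abort();   // clean up after a failed job
}

// Records lifecycle calls so their order can be inspected.
final class InMemoryStagedTable implements StagedTable {
    final List<String> calls = new ArrayList<>();
    boolean visible = false;

    @Override public void begin()  { calls.add("begin"); }
    @Override public void commit() { calls.add("commit"); visible = true; }
    @Override public void abort()  { calls.add("abort"); visible = false; }
}

final class AtomicCtasSketch {
    // Runs the job between begin and commit; a false result or a
    // RuntimeException from the job triggers abort instead of commit.
    static boolean runCtas(StagedTable table, BooleanSupplier job) {
        table.begin();
        boolean succeeded;
        try {
            succeeded = job.getAsBoolean();
        } catch (RuntimeException e) {
            succeeded = false;
        }
        if (succeeded) {
            table.commit();
        } else {
            table.abort();
        }
        return succeeded;
    }

    public static void main(String[] args) {
        InMemoryStagedTable ok = new InMemoryStagedTable();
        runCtas(ok, () -> true);
        System.out.println(ok.calls + " visible=" + ok.visible);

        InMemoryStagedTable failed = new InMemoryStagedTable();
        runCtas(failed, () -> { throw new IllegalStateException("job failed"); });
        System.out.println(failed.calls + " visible=" + failed.visible);
    }
}
```

Whether these hooks live on a table object (as sketched here) or in the Catalog / TableFactory is exactly the design question raised above; the sketch only shows the call order, not where the logic should live.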

Best,
Jingsong

On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zh...@163.com> wrote:
>
> Hi Ron,
>
>
> First of all, thank you for your reply!
> After our offline discussion, I understand that your question mainly concerns the compilePlan scenario. Currently compilePlanSql does not support non-INSERT statements; it throws an exception for them:
> >Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
> Still, it's a good point that I will consider seriously.
> Non-atomic CTAS can be supported relatively easily,
> but atomic CTAS needs more adaptation work, so I'm going to leave it as is for now and follow up with a separate issue to implement CTAS support for compilePlanSql.
>
>
>
>
>
>
> --
>
> Best regards,
> Mang Zhang

Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi Ron,


First of all, thank you for your reply!
After our offline discussion, I understand that your question mainly concerns the compilePlan scenario. Currently compilePlanSql does not support non-INSERT statements; it throws an exception for them:
>Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
Still, it's a good point that I will consider seriously.
Non-atomic CTAS can be supported relatively easily,
but atomic CTAS needs more adaptation work, so I'm going to leave it as is for now and follow up with a separate issue to implement CTAS support for compilePlanSql.
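
The compilePlanSql restriction mentioned above can be modeled in a few lines. This is an illustrative sketch only, not Flink's implementation: `StatementKind`, `CompilePlanGuard`, and `checkCompilable` are hypothetical names, the exception type is an assumption, and only the error message is taken from the exception text quoted above.

```java
// Hypothetical statement categories; the real planner works on parsed operations.
enum StatementKind { INSERT, CREATE_TABLE_AS_SELECT, SELECT }

final class CompilePlanGuard {
    // Message copied from the exception text quoted in this message.
    static final String UNSUPPORTED =
        "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT";

    // Accepts INSERT and rejects every other kind, mirroring the behavior described above.
    // The exception type used here is an assumption made for this sketch.
    static void checkCompilable(StatementKind kind) {
        if (kind != StatementKind.INSERT) {
            throw new UnsupportedOperationException(UNSUPPORTED);
        }
    }

    public static void main(String[] args) {
        checkCompilable(StatementKind.INSERT); // passes silently
        try {
            checkCompilable(StatementKind.CREATE_TABLE_AS_SELECT);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, supporting CTAS in compilePlanSql would mean widening the accepted kinds, which is the follow-up work proposed above.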






--

Best regards,
Mang Zhang





At 2023-04-23 17:52:07, "liu ron" <ro...@gmail.com> wrote:
>Hi, Mang
>
>I have a question about the implementation details. In the atomic case,
>the target table is not created before the JobGraph is generated, yet
>the target table is required to exist when the plan is optimized to generate
>the JobGraph. How do you solve this problem?
>
>Best,
>Ron

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by liu ron <ro...@gmail.com>.
Hi, Mang

I have a question about the implementation details. In the atomic case,
the target table is not created before the JobGraph is generated, yet
the target table is required to exist when the plan is optimized to generate
the JobGraph. How do you solve this problem?

Best,
Ron

yuxia <lu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:

> Let me share some insights about the newly proposed TwoPhaseCatalogTable
> after an offline discussion with Mang.
> The main reason is that TwoPhaseCatalogTable enables external connectors
> to implement their own logic for commit / abort.
> In FLIP-218, for atomic CTAS, the Catalog just drops the table when the
> job fails. That is not ideal because it is too generic to work well.
> For example, some connectors need to clean up temporary files in the
> abort method, and only the actual connector knows the specific logic for
> aborting.
>
> Best regards,
> Yuxia
>
>
> From: "zhangmang1" <zh...@163.com>
> To: "dev" <de...@flink.apache.org>, "Jing Ge" <ji...@ververica.com>
> Cc: "ron9 liu" <ro...@gmail.com>, "lincoln 86xy" <
> lincoln.86xy@gmail.com>, luoyuxia@alumni.sjtu.edu.cn
> Sent: Wednesday, April 19, 2023, 3:13:36 PM
> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> SELECT(CTAS) statement
>
> Hi, Jing
> Thank you for your reply.
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> Yes. When I was implementing the FLIP-218 solution, I ran into problems
> with Catalog/CatalogTable serialization and deserialization; for example,
> after deserialization the CatalogTable could not be converted to a Hive
> table. Also, Catalog serialization is a heavy operation that may not
> actually be necessary, since we only need to create the table.
> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
> makes it easier to implement later support for data lakes, ReplaceTable,
> and other features.
>
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
> >smell) that we should generally avoid in public interfaces. According to the
> >FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >support atomicity or not. With this selector argument, two different
> >code paths are built into one method, and it is hard to follow without
> >reading the code or the doc carefully (another concern is keeping the doc
> >and code consistent at all times), i.e. sometimes there will be no
> >difference between true and false for isStreamingMode, and sometimes they
> >are quite different: atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (awkwardly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
> Here's what I think about this issue: atomic CTAS should be the default
> behavior, falling back to non-atomic CTAS only if atomicity is completely
> unattainable. Atomic CTAS will give users a better experience.
> Flink is already a unified stream and batch engine. At our company,
> Kwai, many users run batch data processing with Flink but still run the
> jobs in streaming mode.
> The boundary between stream and batch is gradually blurring, and
> streaming-mode jobs may also finish, so I added the isStreamingMode
> parameter to allow different atomicity implementations in batch and
> streaming modes.
> It is used not only to determine whether atomicity is supported, but also
> to help select different TwoPhaseCatalogTable implementations that provide
> different levels of atomicity.
>
> Looking forward to more feedback.
>
>
>
>
>
>
>
>
>
> --
> Best regards,
> Mang Zhang
>
>
>
>
> At 2023-04-15 04:20:40, "Jing Ge" <ji...@ververica.com.INVALID> wrote:
> >Hi Mang,
> >
> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >driving it. I have two questions and would like to know your thoughts,
> >thanks:
> >
> >1. It looks like you found another way to design the atomic CTAS with new
> >serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >described in FLIP-218. Did I understand correctly?
> >2. I am a little bit confused about the isStreamingMode parameter of
> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
> >smell) that we should generally avoid in public interfaces. According to the
> >FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >support atomicity or not. With this selector argument, two different
> >code paths are built into one method, and it is hard to follow without
> >reading the code or the doc carefully (another concern is keeping the doc
> >and code consistent at all times), i.e. sometimes there will be no
> >difference between true and false for isStreamingMode, and sometimes they
> >are quite different: atomic vs. non-atomic. Another question is, before we call
> >Catalog#twoPhaseCreateTable(...), we have to know the value of
> >isStreamingMode. In case only non-atomic is supported for streaming mode,
> >we could just follow FLIP-218 instead of (awkwardly) calling
> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >anything here?
> >
> >Best regards,
> >Jing
> >
> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <lu...@alumni.sjtu.edu.cn> wrote:
> >
> >> Hi, Mang.
> >> +1 for completing the support for atomicity of CTAS. This is very useful
> >> in batch scenarios and for integrating with data lakes that support
> >> transactions.
> >>
> >> I just have one question. IIUC, the DynamicTableSink will need to know
> >> whether it is handling the normal case or atomic CTAS, as well as the
> >> necessary context.
> >> Take the JDBC catalog as an example: if CTAS with atomicity is supported,
> >> the JDBC DynamicTableSink will write to the temp table defined in the
> >> TwoPhaseCatalogTable, which is different from the normal case.
> >>
> >> How can the DynamicTableSink get it? Could you give some explanation
> >> or an example in this FLIP?
> >>
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> ----- Original Message -----
> >> From: "zhangmang1" <zh...@163.com>
> >> To: "dev" <de...@flink.apache.org>, "ron9 liu" <ro...@gmail.com>,
> >> "lincoln 86xy" <li...@gmail.com>
> >> Sent: Friday, April 14, 2023, 2:50:40 PM
> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
> >> SELECT(CTAS) statement
> >>
> >> Hi, Lincoln and Ron
> >>
> >>
> >> Thank you for your reply.
> >> The naming suggestions look good to me; they will make future
> >> extensions more uniform. I have updated the FLIP.
> >>
> >>
> >> Regarding atomic CTAS support for Hive: Hive has many usage scenarios,
> >> which fall into three groups: 1. writing Hive tables, 2. writing Hive
> >> tables with speculative execution, 3. writing Hive tables with
> >> small-file merging.
> >>
> >>
> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity
> >> in the Flink framework,
> >> so my PoC only verifies the first scenario, writing to a Hive table;
> >> we can later split out sub-tasks to support the other two
> >> scenarios.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best regards,
> >> Mang Zhang
> >>
> >>
> >>
> >>
> >>
> >> At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
> >> >Hi, Mang
> >> >
> >> >+1 for completing the support for atomicity of CTAS, this is very
> useful
> >> in
> >> >batch scenarios.
> >> >
> >> >I have two questions:
> >> >1. naming wise:
> >> >  a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
> >> >`Catalog#twoPhaseCreateTable` (and we may add
> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
> >> >  b) for the `TwoPhaseCommitCatalogTable`, may it be better using
> >> >`TwoPhaseCatalogTable`?
> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
> 'transaction'
> >> >in the method name, which may remind users of the relevance of
> transaction
> >> >support (however, it is not strictly so), so I suggest changing it to
> >> >`begin`
> >> >2. Has this design been validated by any relevant Poc on hive or other
> >> >catalogs?
> >> >
> >> >Best,
> >> >Lincoln Lee
> >> >
> >> >
> >> >liu ron <ro...@gmail.com> 于2023年4月13日周四 10:17写道:
> >> >
> >> >> Hi, Mang
> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
> >> FLIP
> >> >> is a continuation of FLIP-218, which is valuable for CTAS.
> >> >> I just have one question, in the Motivation part of FLIP-218, we
> >> mentioned
> >> >> three levels of atomicity semantics, can this current design do the
> >> same as
> >> >> Spark's DataSource V2, which can guarantee both atomicity and
> isolation,
> >> >> for example, can it be done by writing to Hive tables using CTAS?
> >> >>
> >> >> Best,
> >> >> Ron
> >> >>
> >> >> Mang Zhang <zh...@163.com> 于2023年4月10日周一 11:03写道:
> >> >>
> >> >> > Hi, everyone
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
> >> CREATE
> >> >> > TABLE AS SELECT(CTAS) statement [1].
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > CREATE TABLE AS SELECT(CTAS) statement has been support, but it's
> not
> >> >> > atomic. It will create the table first before job running. If the
> job
> >> >> > execution fails, or is cancelled, the table will not be dropped.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > So I want Flink to support atomic CTAS, where only the table is
> >> created
> >> >> > when the Job succeeds. Improve user experience.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > Looking forward to your feedback.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > [1]
> >> >> >
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> >
> >> >> > Best regards,
> >> >> > Mang Zhang
> >> >>
> >>
>
>

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Sharing some insights about the newly proposed TwoPhaseCatalogTable, after an offline discussion with Mang.
The main reason for it is that TwoPhaseCatalogTable lets external connectors implement their own commit/abort logic.
In FLIP-218, for atomic CTAS, the Catalog simply drops the table when the job fails. That is not ideal, because it is too generic to work well.
For example, some connectors need to clean up temporary files in the abort method, and only the connector itself knows the specific abort logic.
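
As a rough illustration of connector-specific abort logic, here is a minimal sketch. It does not use the FLIP's actual interfaces: TwoPhaseTableSketch, InMemoryStorage and StagingTable are hypothetical names, and the in-memory set of paths only stands in for an external system. Only the begin/commit/abort shape follows the discussion.

```java
import java.util.ArrayList;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for the proposed TwoPhaseCatalogTable; not the real Flink interface. */
interface TwoPhaseTableSketch {
    void begin();   // called before the job starts
    void commit();  // called when the job finishes successfully
    void abort();   // called when the job fails or is cancelled
}

/** Hypothetical external system: a set of paths standing in for files and tables. */
final class InMemoryStorage {
    final Set<String> paths = new TreeSet<>();
}

/** A connector that stages data in a temporary location and publishes it on commit. */
final class StagingTable implements TwoPhaseTableSketch {
    private final InMemoryStorage storage;
    private final String table;
    private final String staging;

    StagingTable(InMemoryStorage storage, String table) {
        this.storage = storage;
        this.table = table;
        this.staging = table + "/.staging";
    }

    @Override
    public void begin() {
        // Create only the staging location; the table itself is not visible yet.
        storage.paths.add(staging);
    }

    /** Simulates the job writing one file into the staging location. */
    void write(String fileName) {
        storage.paths.add(staging + "/" + fileName);
    }

    @Override
    public void commit() {
        // Move every staged file under the final table path, then publish the table.
        String prefix = staging + "/";
        for (String path : new ArrayList<>(storage.paths)) {
            if (path.startsWith(prefix)) {
                storage.paths.remove(path);
                storage.paths.add(table + "/" + path.substring(prefix.length()));
            }
        }
        storage.paths.remove(staging);
        storage.paths.add(table);
    }

    @Override
    public void abort() {
        // Connector-specific cleanup: a generic "drop table" would not know about these files.
        String prefix = staging + "/";
        storage.paths.removeIf(p -> p.equals(staging) || p.startsWith(prefix));
    }
}
```

In this sketch a failed job leaves nothing behind after abort(), while a generic drop-table step would have no knowledge of the staging location.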

Best regards, 
Yuxia 




Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi, Jing
Thank you for your reply.
>1. It looks like you found another way to design the atomic CTAS with a new
>serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>described in FLIP-218. Did I understand correctly?
Yes. When I was implementing the FLIP-218 solution, I ran into problems with serializing and deserializing Catalog/CatalogTable; for example, after deserialization the CatalogTable could not be converted to a Hive Table. Catalog serialization is also a heavy operation, and it may not actually be necessary, since all we need is to create the table.
That is why the TwoPhaseCatalogTable approach is proposed. It also makes it easier to implement follow-up features such as data lake support and ReplaceTable.


>2. I am a little bit confused about the isStreamingMode parameter of
>Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>smell) that we should generally avoid in public interfaces. According to the
>FLIP, isStreamingMode will be used by the Catalog to determine whether to
>support atomicity or not. With this selector argument, two different pieces
>of logic are built into one method, and it is hard to follow without
>reading the code or the doc carefully (another concern is keeping the doc
>and code always consistent), i.e. sometimes there will be no difference
>between a true and a false isStreamingMode, and sometimes they behave quite
>differently: atomic vs. non-atomic. Another question: before we call
>Catalog#twoPhaseCreateTable(...), we have to know the value of
>isStreamingMode. In case only non-atomic is supported for streaming mode,
>we could just follow FLIP-218 instead of (awkwardly) calling
>Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>anything here?
Here is my thinking on this issue: atomic CTAS should be the default behavior, and we should fall back to non-atomic CTAS only if atomicity is completely unattainable. Atomic CTAS gives users a better experience.
Flink is already a unified stream and batch engine. At our company, Kwai, many users also use Flink for batch data processing but still run their jobs in streaming mode.
The boundary between stream and batch is gradually blurring, and streaming-mode jobs may also FINISH. That is why I added the isStreamingMode parameter: it allows different atomicity implementations in batch and streaming modes.
It is used not only to determine whether atomicity is supported, but also to help select different TwoPhaseCatalogTable implementations that provide different levels of atomicity.
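
The idea can be sketched roughly as follows. This is only an illustration under assumptions: CtasTableSketch, BatchCtasTable, StreamingCtasTable, SketchCatalog and CtasPlannerSketch are hypothetical names, and the Optional return type is my own way of modelling the fallback to non-atomic CTAS, not the signature defined in the FLIP.

```java
import java.util.Optional;

/** Hypothetical stand-in for a TwoPhaseCatalogTable implementation. */
interface CtasTableSketch {
    String describe();
}

final class BatchCtasTable implements CtasTableSketch {
    @Override
    public String describe() {
        return "batch: publish the table once the job finishes";
    }
}

final class StreamingCtasTable implements CtasTableSketch {
    @Override
    public String describe() {
        return "streaming: publish the table when the bounded streaming job finishes";
    }
}

/** Hypothetical catalog; the method name mirrors Catalog#twoPhaseCreateTable, the signature is assumed. */
final class SketchCatalog {
    private final boolean supportsStreamingAtomicity;

    SketchCatalog(boolean supportsStreamingAtomicity) {
        this.supportsStreamingAtomicity = supportsStreamingAtomicity;
    }

    Optional<CtasTableSketch> twoPhaseCreateTable(boolean isStreamingMode) {
        if (!isStreamingMode) {
            return Optional.of(new BatchCtasTable());
        }
        // An empty result means the catalog cannot provide atomicity in this mode.
        return supportsStreamingAtomicity
                ? Optional.of(new StreamingCtasTable())
                : Optional.empty();
    }
}

final class CtasPlannerSketch {
    /** Atomic CTAS is the default; non-atomic is used only when no two-phase table is available. */
    static String plan(SketchCatalog catalog, boolean isStreamingMode) {
        return catalog.twoPhaseCreateTable(isStreamingMode)
                .map(CtasTableSketch::describe)
                .orElse("non-atomic: create the table before the job runs");
    }
}
```

Here the flag does two jobs at once: it decides whether atomicity is available and which implementation is chosen.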


Looking forward to more feedback.









--

Best regards,
Mang Zhang






Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Mang,

This is the FLIP I was looking forward to after FLIP-218. Thanks for
driving it. I have two questions and would like to know your thoughts,
thanks:

1. It looks like you found another way to design the atomic CTAS with a new
serializable TwoPhaseCatalogTable instead of making Catalog serializable as
described in FLIP-218. Did I understand correctly?
2. I am a little bit confused about the isStreamingMode parameter of
Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
smell) that we should generally avoid in public interfaces. According to the
FLIP, isStreamingMode will be used by the Catalog to determine whether to
support atomicity or not. With this selector argument, two different pieces
of logic are built into one method, and it is hard to follow without
reading the code or the doc carefully (another concern is keeping the doc
and code always consistent), i.e. sometimes there will be no difference
between a true and a false isStreamingMode, and sometimes they behave quite
differently: atomic vs. non-atomic. Another question: before we call
Catalog#twoPhaseCreateTable(...), we have to know the value of
isStreamingMode. In case only non-atomic is supported for streaming mode,
we could just follow FLIP-218 instead of (awkwardly) calling
Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
anything here?
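
To make the second point concrete, here is a minimal sketch of the alternative, where the caller decides on the mode and the catalog method takes no flag. All names (FlagFreeCatalog, ModeAwarePlanner) and the string results are hypothetical, and the sketch assumes that streaming mode supports only the non-atomic path.

```java
import java.util.Optional;

/** Hypothetical catalog without a selector argument; not the real Flink Catalog interface. */
final class FlagFreeCatalog {
    /** Returns a two-phase table handle if the catalog supports atomic creation. */
    Optional<String> twoPhaseCreateTable(String table) {
        return Optional.of("two-phase:" + table);
    }

    /** The existing non-atomic path: create the table before the job runs. */
    String createTable(String table) {
        return "plain:" + table;
    }
}

final class ModeAwarePlanner {
    static String plan(FlagFreeCatalog catalog, String table, boolean isStreamingMode) {
        if (isStreamingMode) {
            // Assumption for this sketch: streaming mode supports only non-atomic CTAS,
            // so the two-phase method is never called here.
            return catalog.createTable(table);
        }
        return catalog.twoPhaseCreateTable(table)
                .orElseGet(() -> catalog.createTable(table));
    }
}
```

With this split, each catalog method has a single behavior, and the mode check lives in one place on the caller side.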

Best regards,
Jing


Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Mang.
+1 for completing the atomicity support for CTAS. This is very useful in batch scenarios and for integrating with data lakes that support transactions.

I just have one question. IIUC, the DynamicTableSink will need to know whether it is handling the normal case or atomic CTAS, as well as the necessary context.
Take the JDBC catalog as an example: for CTAS with atomicity support, the JDBC DynamicTableSink will write to the temporary table defined in the TwoPhaseCatalogTable, which is different from the normal case.

How can the DynamicTableSink get this information? Could you give some explanation or an example in the FLIP?
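
To picture what the sink needs, here is a minimal sketch. The instanceof check mirrors the approach suggested in the follow-up reply on this thread; everything else is an assumption for illustration: OriginTableSketch, PlainTableSketch, TwoPhaseJdbcTableSketch, SinkTargetResolver and the "__ctas_tmp" suffix are hypothetical and not part of Flink or the JDBC connector.

```java
/** Hypothetical stand-in for the table object a sink factory can inspect. */
interface OriginTableSketch {
    String name();
}

/** The normal case: the sink writes straight to the declared table. */
final class PlainTableSketch implements OriginTableSketch {
    private final String name;

    PlainTableSketch(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

/** Hypothetical two-phase table that also carries the temporary table to write to. */
final class TwoPhaseJdbcTableSketch implements OriginTableSketch {
    private final String name;

    TwoPhaseJdbcTableSketch(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    String tempTableName() {
        // Illustrative naming scheme only.
        return name + "__ctas_tmp";
    }
}

final class SinkTargetResolver {
    /** Picks the write target by checking the type of the origin table. */
    static String resolve(OriginTableSketch origin) {
        if (origin instanceof TwoPhaseJdbcTableSketch) {
            return ((TwoPhaseJdbcTableSketch) origin).tempTableName();
        }
        return origin.name();
    }
}
```

For a plain table the sink would write to the declared name; for the two-phase case it would write to the temporary table and leave publishing to the commit step.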


Best regards,
Yuxia


Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Mang Zhang <zh...@163.com>.
Hi, Lincoln and Ron


Thank you for your reply.
The naming suggestions look good to me; they will keep future extensions more uniform. I have updated the FLIP.


Regarding atomic CTAS support for Hive: Hive has many usage scenarios, which can be divided into three: 1. writing Hive tables; 2. writing Hive tables with speculative execution; 3. writing Hive tables with small-file merging.


The main purpose of FLIP-305 is to implement support for CTAS atomicity in the Flink framework,
so my PoC only verifies the first scenario, writing to a Hive table. We can later split out sub-tasks to support the other two scenarios.














--

Best regards,
Mang Zhang





At 2023-04-13 12:27:24, "Lincoln Lee" <li...@gmail.com> wrote:
>Hi, Mang
>
>+1 for completing the support for atomicity of CTAS, this is very useful in
>batch scenarios.
>
>I have two questions:
>1. Naming:
>  a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to
>`Catalog#twoPhaseCreateTable`? (We may add
>twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later.)
>  b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable` be a
>better name?
>  c) For `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
>in the method name may lead users to expect transaction support (which is
>not strictly the case), so I suggest changing it to `begin`.
>2. Has this design been validated by a PoC on Hive or other catalogs?
>
>Best,
>Lincoln Lee
>
>
>On Thu, Apr 13, 2023 at 10:17, liu ron <ro...@gmail.com> wrote:
>
>> Hi, Mang
>> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
>> is a continuation of FLIP-218 and is valuable for CTAS.
>> I have just one question: in the Motivation part of FLIP-218, we mentioned
>> three levels of atomicity semantics. Can the current design match Spark's
>> DataSource V2, which can guarantee both atomicity and isolation? For
>> example, can this be achieved when writing to Hive tables using CTAS?
>>
>> Best,
>> Ron
>>

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by Lincoln Lee <li...@gmail.com>.
Hi, Mang

+1 for completing the support for atomicity of CTAS, this is very useful in
batch scenarios.

I have two questions:
1. Naming:
  a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to
`Catalog#twoPhaseCreateTable`? (We may add
twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later.)
  b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable` be a
better name?
  c) For `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
in the method name may lead users to expect transaction support (which is
not strictly the case), so I suggest changing it to `begin`.
2. Has this design been validated by a PoC on Hive or other catalogs?
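
To make the naming in 1(a)-(c) concrete, here is a minimal sketch of how the renamed pieces might fit together. It is only an illustration under assumptions: `begin` and `twoPhaseCreateTable` follow the names suggested above, while `commit`, `abort`, `InMemoryCatalog`, and `runCtas` are hypothetical and not taken from the FLIP. The real interfaces may well look different.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-phase handle; commit() and abort() are assumed names. */
interface TwoPhaseCatalogTable {
    void begin();   // prepare before the job starts
    void commit();  // publish the table after the job succeeds
    void abort();   // clean up after failure or cancellation
}

/** Toy in-memory catalog that lists a table only once it is committed. */
class InMemoryCatalog {
    final List<String> visibleTables = new ArrayList<>();

    TwoPhaseCatalogTable twoPhaseCreateTable(String name) {
        return new TwoPhaseCatalogTable() {
            private boolean begun;

            public void begin() {
                begun = true;
            }

            public void commit() {
                if (!begun) {
                    throw new IllegalStateException("begin() was not called");
                }
                visibleTables.add(name);
            }

            public void abort() {
                begun = false;
            }
        };
    }
}

class AtomicCtasSketch {
    /** Runs the job between begin() and commit(); aborts on any failure. */
    static boolean runCtas(InMemoryCatalog catalog, String table, Runnable job) {
        TwoPhaseCatalogTable handle = catalog.twoPhaseCreateTable(table);
        handle.begin();
        try {
            job.run();
            handle.commit();
            return true;
        } catch (RuntimeException e) {
            handle.abort();
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        runCtas(catalog, "ok_table", () -> { });
        runCtas(catalog, "failed_table", () -> {
            throw new RuntimeException("job failed");
        });
        System.out.println(catalog.visibleTables); // prints [ok_table]
    }
}
```

Nothing in the sketch relies on real transactions, which is the reason for preferring `begin` over `beginTransaction`.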

Best,
Lincoln Lee


On Thu, Apr 13, 2023 at 10:17, liu ron <ro...@gmail.com> wrote:

> Hi, Mang
> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
> is a continuation of FLIP-218 and is valuable for CTAS.
> I have just one question: in the Motivation part of FLIP-218, we mentioned
> three levels of atomicity semantics. Can the current design match Spark's
> DataSource V2, which can guarantee both atomicity and isolation? For
> example, can this be achieved when writing to Hive tables using CTAS?
>
> Best,
> Ron
>

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Posted by liu ron <ro...@gmail.com>.
Hi, Mang
Atomicity is very important for CTAS, especially for batch jobs. This FLIP
is a continuation of FLIP-218 and is valuable for CTAS.
I have just one question: in the Motivation part of FLIP-218, we mentioned
three levels of atomicity semantics. Can the current design match Spark's
DataSource V2, which can guarantee both atomicity and isolation? For
example, can this be achieved when writing to Hive tables using CTAS?

Best,
Ron
