You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by zhou chao <zh...@hotmail.com> on 2022/02/09 03:15:57 UTC

[DISCUSS]Support the merge statement in FlinkSQL

Hi, devs!
Jingfeng and I would like to start a discussion about the MERGE statement, and the discussion consists of two parts. In the first part, we want to explore and collect the cases and motivations of the MERGE statement users. In the second part, we want to find out the possibility for Flink SQL to support the merge statement.

Before driving the first topic, we want to introduce the definition and benefits of the merge statement. The MERGE statement in SQL is a very popular clause and it can handle inserts, updates, and deletes all in a single transaction without having to write separate logic for each of these. 
For each insert, update, or delete statement, we can specify conditions separately. Now, many Engine/DBs have supported this feature, for example, SQL Server[1], Spark[2], Hive[3],  pgSQL[4]. 

Our use case: 
Order analysis & processing is one the most important scenario, but sometimes updated orders have a long time span compared with the last one with the same primary key, in the meanwhile, the states for this key have expired, such that the wrong Agg result will be achieved. In this situation, we use the merge statement in a batch job to correct the results, and now spark + iceberg is chosen in our internal. In the future, we want to unify the batch & streaming by using FlinkSQL in our internal, it would be better if Flink could support the merge statement. If you have other use cases and opinions, plz show us here.

Now, calcite does not have good support for the merge statement, and there exists a Jira CALCITE-4338[5] to track. Could we support the merge statement relying on the limited support from calcite-1.26.0? I wrote a simple doc[6] to drive this, just want to find out the possibility for Flink SQL to support the merge statement.

Looking forward to your feedback, thanks. 

best,
zoucao


[1]https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
[2]https://issues.apache.org/jira/browse/SPARK-28893
[3]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5]https://issues.apache.org/jira/browse/CALCITE-4338
[6]https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing

Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by OpenInx <op...@gmail.com>.
> Could you list some needs from your point of view ?

I know the typical requirements about extending the native flink sql for
iceberg tables are:

1.  There are many table management operations in apache iceberg which can
be friendly if we can express in SQL for end users. such as:  expire
iceberg table snapshot, rewrite data files, rewrite metadata files, remove
orphan files etc.   In iceberg spark extension, we provide a  CALL
<procedure-name> ( arg, ...) extension syntax to execute those operations
in Spark SQL. See more details here [1];
2.  ALTER TABLE syntax.  There are some partition evolution syntax that
engines usually don't provide as native syntax.  See [2]
3.  MERGE INTO syntax as discussed above.

[1] https://iceberg.apache.org/docs/latest/spark-procedures/
[2]
https://github.com/apache/iceberg/blob/f5a753791f4dc6aca78569a14f731feda9edf462/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4#L70


On Mon, Feb 21, 2022 at 4:39 PM zhou chao <zh...@hotmail.com> wrote:

> Hi, Openlnx,  FG and Jingsong, thanks for your attention about this issue.
> I am sorry for the late reply, I’ m working on the further job about the
> implement
> of merge statement, and now we support this in our internal for the basic
> SQL syntax.
>
> About: Implement our flink sql plugin/extensions which extends the core
> calcite sql.
> Going a step further, is it possible for us to achieve a better
> abstraction of the
> flink sql framework.
>
> AFAIK, some extensions has been done for FlinkSQL from calcite, but if we
> want more
> functionality and flexibility, such that greater risk and more workload
> will follow.  Someone
> knows flink and calcite well enough could give us some advise.
> I think it is great if flink has a better abstraction if sql framework, I
> know that iceberg
> implements the merge statement by extending logical rule from spark. Hi,
> openInx,
> could you list some needs from your point of view ?
>
> About: the implement of the merge statement
> I will explain how and why we need to rewrite the matched or not matched
> statement,
> I hope to help you know more about it, FG.
> Assume that there exist a target table and a source table with the
> following schema
> (a int, b bigint, c string)
> If the merge statement is
>
> |  merge into target t
> |  using source s
> |  on t.a = s.a
> |  when matched and s.c = ‘Flink’ then update set c = s.c
> |  when matched and s.c <> ’Flink’ then update set c = ‘Flink'
> |  when not matched then insert values(s.a, s.b, ‘Flink’)
>
> How to & why resolve before validation:
> 1. convert  the merge statement to
> select * from source s left outer join target t on t.a and s.a
> We choose left outer join as default, the inner or anti join could be seen
> as a optimizer and be
> used in logical rule.
> 2. convert the update statement to
> insert into t.a, t.b, s.c from (source s left outer join target t on t.a =
> s.a) where s.c = ‘Flink’
> We convert the update statement to the insert statement, because we need
> two inputs to update
> the target table , but the update statement only have one input, which can
> not meet the
> requirements. The same situation will occur in delete statement.
> Although it is tricky, we only need to rewrite the method rewriteMerge()
> and validateMerge() in
> SqlValidatorImpl, small changes for calcite aspects.
> 3. Then, we define a RelNode called MergeAction, which is the collection
> of all matched
> or not matched actions.
> Here is the AST:
> LogicalLegacySink(name=[`default_catalog`.`default_database`.`target`],
> fields=[a, b, c])
> +- LogicalMergeAction(
> action=[matched-0], op=[+U], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
> expr#7=[=($t2, $t6)], a=[$t3], b=[$t4], c=[$t2], $condition=[$t7],
> action=[matched-1], op=[+U], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink'], expr#7=[_UTF-16LE'Flink':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE"], expr#8=[<>($t2, $t7)], a=[$t3], b=[$t4],
> EXPR$2=[$t6], $condition=[$t8],
> action=[not-matched-0], op=[+I], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink'], expr#7=[true], proj#0..1=[{exprs}],
> EXPR$2=[$t6], $condition=[$t7])
>    +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
>       +- LogicalJoin(condition=[=($3, $0)], joinType=[left])
>          :- LogicalTableScan(table=[[default_catalog, default_database,
> source, source: [CollectionTableSource(a, b, c)]]])
>          +- LogicalTableScan(table=[[default_catalog, default_database,
> target, source: [CollectionTableSource(a, b, c)]]])
>
> I am sorry that the doc is out of date, I will improve it as soon as
> possible.
>
> About: the merge statement for unbounded data
> I think it is very meaningful to support streaming 'merge into’ in flink,
> but now the biggest problem
> is the target table, which is used as source and sink table both. Maybe
> there are two ways we can go,
> one is introducing the third table as the sink table, the other is
> breaking the data loop, WDYS ?
>
>
>
> 2022年2月18日 下午2:03,Jingsong Li <jingsonglee0@gmail.com<mailto:
> jingsonglee0@gmail.com>> 写道:
>
> Hi zoucao,
>
> Thanks for your proposal. I believe this discussion will take us one
> step further for iceberg/hudi integration.
>
> ## `MERGE` for streaming
>
> I feel that `MERGE` is very good for stream computing. And, this is
> where our Flink can have an advantage over other computation systems.
>
> MERGE INTO target_table USING source_table;
>
> Under the semantics of stream computation, the incremental data of
> source_table is read in real time, and these incremental data are
> merged into the target_table in real time.
>
> I think this is a great capability, and further research and
> discussion is needed for detailed `MERGE INTO` streaming support.
>
> ## Calcite sql extensions
>
> I'm not sure that calcite is good at extending its SQL syntax, a worst
> case scenario is that Flink needs to maintain a version of calcite of
> its own.
>
> I would prefer that we try to push the syntax to the calcite community
> as much as possible unless we have a clear solution for sql syntax
> extensions.
>
> Best,
> Jingsong
>
> On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
> <fr...@ververica.com>> wrote:
>
> In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation,  the data flow from target_table to
> target_table
> will be a loop, and the incremental data with one key will keep going
> through
> the loop. It looks very strange.
>
> This is the same concern I have here, I don't see how MERGE can work in a
> streaming scenario without modifying its preliminary assumptions and
> semantics.
>
> Even assuming we put some hard constraint on the state size, for example
> requiring to specify a window definition (like in interval joins), I still
> think that the fundamental assumption of MERGE here is a problem: the
> target table is both a sink and a source. And I think this is a big issue,
> as we cannot reasonably assume that sink and sources are available for the
> same table definition or that they behave similarly.
>
> Also, talking about the batch implementation, I don't understand how you
> would implement this: from what I see in the "*validator*" paragraph of
> your document, you convert the merge statement to a bunch of other sql
> statements, but you omit the initial join, fundamental for the semantics of
> MERGE. Perhaps can you provide more details about it?
>
> On another note, I think we can take inspiration from MERGE and its "event
> driven" semantics, in order to have something that works both for batch and
> streaming, say a "Flink-ified" version of MERGE.
>
> For example, something that I can think of could be:
>
> PUSH TO target_table
> FROM source_table
> ON [window TVF]
> [when_clause [...]]
>
> Where when_clause looks like the ones from MERGE (looking at the pgsql).
> This has the window TVF constraint, so the state doesn't grow indefinitely,
> and the source_table is effectively any select you can think of, removing
> the assumption that the target is both a sink and a source. This statement
> at the end produces a changelog stream, pushed to the output table. A
> statement like this could then allow you to have something similar to the
> MERGE, just by replacing source_table with a select performing the join. Of
> course this is an example, and might not make much sense, but I hope it
> gives you the idea.
>
> FG
>
>
> On Mon, Feb 14, 2022 at 4:28 AM OpenInx <openinx@gmail.com<mailto:
> openinx@gmail.com>> wrote:
>
> I'm currently maintaining the iceberg flink modules from apache iceberg
> community.
>
> Currently, the spark has a great integration experience with iceberg format
> v2 in batch mode.  In this document [1],
> The merge into syntax from spark sql extensions does really help a lot when
> people want to change row-level data.
>
> We flink currently has a good integration with iceberg format v2 in
> streaming mode, I mean people can export their
> change log data into an iceberg table directly by writing a few sql.
> This[2] is a good material to read if anybody want to
> create a simple demo.
>
> But I'd say in the batch scenarios,  we flink sql currently lack few
> critical SQL syntax (for integrating iceberg format v2 in batch mode
> better):
> 1.  ALTER TABLE to change columns.
> 2.  UPDATE/DELETE sql to change the unexpected rows in a given table.
> 3.  MERGE INTO to merge a batch changing row set  (mixed with
> insert/delete/update) into the given table.
>
> In short, if we want to provide better integration and user experience with
> iceberg v2 in batch, then I think the support of the above syntax
> is very important (from iceberg perspective).
>
> I think it's better to make that time investment at Calcite's
> implementation before bringing this to Flink.
>
> I find that there are some sql syntax which are critical for flink sql
> while not for other generic sql parser.  Is it possible to implement our
> flink sql plugin/extensions which
> extends the core calcite sql. Going a step further, is it possible for us
> to achieve a better abstraction of the flink sql framework, so that
> downstream components can implement
> their own customized sql plugins based on this sql framework. In this way,
> it is possible to meet the needs of different components to add their own
> sql implementation on top of
> flink sql.
>
> [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> [2].
>
>
> https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
>
>
> On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zh...@hotmail.com> wrote:
>
> Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
> posting
> the
> discussion twice. I sent the message to the dev mail group from an unsub-
> scribed account,  but the message was not shown for a while, and I
> guessed
> that
> the dev mail group would not post an email coming from an unsubscribed
> account, such that I sent it again from a subscribed account.
>
> Q: How would you see merge work for streaming data?
> I think this is an interesting topic, especially for Flink, which is
> wanting to unify
> the streaming & batch processing. Back to the merge statement, there
> exist
> two inputs, target_table and source_table(query). In the merge statement,
> source_table is used to correct the target_table's results and all rows
> in
> target_table only need to be corrected once, that's what the batch job
> does.
> In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation,  the data flow from target_table to
> target_table
> will be a loop, and the incremental data with one key will keep going
> through
> the loop. It looks very strange. So far, we have not received any user
> needs
> matching the merge statement for streaming data. I think that the topic
> for
> data streaming should be supported by user needs and use cases before
> talking about.
>
> I really agree that we should leverage Calcite, and push calcite to
> invest
> it,
> but now this feature does not get enough attention in calcite community.
> I
> found that some features for flink were also limited by calcite, such as
> FLINK-21714[1], but finally was fixed in flink side. Could you teach me
> how
> much effort we can usually afford in a situation like this?
>
>
> best,
> zoucao
>
> [1] https://issues.apache.org/jira/browse/FLINK-21714
>
>
> 2022年2月10日 下午4:09,Martijn Visser <martijn@ververica.com<mailto:
> martijn@ververica.com>> 写道:
>
> Hi zoucao,
>
> I see that this message was posted twice, so I choose to only reply to
> the
> latest one (this one). Thanks for bringing this up for discussion.
>
> I agree that support for a merge statement would be a welcome addition to
> Flink SQL for those that are using it for bounded jobs. How would you see
> merge work for streaming data?
>
> I do think that in order for Flink to properly support this, we should
> leverage Calcite for this. If there's no proper/full support for merge in
> Calcite, I don't think we should add this ourselves. I think the time
> investment and increase in technical debt doesn't outweigh the benefits
> that this would bring to Flink. If it's really that important, I think
> it's
> better to make that time investment at Calcite's implementation before
> bringing this to Flink.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:
>
> Hi, devs!
> Jingfeng and I would like to start a discussion about the MERGE
> statement,
> and the discussion consists of two parts. In the first part, we want to
> explore and collect the cases and motivations of the MERGE statement
> users.
> In the second part, we want to find out the possibility for Flink SQL to
> support the merge statement.
>
> Before driving the first topic, we want to introduce the definition and
> benefits of the merge statement. The MERGE statement in SQL is a very
> popular clause and it can handle inserts, updates, and deletes all in a
> single transaction without having to write separate logic for each of
> these.
> For each insert, update, or delete statement, we can specify conditions
> separately. Now, many Engine/DBs have supported this feature, for
> example,
> SQL Server[1], Spark[2], Hive[3],  pgSQL[4].
>
> Our use case:
> Order analysis & processing is one the most important scenario, but
> sometimes updated orders have a long time span compared with the last one
> with the same primary key, in the meanwhile, the states for this key have
> expired, such that the wrong Agg result will be achieved. In this
> situation, we use the merge statement in a batch job to correct the
> results, and now spark + iceberg is chosen in our internal. In the
> future,
> we want to unify the batch & streaming by using FlinkSQL in our internal,
> it would be better if Flink could support the merge statement. If you
> have
> other use cases and opinions, plz show us here.
>
> Now, calcite does not have good support for the merge statement, and
> there
> exists a Jira CALCITE-4338[5] to track. Could we support the merge
> statement relying on the limited support from calcite-1.26.0? I wrote a
> simple doc[6] to drive this, just want to find out the possibility for
> Flink SQL to support the merge statement.
>
> Looking forward to your feedback, thanks.
>
> best,
> zoucao
>
>
> [1]
>
>
>
> https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> [2]https://issues.apache.org/jira/browse/SPARK-28893
> [3]
>
>
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> [4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> [5]https://issues.apache.org/jira/browse/CALCITE-4338
> [6]
>
>
>
> https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
>
>
>
>
>

Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by zhou chao <zh...@hotmail.com>.
Hi, Openlnx,  FG and Jingsong, thanks for your attention about this issue.
I am sorry for the late reply, I’ m working on the further job about the implement
of merge statement, and now we support this in our internal for the basic SQL syntax.

About: Implement our flink sql plugin/extensions which extends the core calcite sql.
Going a step further, is it possible for us to achieve a better abstraction of the
flink sql framework.

AFAIK, some extensions has been done for FlinkSQL from calcite, but if we want more
functionality and flexibility, such that greater risk and more workload will follow.  Someone
knows flink and calcite well enough could give us some advise.
I think it is great if flink has a better abstraction if sql framework, I know that iceberg
implements the merge statement by extending logical rule from spark. Hi, openInx,
could you list some needs from your point of view ?

About: the implement of the merge statement
I will explain how and why we need to rewrite the matched or not matched statement,
I hope to help you know more about it, FG.
Assume that there exist a target table and a source table with the following schema
(a int, b bigint, c string)
If the merge statement is

|  merge into target t
|  using source s
|  on t.a = s.a
|  when matched and s.c = ‘Flink’ then update set c = s.c
|  when matched and s.c <> ’Flink’ then update set c = ‘Flink'
|  when not matched then insert values(s.a, s.b, ‘Flink’)

How to & why resolve before validation:
1. convert  the merge statement to
select * from source s left outer join target t on t.a and s.a
We choose left outer join as default, the inner or anti join could be seen as a optimizer and be
used in logical rule.
2. convert the update statement to
insert into t.a, t.b, s.c from (source s left outer join target t on t.a = s.a) where s.c = ‘Flink’
We convert the update statement to the insert statement, because we need two inputs to update
the target table , but the update statement only have one input, which can not meet the
requirements. The same situation will occur in delete statement.
Although it is tricky, we only need to rewrite the method rewriteMerge() and validateMerge() in
SqlValidatorImpl, small changes for calcite aspects.
3. Then, we define a RelNode called MergeAction, which is the collection of all matched
or not matched actions.
Here is the AST:
LogicalLegacySink(name=[`default_catalog`.`default_database`.`target`], fields=[a, b, c])
+- LogicalMergeAction(
action=[matched-0], op=[+U], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], expr#7=[=($t2, $t6)], a=[$t3], b=[$t4], c=[$t2], $condition=[$t7],
action=[matched-1], op=[+U], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink'], expr#7=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], expr#8=[<>($t2, $t7)], a=[$t3], b=[$t4], EXPR$2=[$t6], $condition=[$t8],
action=[not-matched-0], op=[+I], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink'], expr#7=[true], proj#0..1=[{exprs}], EXPR$2=[$t6], $condition=[$t7])
   +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
      +- LogicalJoin(condition=[=($3, $0)], joinType=[left])
         :- LogicalTableScan(table=[[default_catalog, default_database, source, source: [CollectionTableSource(a, b, c)]]])
         +- LogicalTableScan(table=[[default_catalog, default_database, target, source: [CollectionTableSource(a, b, c)]]])

I am sorry that the doc is out of date, I will improve it as soon as possible.

About: the merge statement for unbounded data
I think it is very meaningful to support streaming 'merge into’ in flink, but now the biggest problem
is the target table, which is used as source and sink table both. Maybe there are two ways we can go,
one is introducing the third table as the sink table, the other is breaking the data loop, WDYS ?



2022年2月18日 下午2:03,Jingsong Li <ji...@gmail.com>> 写道:

Hi zoucao,

Thanks for your proposal. I believe this discussion will take us one
step further for iceberg/hudi integration.

## `MERGE` for streaming

I feel that `MERGE` is very good for stream computing. And, this is
where our Flink can have an advantage over other computation systems.

MERGE INTO target_table USING source_table;

Under the semantics of stream computation, the incremental data of
source_table is read in real time, and these incremental data are
merged into the target_table in real time.

I think this is a great capability, and further research and
discussion is needed for detailed `MERGE INTO` streaming support.

## Calcite sql extensions

I'm not sure that calcite is good at extending its SQL syntax, a worst
case scenario is that Flink needs to maintain a version of calcite of
its own.

I would prefer that we try to push the syntax to the calcite community
as much as possible unless we have a clear solution for sql syntax
extensions.

Best,
Jingsong

On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
<fr...@ververica.com>> wrote:

In the theory aspect, incremental data should be carefully considered for
streaming data. In this situation,  the data flow from target_table to
target_table
will be a loop, and the incremental data with one key will keep going
through
the loop. It looks very strange.

This is the same concern I have here, I don't see how MERGE can work in a
streaming scenario without modifying its preliminary assumptions and
semantics.

Even assuming we put some hard constraint on the state size, for example
requiring to specify a window definition (like in interval joins), I still
think that the fundamental assumption of MERGE here is a problem: the
target table is both a sink and a source. And I think this is a big issue,
as we cannot reasonably assume that sink and sources are available for the
same table definition or that they behave similarly.

Also, talking about the batch implementation, I don't understand how you
would implement this: from what I see in the "*validator*" paragraph of
your document, you convert the merge statement to a bunch of other sql
statements, but you omit the initial join, fundamental for the semantics of
MERGE. Perhaps can you provide more details about it?

On another note, I think we can take inspiration from MERGE and its "event
driven" semantics, in order to have something that works both for batch and
streaming, say a "Flink-ified" version of MERGE.

For example, something that I can think of could be:

PUSH TO target_table
FROM source_table
ON [window TVF]
[when_clause [...]]

Where when_clause looks like the ones from MERGE (looking at the pgsql).
This has the window TVF constraint, so the state doesn't grow indefinitely,
and the source_table is effectively any select you can think of, removing
the assumption that the target is both a sink and a source. This statement
at the end produces a changelog stream, pushed to the output table. A
statement like this could then allow you to have something similar to the
MERGE, just by replacing source_table with a select performing the join. Of
course this is an example, and might not make much sense, but I hope it
gives you the idea.

FG


On Mon, Feb 14, 2022 at 4:28 AM OpenInx <op...@gmail.com>> wrote:

I'm currently maintaining the iceberg flink modules from apache iceberg
community.

Currently, the spark has a great integration experience with iceberg format
v2 in batch mode.  In this document [1],
The merge into syntax from spark sql extensions does really help a lot when
people want to change row-level data.

We flink currently has a good integration with iceberg format v2 in
streaming mode, I mean people can export their
change log data into an iceberg table directly by writing a few sql.
This[2] is a good material to read if anybody want to
create a simple demo.

But I'd say in the batch scenarios,  we flink sql currently lack few
critical SQL syntax (for integrating iceberg format v2 in batch mode
better):
1.  ALTER TABLE to change columns.
2.  UPDATE/DELETE sql to change the unexpected rows in a given table.
3.  MERGE INTO to merge a batch changing row set  (mixed with
insert/delete/update) into the given table.

In short, if we want to provide better integration and user experience with
iceberg v2 in batch, then I think the support of the above syntax
is very important (from iceberg perspective).

I think it's better to make that time investment at Calcite's
implementation before bringing this to Flink.

I find that there are some sql syntax which are critical for flink sql
while not for other generic sql parser.  Is it possible to implement our
flink sql plugin/extensions which
extends the core calcite sql. Going a step further, is it possible for us
to achieve a better abstraction of the flink sql framework, so that
downstream components can implement
their own customized sql plugins based on this sql framework. In this way,
it is possible to meet the needs of different components to add their own
sql implementation on top of
flink sql.

[1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
[2].

https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html


On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zh...@hotmail.com> wrote:

Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
posting
the
discussion twice. I sent the message to the dev mail group from an unsub-
scribed account,  but the message was not shown for a while, and I
guessed
that
the dev mail group would not post an email coming from an unsubscribed
account, such that I sent it again from a subscribed account.

Q: How would you see merge work for streaming data?
I think this is an interesting topic, especially for Flink, which is
wanting to unify
the streaming & batch processing. Back to the merge statement, there
exist
two inputs, target_table and source_table(query). In the merge statement,
source_table is used to correct the target_table's results and all rows
in
target_table only need to be corrected once, that's what the batch job
does.
In the theory aspect, incremental data should be carefully considered for
streaming data. In this situation,  the data flow from target_table to
target_table
will be a loop, and the incremental data with one key will keep going
through
the loop. It looks very strange. So far, we have not received any user
needs
matching the merge statement for streaming data. I think that the topic
for
data streaming should be supported by user needs and use cases before
talking about.

I really agree that we should leverage Calcite, and push calcite to
invest
it,
but now this feature does not get enough attention in calcite community.
I
found that some features for flink were also limited by calcite, such as
FLINK-21714[1], but finally was fixed in flink side. Could you teach me
how
much effort we can usually afford in a situation like this?


best,
zoucao

[1] https://issues.apache.org/jira/browse/FLINK-21714


2022年2月10日 下午4:09,Martijn Visser <martijn@ververica.com<mailto:
martijn@ververica.com>> 写道:

Hi zoucao,

I see that this message was posted twice, so I choose to only reply to
the
latest one (this one). Thanks for bringing this up for discussion.

I agree that support for a merge statement would be a welcome addition to
Flink SQL for those that are using it for bounded jobs. How would you see
merge work for streaming data?

I do think that in order for Flink to properly support this, we should
leverage Calcite for this. If there's no proper/full support for merge in
Calcite, I don't think we should add this ourselves. I think the time
investment and increase in technical debt doesn't outweigh the benefits
that this would bring to Flink. If it's really that important, I think
it's
better to make that time investment at Calcite's implementation before
bringing this to Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:

Hi, devs!
Jingfeng and I would like to start a discussion about the MERGE
statement,
and the discussion consists of two parts. In the first part, we want to
explore and collect the cases and motivations of the MERGE statement
users.
In the second part, we want to find out the possibility for Flink SQL to
support the merge statement.

Before driving the first topic, we want to introduce the definition and
benefits of the merge statement. The MERGE statement in SQL is a very
popular clause and it can handle inserts, updates, and deletes all in a
single transaction without having to write separate logic for each of
these.
For each insert, update, or delete statement, we can specify conditions
separately. Now, many Engine/DBs have supported this feature, for
example,
SQL Server[1], Spark[2], Hive[3],  pgSQL[4].

Our use case:
Order analysis & processing is one the most important scenario, but
sometimes updated orders have a long time span compared with the last one
with the same primary key, in the meanwhile, the states for this key have
expired, such that the wrong Agg result will be achieved. In this
situation, we use the merge statement in a batch job to correct the
results, and now spark + iceberg is chosen in our internal. In the
future,
we want to unify the batch & streaming by using FlinkSQL in our internal,
it would be better if Flink could support the merge statement. If you
have
other use cases and opinions, plz show us here.

Now, calcite does not have good support for the merge statement, and
there
exists a Jira CALCITE-4338[5] to track. Could we support the merge
statement relying on the limited support from calcite-1.26.0? I wrote a
simple doc[6] to drive this, just want to find out the possibility for
Flink SQL to support the merge statement.

Looking forward to your feedback, thanks.

best,
zoucao


[1]


https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
[2]https://issues.apache.org/jira/browse/SPARK-28893
[3]


https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5]https://issues.apache.org/jira/browse/CALCITE-4338
[6]


https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing





Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by Jingsong Li <ji...@gmail.com>.
Hi zoucao,

Thanks for your proposal. I believe this discussion will take us one
step further for iceberg/hudi integration.

## `MERGE` for streaming

I feel that `MERGE` is very good for stream computing. And, this is
where our Flink can have an advantage over other computation systems.

MERGE INTO target_table USING source_table;

Under the semantics of stream computation, the incremental data of
source_table is read in real time, and these incremental data are
merged into the target_table in real time.

I think this is a great capability, and further research and
discussion is needed for detailed `MERGE INTO` streaming support.

## Calcite sql extensions

I'm not sure that calcite is good at extending its SQL syntax, a worst
case scenario is that Flink needs to maintain a version of calcite of
its own.

I would prefer that we try to push the syntax to the calcite community
as much as possible unless we have a clear solution for sql syntax
extensions.

Best,
Jingsong

On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
<fr...@ververica.com> wrote:
>
> > In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation,  the data flow from target_table to
> target_table
> will be a loop, and the incremental data with one key will keep going
> through
> the loop. It looks very strange.
>
> This is the same concern I have here, I don't see how MERGE can work in a
> streaming scenario without modifying its preliminary assumptions and
> semantics.
>
> Even assuming we put some hard constraint on the state size, for example
> requiring to specify a window definition (like in interval joins), I still
> think that the fundamental assumption of MERGE here is a problem: the
> target table is both a sink and a source. And I think this is a big issue,
> as we cannot reasonably assume that sink and sources are available for the
> same table definition or that they behave similarly.
>
> Also, talking about the batch implementation, I don't understand how you
> would implement this: from what I see in the "*validator*" paragraph of
> your document, you convert the merge statement to a bunch of other sql
> statements, but you omit the initial join, fundamental for the semantics of
> MERGE. Perhaps can you provide more details about it?
>
> On another note, I think we can take inspiration from MERGE and its "event
> driven" semantics, in order to have something that works both for batch and
> streaming, say a "Flink-ified" version of MERGE.
>
> For example, something that I can think of could be:
>
> PUSH TO target_table
> FROM source_table
> ON [window TVF]
> [when_clause [...]]
>
> Where when_clause looks like the ones from MERGE (looking at the pgsql).
> This has the window TVF constraint, so the state doesn't grow indefinitely,
> and the source_table is effectively any select you can think of, removing
> the assumption that the target is both a sink and a source. This statement
> at the end produces a changelog stream, pushed to the output table. A
> statement like this could then allow you to have something similar to the
> MERGE, just by replacing source_table with a select performing the join. Of
> course this is an example, and might not make much sense, but I hope it
> gives you the idea.
>
> FG
>
>
> On Mon, Feb 14, 2022 at 4:28 AM OpenInx <op...@gmail.com> wrote:
>
> > I'm currently maintaining the iceberg flink modules from apache iceberg
> > community.
> >
> > Currently, the spark has a great integration experience with iceberg format
> > v2 in batch mode.  In this document [1],
> > The merge into syntax from spark sql extensions does really help a lot when
> > people want to change row-level data.
> >
> > We flink currently has a good integration with iceberg format v2 in
> > streaming mode, I mean people can export their
> > change log data into an iceberg table directly by writing a few sql.
> > This[2] is a good material to read if anybody want to
> > create a simple demo.
> >
> > But I'd say in the batch scenarios,  we flink sql currently lack few
> > critical SQL syntax (for integrating iceberg format v2 in batch mode
> > better):
> > 1.  ALTER TABLE to change columns.
> > 2.  UPDATE/DELETE sql to change the unexpected rows in a given table.
> > 3.  MERGE INTO to merge a batch changing row set  (mixed with
> > insert/delete/update) into the given table.
> >
> > In short, if we want to provide better integration and user experience with
> > iceberg v2 in batch, then I think the support of the above syntax
> > is very important (from iceberg perspective).
> >
> > > I think it's better to make that time investment at Calcite's
> > implementation before bringing this to Flink.
> >
> > I find that there are some sql syntax which are critical for flink sql
> > while not for other generic sql parser.  Is it possible to implement our
> > flink sql plugin/extensions which
> > extends the core calcite sql. Going a step further, is it possible for us
> > to achieve a better abstraction of the flink sql framework, so that
> > downstream components can implement
> > their own customized sql plugins based on this sql framework. In this way,
> > it is possible to meet the needs of different components to add their own
> > sql implementation on top of
> > flink sql.
> >
> > [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> > [2].
> >
> > https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
> >
> >
> > On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zh...@hotmail.com> wrote:
> >
> > > Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
> > posting
> > > the
> > > discussion twice. I sent the message to the dev mail group from an unsub-
> > > scribed account,  but the message was not shown for a while, and I
> > guessed
> > > that
> > > the dev mail group would not post an email coming from an unsubscribed
> > > account, such that I sent it again from a subscribed account.
> > >
> > > Q: How would you see merge work for streaming data?
> > > I think this is an interesting topic, especially for Flink, which is
> > > wanting to unify
> > > the streaming & batch processing. Back to the merge statement, there
> > exist
> > > two inputs, target_table and source_table(query). In the merge statement,
> > > source_table is used to correct the target_table's results and all rows
> > in
> > > target_table only need to be corrected once, that's what the batch job
> > > does.
> > > In the theory aspect, incremental data should be carefully considered for
> > > streaming data. In this situation,  the data flow from target_table to
> > > target_table
> > > will be a loop, and the incremental data with one key will keep going
> > > through
> > > the loop. It looks very strange. So far, we have not received any user
> > > needs
> > > matching the merge statement for streaming data. I think that the topic
> > for
> > > data streaming should be supported by user needs and use cases before
> > > talking about.
> > >
> > > I really agree that we should leverage Calcite, and push calcite to
> > invest
> > > it,
> > > but now this feature does not get enough attention in calcite community.
> > I
> > > found that some features for flink were also limited by calcite, such as
> > > FLINK-21714[1], but finally was fixed in flink side. Could you teach me
> > how
> > > much effort we can usually afford in a situation like this?
> > >
> > >
> > > best,
> > > zoucao
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21714
> > >
> > >
> > > 2022年2月10日 下午4:09,Martijn Visser <martijn@ververica.com<mailto:
> > > martijn@ververica.com>> 写道:
> > >
> > > Hi zoucao,
> > >
> > > I see that this message was posted twice, so I choose to only reply to
> > the
> > > latest one (this one). Thanks for bringing this up for discussion.
> > >
> > > I agree that support for a merge statement would be a welcome addition to
> > > Flink SQL for those that are using it for bounded jobs. How would you see
> > > merge work for streaming data?
> > >
> > > I do think that in order for Flink to properly support this, we should
> > > leverage Calcite for this. If there's no proper/full support for merge in
> > > Calcite, I don't think we should add this ourselves. I think the time
> > > investment and increase in technical debt doesn't outweigh the benefits
> > > that this would bring to Flink. If it's really that important, I think
> > it's
> > > better to make that time investment at Calcite's implementation before
> > > bringing this to Flink.
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > >
> > >
> > > On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:
> > >
> > > Hi, devs!
> > > Jingfeng and I would like to start a discussion about the MERGE
> > statement,
> > > and the discussion consists of two parts. In the first part, we want to
> > > explore and collect the cases and motivations of the MERGE statement
> > users.
> > > In the second part, we want to find out the possibility for Flink SQL to
> > > support the merge statement.
> > >
> > > Before driving the first topic, we want to introduce the definition and
> > > benefits of the merge statement. The MERGE statement in SQL is a very
> > > popular clause and it can handle inserts, updates, and deletes all in a
> > > single transaction without having to write separate logic for each of
> > > these.
> > > For each insert, update, or delete statement, we can specify conditions
> > > separately. Now, many Engine/DBs have supported this feature, for
> > example,
> > > SQL Server[1], Spark[2], Hive[3],  pgSQL[4].
> > >
> > > Our use case:
> > > Order analysis & processing is one the most important scenario, but
> > > sometimes updated orders have a long time span compared with the last one
> > > with the same primary key, in the meanwhile, the states for this key have
> > > expired, such that the wrong Agg result will be achieved. In this
> > > situation, we use the merge statement in a batch job to correct the
> > > results, and now spark + iceberg is chosen in our internal. In the
> > future,
> > > we want to unify the batch & streaming by using FlinkSQL in our internal,
> > > it would be better if Flink could support the merge statement. If you
> > have
> > > other use cases and opinions, plz show us here.
> > >
> > > Now, calcite does not have good support for the merge statement, and
> > there
> > > exists a Jira CALCITE-4338[5] to track. Could we support the merge
> > > statement relying on the limited support from calcite-1.26.0? I wrote a
> > > simple doc[6] to drive this, just want to find out the possibility for
> > > Flink SQL to support the merge statement.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > > best,
> > > zoucao
> > >
> > >
> > > [1]
> > >
> > >
> > https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > > [2]https://issues.apache.org/jira/browse/SPARK-28893
> > > [3]
> > >
> > >
> > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > > [4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > > [5]https://issues.apache.org/jira/browse/CALCITE-4338
> > > [6]
> > >
> > >
> > https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
> > >
> > >
> >

Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by Francesco Guardiani <fr...@ververica.com>.
> In the theory aspect, incremental data should be carefully considered for
streaming data. In this situation,  the data flow from target_table to
target_table
will be a loop, and the incremental data with one key will keep going
through
the loop. It looks very strange.

This is the same concern I have here, I don't see how MERGE can work in a
streaming scenario without modifying its preliminary assumptions and
semantics.

Even assuming we put some hard constraint on the state size, for example
requiring to specify a window definition (like in interval joins), I still
think that the fundamental assumption of MERGE here is a problem: the
target table is both a sink and a source. And I think this is a big issue,
as we cannot reasonably assume that sink and sources are available for the
same table definition or that they behave similarly.

Also, talking about the batch implementation, I don't understand how you
would implement this: from what I see in the "*validator*" paragraph of
your document, you convert the merge statement to a bunch of other sql
statements, but you omit the initial join, fundamental for the semantics of
MERGE. Perhaps can you provide more details about it?

On another note, I think we can take inspiration from MERGE and its "event
driven" semantics, in order to have something that works both for batch and
streaming, say a "Flink-ified" version of MERGE.

For example, something that I can think of could be:

PUSH TO target_table
FROM source_table
ON [window TVF]
[when_clause [...]]

Where when_clause looks like the ones from MERGE (looking at the pgsql).
This has the window TVF constraint, so the state doesn't grow indefinitely,
and the source_table is effectively any select you can think of, removing
the assumption that the target is both a sink and a source. This statement
at the end produces a changelog stream, pushed to the output table. A
statement like this could then allow you to have something similar to the
MERGE, just by replacing source_table with a select performing the join. Of
course this is an example, and might not make much sense, but I hope it
gives you the idea.

FG


On Mon, Feb 14, 2022 at 4:28 AM OpenInx <op...@gmail.com> wrote:

> I'm currently maintaining the iceberg flink modules from apache iceberg
> community.
>
> Currently, the spark has a great integration experience with iceberg format
> v2 in batch mode.  In this document [1],
> The merge into syntax from spark sql extensions does really help a lot when
> people want to change row-level data.
>
> We flink currently has a good integration with iceberg format v2 in
> streaming mode, I mean people can export their
> change log data into an iceberg table directly by writing a few sql.
> This[2] is a good material to read if anybody want to
> create a simple demo.
>
> But I'd say in the batch scenarios,  we flink sql currently lack few
> critical SQL syntax (for integrating iceberg format v2 in batch mode
> better):
> 1.  ALTER TABLE to change columns.
> 2.  UPDATE/DELETE sql to change the unexpected rows in a given table.
> 3.  MERGE INTO to merge a batch changing row set  (mixed with
> insert/delete/update) into the given table.
>
> In short, if we want to provide better integration and user experience with
> iceberg v2 in batch, then I think the support of the above syntax
> is very important (from iceberg perspective).
>
> > I think it's better to make that time investment at Calcite's
> implementation before bringing this to Flink.
>
> I find that there are some sql syntax which are critical for flink sql
> while not for other generic sql parser.  Is it possible to implement our
> flink sql plugin/extensions which
> extends the core calcite sql. Going a step further, is it possible for us
> to achieve a better abstraction of the flink sql framework, so that
> downstream components can implement
> their own customized sql plugins based on this sql framework. In this way,
> it is possible to meet the needs of different components to add their own
> sql implementation on top of
> flink sql.
>
> [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> [2].
>
> https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
>
>
> On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zh...@hotmail.com> wrote:
>
> > Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
> posting
> > the
> > discussion twice. I sent the message to the dev mail group from an unsub-
> > scribed account,  but the message was not shown for a while, and I
> guessed
> > that
> > the dev mail group would not post an email coming from an unsubscribed
> > account, such that I sent it again from a subscribed account.
> >
> > Q: How would you see merge work for streaming data?
> > I think this is an interesting topic, especially for Flink, which is
> > wanting to unify
> > the streaming & batch processing. Back to the merge statement, there
> exist
> > two inputs, target_table and source_table(query). In the merge statement,
> > source_table is used to correct the target_table's results and all rows
> in
> > target_table only need to be corrected once, that's what the batch job
> > does.
> > In the theory aspect, incremental data should be carefully considered for
> > streaming data. In this situation,  the data flow from target_table to
> > target_table
> > will be a loop, and the incremental data with one key will keep going
> > through
> > the loop. It looks very strange. So far, we have not received any user
> > needs
> > matching the merge statement for streaming data. I think that the topic
> for
> > data streaming should be supported by user needs and use cases before
> > talking about.
> >
> > I really agree that we should leverage Calcite, and push calcite to
> invest
> > it,
> > but now this feature does not get enough attention in calcite community.
> I
> > found that some features for flink were also limited by calcite, such as
> > FLINK-21714[1], but finally was fixed in flink side. Could you teach me
> how
> > much effort we can usually afford in a situation like this?
> >
> >
> > best,
> > zoucao
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21714
> >
> >
> > 2022年2月10日 下午4:09,Martijn Visser <martijn@ververica.com<mailto:
> > martijn@ververica.com>> 写道:
> >
> > Hi zoucao,
> >
> > I see that this message was posted twice, so I choose to only reply to
> the
> > latest one (this one). Thanks for bringing this up for discussion.
> >
> > I agree that support for a merge statement would be a welcome addition to
> > Flink SQL for those that are using it for bounded jobs. How would you see
> > merge work for streaming data?
> >
> > I do think that in order for Flink to properly support this, we should
> > leverage Calcite for this. If there's no proper/full support for merge in
> > Calcite, I don't think we should add this ourselves. I think the time
> > investment and increase in technical debt doesn't outweigh the benefits
> > that this would bring to Flink. If it's really that important, I think
> it's
> > better to make that time investment at Calcite's implementation before
> > bringing this to Flink.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> >
> > On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:
> >
> > Hi, devs!
> > Jingfeng and I would like to start a discussion about the MERGE
> statement,
> > and the discussion consists of two parts. In the first part, we want to
> > explore and collect the cases and motivations of the MERGE statement
> users.
> > In the second part, we want to find out the possibility for Flink SQL to
> > support the merge statement.
> >
> > Before driving the first topic, we want to introduce the definition and
> > benefits of the merge statement. The MERGE statement in SQL is a very
> > popular clause and it can handle inserts, updates, and deletes all in a
> > single transaction without having to write separate logic for each of
> > these.
> > For each insert, update, or delete statement, we can specify conditions
> > separately. Now, many Engine/DBs have supported this feature, for
> example,
> > SQL Server[1], Spark[2], Hive[3],  pgSQL[4].
> >
> > Our use case:
> > Order analysis & processing is one the most important scenario, but
> > sometimes updated orders have a long time span compared with the last one
> > with the same primary key, in the meanwhile, the states for this key have
> > expired, such that the wrong Agg result will be achieved. In this
> > situation, we use the merge statement in a batch job to correct the
> > results, and now spark + iceberg is chosen in our internal. In the
> future,
> > we want to unify the batch & streaming by using FlinkSQL in our internal,
> > it would be better if Flink could support the merge statement. If you
> have
> > other use cases and opinions, plz show us here.
> >
> > Now, calcite does not have good support for the merge statement, and
> there
> > exists a Jira CALCITE-4338[5] to track. Could we support the merge
> > statement relying on the limited support from calcite-1.26.0? I wrote a
> > simple doc[6] to drive this, just want to find out the possibility for
> > Flink SQL to support the merge statement.
> >
> > Looking forward to your feedback, thanks.
> >
> > best,
> > zoucao
> >
> >
> > [1]
> >
> >
> https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > [2]https://issues.apache.org/jira/browse/SPARK-28893
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > [4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > [5]https://issues.apache.org/jira/browse/CALCITE-4338
> > [6]
> >
> >
> https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
> >
> >
>

Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by OpenInx <op...@gmail.com>.
I'm currently maintaining the iceberg flink modules from apache iceberg
community.

Currently, the spark has a great integration experience with iceberg format
v2 in batch mode.  In this document [1],
The merge into syntax from spark sql extensions does really help a lot when
people want to change row-level data.

We flink currently has a good integration with iceberg format v2 in
streaming mode, I mean people can export their
change log data into an iceberg table directly by writing a few sql.
This[2] is a good material to read if anybody want to
create a simple demo.

But I'd say in the batch scenarios,  we flink sql currently lack few
critical SQL syntax (for integrating iceberg format v2 in batch mode
better):
1.  ALTER TABLE to change columns.
2.  UPDATE/DELETE sql to change the unexpected rows in a given table.
3.  MERGE INTO to merge a batch changing row set  (mixed with
insert/delete/update) into the given table.

In short, if we want to provide better integration and user experience with
iceberg v2 in batch, then I think the support of the above syntax
is very important (from iceberg perspective).

> I think it's better to make that time investment at Calcite's
implementation before bringing this to Flink.

I find that there are some sql syntax which are critical for flink sql
while not for other generic sql parser.  Is it possible to implement our
flink sql plugin/extensions which
extends the core calcite sql. Going a step further, is it possible for us
to achieve a better abstraction of the flink sql framework, so that
downstream components can implement
their own customized sql plugins based on this sql framework. In this way,
it is possible to meet the needs of different components to add their own
sql implementation on top of
flink sql.

[1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
[2].
https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html


On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zh...@hotmail.com> wrote:

> Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for posting
> the
> discussion twice. I sent the message to the dev mail group from an unsub-
> scribed account,  but the message was not shown for a while, and I guessed
> that
> the dev mail group would not post an email coming from an unsubscribed
> account, such that I sent it again from a subscribed account.
>
> Q: How would you see merge work for streaming data?
> I think this is an interesting topic, especially for Flink, which is
> wanting to unify
> the streaming & batch processing. Back to the merge statement, there exist
> two inputs, target_table and source_table(query). In the merge statement,
> source_table is used to correct the target_table's results and all rows in
> target_table only need to be corrected once, that's what the batch job
> does.
> In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation,  the data flow from target_table to
> target_table
> will be a loop, and the incremental data with one key will keep going
> through
> the loop. It looks very strange. So far, we have not received any user
> needs
> matching the merge statement for streaming data. I think that the topic for
> data streaming should be supported by user needs and use cases before
> talking about.
>
> I really agree that we should leverage Calcite, and push calcite to invest
> it,
> but now this feature does not get enough attention in calcite community. I
> found that some features for flink were also limited by calcite, such as
> FLINK-21714[1], but finally was fixed in flink side. Could you teach me how
> much effort we can usually afford in a situation like this?
>
>
> best,
> zoucao
>
> [1] https://issues.apache.org/jira/browse/FLINK-21714
>
>
> 2022年2月10日 下午4:09,Martijn Visser <martijn@ververica.com<mailto:
> martijn@ververica.com>> 写道:
>
> Hi zoucao,
>
> I see that this message was posted twice, so I choose to only reply to the
> latest one (this one). Thanks for bringing this up for discussion.
>
> I agree that support for a merge statement would be a welcome addition to
> Flink SQL for those that are using it for bounded jobs. How would you see
> merge work for streaming data?
>
> I do think that in order for Flink to properly support this, we should
> leverage Calcite for this. If there's no proper/full support for merge in
> Calcite, I don't think we should add this ourselves. I think the time
> investment and increase in technical debt doesn't outweigh the benefits
> that this would bring to Flink. If it's really that important, I think it's
> better to make that time investment at Calcite's implementation before
> bringing this to Flink.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:
>
> Hi, devs!
> Jingfeng and I would like to start a discussion about the MERGE statement,
> and the discussion consists of two parts. In the first part, we want to
> explore and collect the cases and motivations of the MERGE statement users.
> In the second part, we want to find out the possibility for Flink SQL to
> support the merge statement.
>
> Before driving the first topic, we want to introduce the definition and
> benefits of the merge statement. The MERGE statement in SQL is a very
> popular clause and it can handle inserts, updates, and deletes all in a
> single transaction without having to write separate logic for each of
> these.
> For each insert, update, or delete statement, we can specify conditions
> separately. Now, many Engine/DBs have supported this feature, for example,
> SQL Server[1], Spark[2], Hive[3],  pgSQL[4].
>
> Our use case:
> Order analysis & processing is one the most important scenario, but
> sometimes updated orders have a long time span compared with the last one
> with the same primary key, in the meanwhile, the states for this key have
> expired, such that the wrong Agg result will be achieved. In this
> situation, we use the merge statement in a batch job to correct the
> results, and now spark + iceberg is chosen in our internal. In the future,
> we want to unify the batch & streaming by using FlinkSQL in our internal,
> it would be better if Flink could support the merge statement. If you have
> other use cases and opinions, plz show us here.
>
> Now, calcite does not have good support for the merge statement, and there
> exists a Jira CALCITE-4338[5] to track. Could we support the merge
> statement relying on the limited support from calcite-1.26.0? I wrote a
> simple doc[6] to drive this, just want to find out the possibility for
> Flink SQL to support the merge statement.
>
> Looking forward to your feedback, thanks.
>
> best,
> zoucao
>
>
> [1]
>
> https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> [2]https://issues.apache.org/jira/browse/SPARK-28893
> [3]
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> [4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> [5]https://issues.apache.org/jira/browse/CALCITE-4338
> [6]
>
> https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
>
>

Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by zhou chao <zh...@hotmail.com>.
Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for posting the
discussion twice. I sent the message to the dev mail group from an unsub-
scribed account,  but the message was not shown for a while, and I guessed that
the dev mail group would not post an email coming from an unsubscribed
account, such that I sent it again from a subscribed account.

Q: How would you see merge work for streaming data?
I think this is an interesting topic, especially for Flink, which is wanting to unify
the streaming & batch processing. Back to the merge statement, there exist
two inputs, target_table and source_table(query). In the merge statement,
source_table is used to correct the target_table's results and all rows in
target_table only need to be corrected once, that's what the batch job does.
In the theory aspect, incremental data should be carefully considered for
streaming data. In this situation,  the data flow from target_table to target_table
will be a loop, and the incremental data with one key will keep going through
the loop. It looks very strange. So far, we have not received any user needs
matching the merge statement for streaming data. I think that the topic for
data streaming should be supported by user needs and use cases before
talking about.

I really agree that we should leverage Calcite, and push calcite to invest it,
but now this feature does not get enough attention in calcite community. I
found that some features for flink were also limited by calcite, such as
FLINK-21714[1], but finally was fixed in flink side. Could you teach me how
much effort we can usually afford in a situation like this?


best,
zoucao

[1] https://issues.apache.org/jira/browse/FLINK-21714


2022年2月10日 下午4:09,Martijn Visser <ma...@ververica.com>> 写道:

Hi zoucao,

I see that this message was posted twice, so I choose to only reply to the
latest one (this one). Thanks for bringing this up for discussion.

I agree that support for a merge statement would be a welcome addition to
Flink SQL for those that are using it for bounded jobs. How would you see
merge work for streaming data?

I do think that in order for Flink to properly support this, we should
leverage Calcite for this. If there's no proper/full support for merge in
Calcite, I don't think we should add this ourselves. I think the time
investment and increase in technical debt doesn't outweigh the benefits
that this would bring to Flink. If it's really that important, I think it's
better to make that time investment at Calcite's implementation before
bringing this to Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:

Hi, devs!
Jingfeng and I would like to start a discussion about the MERGE statement,
and the discussion consists of two parts. In the first part, we want to
explore and collect the cases and motivations of the MERGE statement users.
In the second part, we want to find out the possibility for Flink SQL to
support the merge statement.

Before driving the first topic, we want to introduce the definition and
benefits of the merge statement. The MERGE statement in SQL is a very
popular clause and it can handle inserts, updates, and deletes all in a
single transaction without having to write separate logic for each of
these.
For each insert, update, or delete statement, we can specify conditions
separately. Now, many Engine/DBs have supported this feature, for example,
SQL Server[1], Spark[2], Hive[3],  pgSQL[4].

Our use case:
Order analysis & processing is one the most important scenario, but
sometimes updated orders have a long time span compared with the last one
with the same primary key, in the meanwhile, the states for this key have
expired, such that the wrong Agg result will be achieved. In this
situation, we use the merge statement in a batch job to correct the
results, and now spark + iceberg is chosen in our internal. In the future,
we want to unify the batch & streaming by using FlinkSQL in our internal,
it would be better if Flink could support the merge statement. If you have
other use cases and opinions, plz show us here.

Now, calcite does not have good support for the merge statement, and there
exists a Jira CALCITE-4338[5] to track. Could we support the merge
statement relying on the limited support from calcite-1.26.0? I wrote a
simple doc[6] to drive this, just want to find out the possibility for
Flink SQL to support the merge statement.

Looking forward to your feedback, thanks.

best,
zoucao


[1]
https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
[2]https://issues.apache.org/jira/browse/SPARK-28893
[3]
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5]https://issues.apache.org/jira/browse/CALCITE-4338
[6]
https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing


Re: [DISCUSS]Support the merge statement in FlinkSQL

Posted by Martijn Visser <ma...@ververica.com>.
Hi zoucao,

I see that this message was posted twice, so I choose to only reply to the
latest one (this one). Thanks for bringing this up for discussion.

I agree that support for a merge statement would be a welcome addition to
Flink SQL for those that are using it for bounded jobs. How would you see
merge work for streaming data?

I do think that in order for Flink to properly support this, we should
leverage Calcite for this. If there's no proper/full support for merge in
Calcite, I don't think we should add this ourselves. I think the time
investment and increase in technical debt doesn't outweigh the benefits
that this would bring to Flink. If it's really that important, I think it's
better to make that time investment at Calcite's implementation before
bringing this to Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Wed, 9 Feb 2022 at 08:40, zhou chao <zh...@hotmail.com> wrote:

> Hi, devs!
> Jingfeng and I would like to start a discussion about the MERGE statement,
> and the discussion consists of two parts. In the first part, we want to
> explore and collect the cases and motivations of the MERGE statement users.
> In the second part, we want to find out the possibility for Flink SQL to
> support the merge statement.
>
> Before driving the first topic, we want to introduce the definition and
> benefits of the merge statement. The MERGE statement in SQL is a very
> popular clause and it can handle inserts, updates, and deletes all in a
> single transaction without having to write separate logic for each of
> these.
> For each insert, update, or delete statement, we can specify conditions
> separately. Now, many Engine/DBs have supported this feature, for example,
> SQL Server[1], Spark[2], Hive[3],  pgSQL[4].
>
> Our use case:
> Order analysis & processing is one the most important scenario, but
> sometimes updated orders have a long time span compared with the last one
> with the same primary key, in the meanwhile, the states for this key have
> expired, such that the wrong Agg result will be achieved. In this
> situation, we use the merge statement in a batch job to correct the
> results, and now spark + iceberg is chosen in our internal. In the future,
> we want to unify the batch & streaming by using FlinkSQL in our internal,
> it would be better if Flink could support the merge statement. If you have
> other use cases and opinions, plz show us here.
>
> Now, calcite does not have good support for the merge statement, and there
> exists a Jira CALCITE-4338[5] to track. Could we support the merge
> statement relying on the limited support from calcite-1.26.0? I wrote a
> simple doc[6] to drive this, just want to find out the possibility for
> Flink SQL to support the merge statement.
>
> Looking forward to your feedback, thanks.
>
> best,
> zoucao
>
>
> [1]
> https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> [2]https://issues.apache.org/jira/browse/SPARK-28893
> [3]
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> [4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> [5]https://issues.apache.org/jira/browse/CALCITE-4338
> [6]
> https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing