Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/02/03 13:09:12 UTC

DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Hi,
I'm reworking an existing UpsertStreamTableSink into the new
DynamicTableSink API. In the previous API, the planner would pass the
unique keys of an upsert query to the sink via the `setKeyFields` method,
deriving them from the grouping keys during the translation phase.

Searching around, I saw that the JDBC connector
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling)
relies on the keys being passed explicitly via a PRIMARY KEY constraint.
However, this would require manually declaring the keys on each sink
table, which I am trying to avoid.

What would be the proper way to receive the unique keys for upsert queries
with the new DynamicTableSink API?

-- 
Best Regards,
Yuval Itzchakov.

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Posted by Timo Walther <tw...@apache.org>.
For this you might need to go a level deeper, into the planner.

Maybe the legacy util
org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker can help
you. It analyzes the plan to figure out the keys.
org.apache.flink.table.planner.plan.metadata.FlinkRelMdUniqueKeys seems to
be the newer version.
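As a rough illustration of what such a plan analysis computes, here is a toy Java model. It is not Flink's API: `Node`, `Scan`, `Aggregate` and `Project` are hypothetical stand-ins for Calcite `RelNode`s, and FlinkRelMdUniqueKeys handles far more operators and column remappings. The two rules sketched are that the grouping columns of an aggregation form a unique key, and that a projection keeps a key only if it still selects every column of that key.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of unique-key inference. NOT Flink's API: these hypothetical node
// types stand in for Calcite RelNodes to illustrate the idea behind metadata
// handlers such as FlinkRelMdUniqueKeys.
public class UniqueKeySketch {

    /** A plan node that reports which column sets uniquely identify its rows. */
    interface Node {
        List<String> columns();

        Set<Set<String>> uniqueKeys();
    }

    /** A source table with (possibly no) declared keys. */
    static final class Scan implements Node {
        private final List<String> columns;
        private final Set<Set<String>> declaredKeys;

        Scan(List<String> columns, Set<Set<String>> declaredKeys) {
            this.columns = columns;
            this.declaredKeys = declaredKeys;
        }

        public List<String> columns() { return columns; }

        public Set<Set<String>> uniqueKeys() { return declaredKeys; }
    }

    /** GROUP BY: emits the grouping columns followed by the aggregate results. */
    static final class Aggregate implements Node {
        private final List<String> groupBy;
        private final List<String> aggOutputs;

        Aggregate(Node input, List<String> groupBy, List<String> aggOutputs) {
            // The input's own keys are not needed: grouping alone makes output rows unique.
            this.groupBy = groupBy;
            this.aggOutputs = aggOutputs;
        }

        public List<String> columns() {
            List<String> out = new ArrayList<>(groupBy);
            out.addAll(aggOutputs);
            return out;
        }

        public Set<Set<String>> uniqueKeys() {
            // One output row per distinct grouping value, so the grouping columns
            // form a key (an empty GROUP BY yields the empty key: at most one row).
            return Set.of(Set.copyOf(groupBy));
        }
    }

    /** SELECT of a subset of the input's columns (no renaming, to keep it short). */
    static final class Project implements Node {
        private final Node input;
        private final List<String> kept;

        Project(Node input, List<String> kept) {
            this.input = input;
            this.kept = kept;
        }

        public List<String> columns() { return kept; }

        public Set<Set<String>> uniqueKeys() {
            Set<Set<String>> keys = new LinkedHashSet<>();
            for (Set<String> key : input.uniqueKeys()) {
                // A key survives only if every one of its columns is still selected.
                if (kept.containsAll(key)) {
                    keys.add(key);
                }
            }
            return keys;
        }
    }

    public static void main(String[] args) {
        // Roughly: SELECT user_id, cnt FROM (SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id)
        Node clicks = new Scan(List.of("user_id", "url", "ts"), Set.of());
        Node agg = new Aggregate(clicks, List.of("user_id"), List.of("cnt"));
        System.out.println(new Project(agg, List.of("user_id", "cnt")).uniqueKeys()); // [[user_id]]
        // Dropping the grouping column also drops the key.
        System.out.println(new Project(agg, List.of("cnt")).uniqueKeys()); // []
    }
}
```

With these rules, a GROUP BY user_id query keeps user_id as its key through the final projection, which is the kind of key the legacy planner derived for `setKeyFields`.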

Regards,
Timo

On 03.02.21 16:24, Yuval Itzchakov wrote:
> Hi Timo,
> 
> The problem with this is that I would still have to determine the keys
> manually, which is not really feasible in my case. Is there any internal
> API that might be of use to extract this information?


Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Timo,

The problem with this is that I would still have to determine the keys
manually, which is not really feasible in my case. Is there any internal
API that might be of use to extract this information?

On Wed, Feb 3, 2021 at 5:19 PM Timo Walther <tw...@apache.org> wrote:

> Hi Yuval,
>
> we changed this behavior a bit to be more SQL-compliant. Currently,
> upsert sinks must be explicitly defined with a PRIMARY KEY constraint. We
> have had discussions about inferring sink keys implicitly, but nothing is
> on the roadmap yet. The `CREATE TEMPORARY TABLE ... LIKE` clause should
> make it easy to extend the original table with just a primary key.
>
> Regards,
> Timo

-- 
Best Regards,
Yuval Itzchakov.

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Posted by Timo Walther <tw...@apache.org>.
Hi Yuval,

we changed this behavior a bit to be more SQL-compliant. Currently,
upsert sinks must be explicitly defined with a PRIMARY KEY constraint. We
have had discussions about inferring sink keys implicitly, but nothing is
on the roadmap yet. The `CREATE TEMPORARY TABLE ... LIKE` clause should
make it easy to extend the original table with just a primary key.
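A minimal sketch of the LIKE approach, assuming Flink 1.12 SQL syntax and the default LIKE merge behavior (columns, connector options and other properties inherited from the base table). `LikeDdlSketch`, `withPrimaryKey` and the table and column names are hypothetical; the helper only builds the DDL string, which could then be submitted with `TableEnvironment.executeSql`.

```java
import java.util.List;

// Hypothetical helper (not part of Flink): builds a Flink SQL statement that
// derives a temporary sink table from an existing one, adding only a primary
// key via the CREATE TABLE ... LIKE clause. Identifiers are wrapped in
// backticks but not escaped, so names containing backticks are unsupported.
public class LikeDdlSketch {

    static String withPrimaryKey(String newTable, String baseTable, List<String> keyColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("a primary key needs at least one column");
        }
        // Flink requires primary keys to be declared NOT ENFORCED.
        String key = "`" + String.join("`, `", keyColumns) + "`";
        // The parenthesized body only adds the constraint; everything else is
        // expected to come from the base table through LIKE.
        return "CREATE TEMPORARY TABLE `" + newTable + "` (\n"
                + "  PRIMARY KEY (" + key + ") NOT ENFORCED\n"
                + ") LIKE `" + baseTable + "`";
    }

    public static void main(String[] args) {
        // Prints:
        // CREATE TEMPORARY TABLE `clicks_per_user_upsert` (
        //   PRIMARY KEY (`user_id`) NOT ENFORCED
        // ) LIKE `clicks_per_user`
        System.out.println(withPrimaryKey("clicks_per_user_upsert", "clicks_per_user", List.of("user_id")));
    }
}
```

Generating the statement this way still requires knowing the key columns, so on its own it does not remove the manual step. As far as we know, a DynamicTableSink factory in 1.12 can then read the declared key via `context.getCatalogTable().getSchema().getPrimaryKey()`.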

Regards,
Timo


On 03.02.21 14:09, Yuval Itzchakov wrote:
> Hi,
> I'm reworking an existing UpsertStreamTableSink into the new
> DynamicTableSink API. In the previous API, the planner would pass the
> unique keys of an upsert query to the sink via the `setKeyFields` method,
> deriving them from the grouping keys during the translation phase.
> 
> Searching around, I saw that the JDBC connector
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling)
> relies on the keys being passed explicitly via a PRIMARY KEY constraint.
> However, this would require manually declaring the keys on each sink
> table, which I am trying to avoid.
> 
> What would be the proper way to receive the unique keys for upsert 
> queries with the new DynamicTableSink API?
> 
> -- 
> Best Regards,
> Yuval Itzchakov.