You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2023/02/17 07:56:51 UTC

[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink Function的invoke方法打的日志),该行为导致最终结果表数据不正确。


请问:
flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
我理解flink sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。


Re:Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by "casel.chen" <ca...@126.com>.





你说的这个在写入之前进行shuffle(先执行一个group by主键)这个操作我认为应该是Flink框架层面的事情,不应该在作业层面显式添加。
Flink框架应该在执行sink的时候判断目标表是否有主键,如果有主键的话应该插入一个group by算子将相同主键的记录发到同一个TaskManager处理。
我听说 Flink新版本1.15还是1.16不记得了已经改进了这个问题,有谁知道吗?有相关issue或PR链接没?











在 2023-02-19 13:43:29,"RS" <ti...@163.com> 写道:
>Hi,
>connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>
>
>Thanks
>
>
>
>在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
>>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>>测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>>
>>
>>请问:
>>flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>>是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>>我理解flink sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>>

Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by "casel.chen" <ca...@126.com>.
Flink SQL作业示意如下:


create table user_source_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_id BIGINT NOT NULL,
  proctime AS PROCTIME()
) with (
 'connector' = 'kafka', 
 'format' = 'canal-json',
 ...
);


create table department_dim_table (
   id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
   name STRING
) with (
 'connector' = 'jdbc',
 ...
);


create table user_rich_sink_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_name STRING
) with (
 'connector' = 'jdbc'
 ...
);


insert into user_rich_sink_table 
select id, name, d.name as dept_name 
from user_source_table u
  left join department_dim_table for system_time as of u.proctime as d 
  on u.dept_id = d.id;


用户id是主键,按你所说需要在最后insert into语句之前自己显示加group by用户id再insert?
现在是发现当作业并行度大于1时,相同用户id的记录会落到不同TaskManager上,造成数据更新状态不一致。





在 2023-02-20 08:41:20,"Shammon FY" <zj...@gmail.com> 写道:
>Hi
>
>如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>
>Best,
>Shammon
>
>
>On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
>
>> Hi,
>> connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>>
>>
>> Thanks
>>
>>
>>
>> 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
>> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >
>> >
>> >请问:
>> >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >我理解flink
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >
>>

Re: Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by Shuo Cheng <nj...@gmail.com>.
> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?

Sink upsert materialize would be applied in the following circumstances:
1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's primary key
nonempty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to AUTO and sink's primary key
doesn't contain upsert keys of the input update stream.

Note: upsert materializing operator use state to resolve disorder problems
which may incur additional performance regression.

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen <ca...@126.com> wrote:

> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?
>
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
>
> public static final ConfigOption<UpsertMaterialize>
> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
>             key("table.exec.sink.upsert-materialize")
>                     .enumType(UpsertMaterialize.class)
>                     .defaultValue(UpsertMaterialize.AUTO)
>                     .withDescription(
>                             Description.builder()
>                                     .text(
>                                             "Because of the disorder of
> ChangeLog data caused by Shuffle in distributed system, "
>                                                     + "the data received
> by Sink may not be the order of global upsert. "
>                                                     + "So add upsert
> materialize operator before upsert sink. It receives the "
>                                                     + "upstream changelog
> records and generate an upsert view for the downstream.")
>                                     .linebreak()
>                                     .text(
>                                             "By default, the materialize
> operator will be added when a distributed disorder "
>                                                     + "occurs on unique
> keys. You can also choose no materialization(NONE) "
>                                                     + "or force
> materialization(FORCE).")
>                                     .build());
>
>
>
>
>
> 在 2023-02-22 15:34:27,"Shuo Cheng" <nj...@gmail.com> 写道:
> >Hi,
> >
> >Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
> >ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details
> about
> >solution of disordering problems in KeyBy shuffling.
> >
> >Best,
> >Shuo
> >
> >On Wed, Feb 22, 2023 at 10:23 AM casel.chen <ca...@126.com> wrote:
> >
> >>
> >>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
> >>
> >>
> >> 在 2023-02-20 09:50:50,"Shengkai Fang" <fs...@gmail.com> 写道:
> >> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >[1]
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >> >
> >> >Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:
> >> >
> >> >> Hi
> >> >>
> >> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >> >>
> >> >> Best,
> >> >> Shammon
> >> >>
> >> >>
> >> >> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
> >> >>
> >> >> > Hi,
> >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by
> 主键,然后再执行insert
> >> into
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> >
> >> >> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
> >> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> >> > >
> >> >> > >
> >> >> > >请问:
> >> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> >> > >我理解flink
> >> >> >
> >> >>
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> >> > >
> >> >> >
> >> >>
> >>
>

Re:Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by "casel.chen" <ca...@126.com>.
你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force? 


Because of the disorder of ChangeLog data caused by Shuffle in distributed system, the data received by Sink may not be the order of global upsert. So add upsert materialize operator before upsert sink. It receives the upstream changelog records and generate an upsert view for the downstream.
By default, the materialize operator will be added when a distributed disorder occurs on unique keys. You can also choose no materialization(NONE) or force materialization(FORCE).

Possible values:
"NONE"
"AUTO"
"FORCE"


public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
            key("table.exec.sink.upsert-materialize")
                    .enumType(UpsertMaterialize.class)
                    .defaultValue(UpsertMaterialize.AUTO)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Because of the disorder of ChangeLog data caused by Shuffle in distributed system, "
                                                    + "the data received by Sink may not be the order of global upsert. "
                                                    + "So add upsert materialize operator before upsert sink. It receives the "
                                                    + "upstream changelog records and generate an upsert view for the downstream.")
                                    .linebreak()
                                    .text(
                                            "By default, the materialize operator will be added when a distributed disorder "
                                                    + "occurs on unique keys. You can also choose no materialization(NONE) "
                                                    + "or force materialization(FORCE).")
                                    .build());





在 2023-02-22 15:34:27,"Shuo Cheng" <nj...@gmail.com> 写道:
>Hi,
>
>Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
>ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
>solution of disordering problems in KeyBy shuffling.
>
>Best,
>Shuo
>
>On Wed, Feb 22, 2023 at 10:23 AM casel.chen <ca...@126.com> wrote:
>
>>
>> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>>
>>
>> 在 2023-02-20 09:50:50,"Shengkai Fang" <fs...@gmail.com> 写道:
>> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>> >
>> >Best,
>> >Shengkai
>> >
>> >[1]
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>> >
>> >Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:
>> >
>> >> Hi
>> >>
>> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>> >>
>> >> Best,
>> >> Shammon
>> >>
>> >>
>> >> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
>> >>
>> >> > Hi,
>> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
>> into
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> >
>> >> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
>> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >> > >
>> >> > >
>> >> > >请问:
>> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >> > >我理解flink
>> >> >
>> >>
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >> > >
>> >> >
>> >>
>>

Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by Shuo Cheng <nj...@gmail.com>.
Hi,

Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
solution of disordering problems in KeyBy shuffling.

Best,
Shuo

On Wed, Feb 22, 2023 at 10:23 AM casel.chen <ca...@126.com> wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang" <fs...@gmail.com> 写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>

Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by Weihua Hu <hu...@gmail.com>.
如果想保证每次写入 mysql 的事件是最新的,需要在 Flink 内部针对事件时间排序取 TOP 1, 可以参考[1]。 但是需要注意这需要使用
state,你可以需要指定合适的 TTL[2] 来保证 state 不会过大

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-state-ttl

Best,
Weihua


On Wed, Feb 22, 2023 at 10:23 AM casel.chen <ca...@126.com> wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang" <fs...@gmail.com> 写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>

Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by "casel.chen" <ca...@126.com>.
如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?


在 2023-02-20 09:50:50,"Shengkai Fang" <fs...@gmail.com> 写道:
>我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>
>Best,
>Shengkai
>
>[1]
>https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>
>Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:
>
>> Hi
>>
>> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>>
>> Best,
>> Shammon
>>
>>
>> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
>>
>> > Hi,
>> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
>> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> > >
>> > >
>> > >请问:
>> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> > >我理解flink
>> >
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> > >
>> >
>>

Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by Shengkai Fang <fs...@gmail.com>.
我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。

Best,
Shengkai

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188

Shammon FY <zj...@gmail.com> 于2023年2月20日周一 08:41写道:

> Hi
>
> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>
> Best,
> Shammon
>
>
> On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:
>
> > Hi,
> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
> >
> >
> > Thanks
> >
> >
> >
> > 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> > >
> > >
> > >请问:
> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> > >我理解flink
> >
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> > >
> >
>

Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by Shammon FY <zj...@gmail.com>.
Hi

如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作

Best,
Shammon


On Sun, Feb 19, 2023 at 1:43 PM RS <ti...@163.com> wrote:

> Hi,
> connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>
>
> Thanks
>
>
>
> 在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >
> >
> >请问:
> >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >我理解flink
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >
>

Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

Posted by RS <ti...@163.com>.
Hi,
connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into


Thanks



在 2023-02-17 15:56:51,"casel.chen" <ca...@126.com> 写道:
>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>
>
>请问:
>flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>我理解flink sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>