You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by jindy_liu <28...@qq.com> on 2020/11/13 05:12:26 UTC

flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

源表test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
源表status
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

输出表
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255)
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);


输出语句:
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

mysql表中已经有数据
test: 
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.....

status
0, status0
1, status1
2, status2
.....

操作顺序与复现:
1、启动任务,设置并行度为40,
表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
savepoint保存,然后web ui上取消任务。
  ==> test_status中的数据正常:
    0, name0, 2020-07-06 00:00:00 , 0, status0
    1, name1, 2020-07-06 00:00:00 , 1, status1
    2, name2, 2020-07-06 00:00:00 , 1, status1

2、操作mysql, 将status中id=1数据变更为 status1_modify

3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
job  下,
  ==> test_status中的数据正常:
    0, name0, 2020-07-06 00:00:00 , 0, status0
    1, name1, 2020-07-06 00:00:00 , 1, status1_modify
    2, name2, 2020-07-06 00:00:00 , 1, status1_modify
/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
job  下
  ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
    0, name0, 2020-07-06 00:00:00 , 0, status0


怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!

这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
如果是,能不能在sink的时候,只把sink这里的并行度设置为1??







--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
图片png格式,怕看不了,我文字补充下:
1、print的最后几行。

32> +I(1999991,jindy1999991,2020-07-03T18:04:22,0,statu0)
32> +I(1999992,jindy1999992,2020-07-03T18:04:22,0,statu0)
32> +I(1999993,jindy1999993,2020-07-03T18:04:22,0,statu0)
32> +I(1999994,jindy1999994,2020-07-03T18:04:22,0,statu0)
32> +I(1999995,jindy1999995,2020-07-03T18:04:22,0,statu0)
32> +I(1999996,jindy1999996,2020-07-03T18:04:22,0,statu0)
32> +I(1999997,jindy1999997,2020-07-03T18:04:22,0,statu0)
32> +I(1999998,jindy1999998,2020-07-03T18:04:22,0,statu0)
32> +I(1999999,jindy1999999,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2、同时可以看出,大部分数据都在join的32这个subtask上做了处理。200w行处理都在一个subtask做了!

同时修下笔误:===>
snapshot后 test_status中的数据正常:
    0, jindy0, 2020-07-06T20:01:15 , 0, statu0
    1, jindy2, 2020-11-12T00:00:02 , 1, statu1
    2, jindy2, 2020-07-03T18:04:22 , 2, statu2




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
谢谢jark!这几天一直在做性能调优!
    1、这里针对这个简单场景目前可以在sink表的test_status表的primary key,增加一个join
key。即id和status两个列作为key,这样能使用数据最终一致,算是做了下规避,能一致。复杂点的语句感觉有点难搞,有点不敢用,主要不清楚这个乱序会对其它算子有什么影响,很容易出错,确实应该在flink框架里搞了合适些。这里jark在使用flink
sql cdc方面有啥建议吗?

    2、关于性能这块,确实flink的rocksdb默认参数,性能很差!
按你给的文章,调了些参数,同时换了ssd硬盘后,write_buffer,buffter_size,能有很好的提升。我说之前怎么并行度提高了,cpu感觉总是跑不满,在等io了。感觉这里提升空间还有很大,还没摸到窍门,哪个参数会比较好。
    
    3、另外,性能监控方面,flink的web
ui上的metric有点难用,有没有一些prometheus+grafana的最佳实践的?指标有点多,dashboard搞起来很麻烦,
主要是要有dashboard的配置!




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
Btw, I created an issue to track this problem:
https://issues.apache.org/jira/browse/FLINK-20374
Hope we can fix it in the next versions to have a better out-of-box
experience.

Best,
Jark

On Thu, 19 Nov 2020 at 13:58, Jark Wu <im...@gmail.com> wrote:

> 如果数据本身没什么倾斜,且并发也能打上去。那在 sql 这边也没什么其他办法了。得从 rocksdb 的角度去调优看看。比如:
> 1. 是否有使用 SSD?
> 2. 调整 write buffer 和 block cache
> 3. 更多可以看下这些 state 调优文章[1][2].
>
> Best,
> Jark
>
> [1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
> [2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
>
> On Thu, 19 Nov 2020 at 12:19, jindy_liu <28...@qq.com> wrote:
>
>> 很感谢jark!
>> 1、昨天将status表设置成时态表(Temporal
>> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>>
>> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>>
>> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>>
>> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
>> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
>> status2, ....test 2000000 -- status2000000),source仍然存在着反压,只是并发度高的反压慢点出现一些,
>> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>>
>> 这个数据反压上,jark你有啥建议吗?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
如果数据本身没什么倾斜,且并发也能打上去。那在 sql 这边也没什么其他办法了。得从 rocksdb 的角度去调优看看。比如:
1. 是否有使用 SSD?
2. 调整 write buffer 和 block cache
3. 更多可以看下这些 state 调优文章[1][2].

Best,
Jark

[1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
[2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

On Thu, 19 Nov 2020 at 12:19, jindy_liu <28...@qq.com> wrote:

> 很感谢jark!
> 1、昨天将status表设置成时态表(Temporal
> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>
> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>
> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>
> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
> status2, ....test 2000000 -- status2000000),source仍然存在着反压,只是并发度高的反压慢点出现一些,
> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>
> 这个数据反压上,jark你有啥建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
很感谢jark!
1、昨天将status表设置成时态表(Temporal
Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。

2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。

3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
status2, ....test 2000000 -- status2000000),source仍然存在着反压,只是并发度高的反压慢点出现一些,
这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?

这个数据反压上,jark你有啥建议吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
我再仔细看了下你的问题,你的 join key 是 status id,所以目前会按照 status id 做 shuffle key 分发给
join 的不同并发处理。
如果 test 表的 status id 发生变更的话,就会导致一个 test  id 的数据会被不同的 join 并发处理,也即 test
数据已经乱序了,
这时候,即使下游再加 keyby sink key,也无济于事了。

所以,如果双流 join 两个 cdc 流,要注意 join key 是不能发生变更的,否则只能 join 设置成单并发。
像你这个场景,可以考虑采用维表 join status 表,因为目前维表 join 不会按照 join key 做 shuffle,所以能保证即使
test 表数据不乱序。
但是 status 表的更新,就无法触发计算 更新到sink 表了,只有 test 表的更新 才会触发计算并更新到 sink 表。

Best,
Jark



On Mon, 16 Nov 2020 at 16:03, jindy_liu <28...@qq.com> wrote:

> 1、试了下
>
> 在test表中增加一个proctime
>
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `proctime` AS PROCTIME(),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'no_lock',
>   'password' = 'no_lock',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test',
>   'debezium.snapshot.locking.mode' = 'none'
> );
>
> 写去重语句,
>
> INSERT into test_status_print
> SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
> FROM (
>         SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as
> rowNum
>         FROM (
>                 SELECT t.* , s.name as status_name
>                 FROM test AS t
>                 LEFT JOIN status AS s ON t.status = s.id
>         )
> )r WHERE rowNum = 1;
>
> 但提示报错,不支持:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Deduplicate doesn't support
> consuming update and delete changes which is produced by node
> Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
> time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
1、试了下

在test表中增加一个proctime 

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`proctime` AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'no_lock',
  'password' = 'no_lock',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test',
  'debezium.snapshot.locking.mode' = 'none'
);

写去重语句,

INSERT into test_status_print 
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
	SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as rowNum
	FROM (
		SELECT t.* , s.name as status_name
		FROM test AS t
		LEFT JOIN status AS s ON t.status = s.id
	)
)r WHERE rowNum = 1;

但提示报错,不支持:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
1.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
2. 这是1.12 的功能,定义在 sink DDL with 属性里的。

On Mon, 16 Nov 2020 at 14:18, jindy_liu <28...@qq.com> wrote:

> 哦,这样啊
> 1、加上一个 deduplicate by sink key 节点在sql中是怎么写的?
> 2、另外sql 中有关键字能单独指定一条sink sql的并发度吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
哦,这样啊
1、加上一个 deduplicate by sink key 节点在sql中是怎么写的?
2、另外sql 中有关键字能单独指定一条sink sql的并发度吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
如果你是改了test表上的 status 关联字段,那么是会出现这个现象的。你一开始的 example 不是改 status 字段的。

这个问题的本质是 join key 和你最终的 sink key 不一致,导致可能出现乱序。
这个只需要在 sink 前显式按照 sink key shuffle 应该就能解决,比如加上一个 deduplicate by sink key 节点。
或者在 1.12 版本中,只需要 sink 并发与前面节点的并发不一样,框架也会自动加上一个 sink key shuffle。

关于你说的 join 节点热点问题,那是因为你的 status key 太少了,导致数据倾斜严重。





On Mon, 16 Nov 2020 at 12:03, jindy_liu <28...@qq.com> wrote:

> 怕图片看不清,
> 我文字补充下:
> 1、print的最后几行。
>
> 32> +I(1999991,jindy1999991,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999992,jindy1999992,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999993,jindy1999993,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999994,jindy1999994,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999995,jindy1999995,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999996,jindy1999996,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999997,jindy1999997,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999998,jindy1999998,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999999,jindy1999999,2020-07-03T18:04:22,0,statu0)
> 36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
> 32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
> 30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
> 36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
> 36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
> 30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)
>
> 2、同时可以看出,大部分数据都在join的32这个subtask上做了处理。200w行处理都在一个subtask做了!
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
怕图片看不清,
我文字补充下:
1、print的最后几行。

32> +I(1999991,jindy1999991,2020-07-03T18:04:22,0,statu0)
32> +I(1999992,jindy1999992,2020-07-03T18:04:22,0,statu0)
32> +I(1999993,jindy1999993,2020-07-03T18:04:22,0,statu0)
32> +I(1999994,jindy1999994,2020-07-03T18:04:22,0,statu0)
32> +I(1999995,jindy1999995,2020-07-03T18:04:22,0,statu0)
32> +I(1999996,jindy1999996,2020-07-03T18:04:22,0,statu0)
32> +I(1999997,jindy1999997,2020-07-03T18:04:22,0,statu0)
32> +I(1999998,jindy1999998,2020-07-03T18:04:22,0,statu0)
32> +I(1999999,jindy1999999,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2、同时可以看出,大部分数据都在join的32这个subtask上做了处理。200w行处理都在一个subtask做了!





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
图片是屏幕截图,png格式的。忘记加后缀了。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by jindy_liu <28...@qq.com>.
我又重试了次,不用重启job也会有问题,就是把并行度大于1会有问题!。

1、直接在sql-client里,启动/data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh
embedded -d 
/data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml
sql-client-defaults.yaml的并行度设置为40.

数据一样,其中test表规模是200w条,status表11条。

源表test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
源表status
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

//输出
CREATE TABLE test_status_print (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
	'connector' = 'print'
);

//联接
INSERT into test_status_print 
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

复现操作,在mysql-cdc snapshot结束后,改test 表中的status字段,会出现顺序问题。我用print打印了。
snapshot后 test_status中的数据正常:
    0, jindy0, 2020-07-06T20:01:15 , 0, statu0
    1, jindy2, 2020-11-12T00:00:02 , 1, statu2
    2, jindy2, 2020-07-03T18:04:22 , 2, statu3

snapshot后,将mysql表中记录id=0,1,2的行中的status值改为3,预期结果
    0, jindy0, 2020-07-06T20:01:15 , 3, statu3
    1, jindy2, 2020-11-12T00:00:02 , 3, statu3
    2, jindy2, 2020-07-03T18:04:22 , 3, statu3
但输出顺序上有问题,会导致test_status表中的id=0,2两条记录丢失。

1、print输出:
<http://apache-flink.147419.n8.nabble.com/file/t670/1.1> 


ps:
另外观察到另外一个问题是:source数据送到join算子里,好像没啥hash能力,基本都挤在了一个结点上处理了?为啥会这样?感觉这样join算子会是瓶颈!!!很容易反压?!
<http://apache-flink.147419.n8.nabble.com/file/t670/2.2> 

@jark,帮忙看看,我的版本是Version: 1.11.2 Commit: fe36135 @
2020-09-09T16:19:03+02:00,官网下载的 ?







--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
抱歉... 题目没有看仔细,才发现你说的是 es sink,那和我上面说的 bug 不是一个问题。

不过从理论分析,不应该出现这个现象。
我在本地1.11分支上,用你给的数据和 sql,也没有复现你说的问题。
是不是 sql 给的不对?我看你 test_status 表的定义在 pk 之前少了一个逗号..

Best,
Jark

On Sat, 14 Nov 2020 at 17:48, Jark Wu <im...@gmail.com> wrote:

> 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
> 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
> 这个bug 会在即将发布的 1.11.3 中修复。
>
> Best,
> Jark
>
>
>
>
> On Fri, 13 Nov 2020 at 13:12, jindy_liu <28...@qq.com> wrote:
>
>> 源表test:
>> CREATE TABLE test (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'test'
>> )
>> 源表status
>> CREATE TABLE status (
>> `id` INT,
>> `name` VARCHAR(255),
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'status'
>> );
>>
>> 输出表
>> CREATE TABLE test_status (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> `status_name` VARCHAR(255)
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'xxx',
>>   'index' = 'xxx',
>>   'username' = 'xxx',
>>   'password' = 'xxx',
>>   'sink.bulk-flush.backoff.max-retries' = '100000',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>>   'sink.bulk-flush.max-actions' = '5000',
>>   'sink.bulk-flush.max-size' = '10mb',
>>   'sink.bulk-flush.interval' = '1s'
>> );
>>
>>
>> 输出语句:
>> INSERT into test_status
>> SELECT t.*, s.name
>> FROM test AS t
>> LEFT JOIN status AS s ON t.status = s.id;
>>
>> mysql表中已经有数据
>> test:
>> 0, name0, 2020-07-06 00:00:00 , 0
>> 1, name1, 2020-07-06 00:00:00 , 1
>> 2, name2, 2020-07-06 00:00:00 , 1
>> .....
>>
>> status
>> 0, status0
>> 1, status1
>> 2, status2
>> .....
>>
>> 操作顺序与复现:
>> 1、启动任务,设置并行度为40,
>> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
>> savepoint保存,然后web ui上取消任务。
>>   ==> test_status中的数据正常:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>     1, name1, 2020-07-06 00:00:00 , 1, status1
>>     2, name2, 2020-07-06 00:00:00 , 1, status1
>>
>> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>>
>> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
>> job  下,
>>   ==> test_status中的数据正常:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>     1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>>     2, name2, 2020-07-06 00:00:00 , 1, status1_modify
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
>> job  下
>>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>
>>
>> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>>
>> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
>> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

Posted by Jark Wu <im...@gmail.com>.
看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
这个bug 会在即将发布的 1.11.3 中修复。

Best,
Jark




On Fri, 13 Nov 2020 at 13:12, jindy_liu <28...@qq.com> wrote:

> 源表test:
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> )
> 源表status
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
>
> 输出表
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255)
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
>
>
> 输出语句:
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
>
> mysql表中已经有数据
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
>
> status
> 0, status0
> 1, status1
> 2, status2
> .....
>
> 操作顺序与复现:
> 1、启动任务,设置并行度为40,
> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
> savepoint保存,然后web ui上取消任务。
>   ==> test_status中的数据正常:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1
>     2, name2, 2020-07-06 00:00:00 , 1, status1
>
> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>
> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
> job  下,
>   ==> test_status中的数据正常:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>     2, name2, 2020-07-06 00:00:00 , 1, status1_modify
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
> job  下
>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>
>
> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>
> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>