You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by WeiXubin <18...@163.com> on 2021/06/08 09:19:01 UTC
FlinkSQL cannot update pk column UID to expr
基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。
Flink 版本 1.12.2
场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出
场景2:当把 sinktable 设置为 'connector' = 'jdbc'(写入 MySQL),则会要求加上 primary key
场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
则报错,主要报错信息:
java.sql.BatchUpdateException: [30000,
2021060816420117201616500303151172567] cannot update pk column UID to expr
注:此处使用的 MySQL 是阿里云的 ADB(AnalyticDB for MySQL),建表 SQL 如下:
-- Physical target table as created in ADB (the engine the post says it uses).
-- The column list is abbreviated in this post: `platform` and `root_game_id`,
-- which the primary key references, are among the omitted columns.
Create Table `v2_dwd_root_game_uid_reg_log` (
`uid` bigint NOT NULL DEFAULT '0' COMMENT '注册uid',
`user_name` varchar NOT NULL DEFAULT '',
-- The `//` line below is the poster's placeholder for the omitted columns.
// 此处省略其他字段
-- Composite primary key; the Flink-side `sinktable` declares the same three columns.
primary key (`uid`,`platform`,`root_game_id`)
-- Rows are hash-distributed on `uid`.
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT'
COMMENT='按根游戏账号注册日志';
下面是场景3的SQL语句:
// Kafka Source
-- Kafka source: each record is exposed through a single STRING column,
-- `message`, which the INSERT statement passes to RequestBodyColumnToRow.
CREATE TABLE KafkaTable (
    message STRING
)
WITH (
    'connector'                    = 'kafka',
    'format'                       = 'json',
    'topic'                        = 'xxxxxxxxxxx',
    'properties.bootstrap.servers' = 'xxxxxxxxxxx',
    'properties.group.id'          = 'xxxxxxxxxxxxx',
    -- start reading from the offsets committed for the consumer group above
    'scan.startup.mode'            = 'group-offsets'
);
// 维表
-- Dimension table read over JDBC from `v2_dim_game_id`; the INSERT statement
-- joins it on (game_id, platform) to obtain root_game_id.
CREATE TABLE DimTable (
    game_id      BIGINT,
    root_game_id BIGINT,
    main_game_id BIGINT,
    -- STRING instead of a bare VARCHAR: in Flink SQL a VARCHAR with no length
    -- is VARCHAR(1), which is presumably narrower than real platform values.
    platform     STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxx',
    'table-name' = 'v2_dim_game_id',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxx',
    'password' = 'xxxxxxxx',
    -- lookup-join settings: cache size, cache entry time-to-live, retry count
    'lookup.cache.max-rows'='5000',
    'lookup.cache.ttl' = '60s',
    'lookup.max-retries'='3'
);
// MySQL输出
-- JDBC sink writing to `v2_dwd_root_game_uid_reg_log`.
-- Declaring a primary key makes the JDBC sink write in upsert mode, which is
-- the mode in which the reported "cannot update pk column" error appears.
CREATE TABLE sinktable (
    uid          BIGINT,
    root_game_id BIGINT,
    game_id      BIGINT,
    -- STRING instead of a bare VARCHAR: in Flink SQL a VARCHAR with no length
    -- is VARCHAR(1), which is presumably narrower than real platform values.
    platform     STRING,
    -- ... other columns omitted by the poster ...
    PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxxxx',
    'table-name' = 'v2_dwd_root_game_uid_reg_log',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxxxx',
    'password' = 'xxxxxxxxxxxx',
    -- buffered rows are flushed every 5 seconds or once 10 rows are buffered
    'sink.buffer-flush.interval'='5s',
    'sink.buffer-flush.max-rows' = '10'
);
// 插入(关联维表)
-- Enrich each Kafka record with DimTable and write the result to sinktable.
--
-- INSERT INTO ... SELECT maps columns by position, not by alias, so the select
-- list follows sinktable's declared order: uid, root_game_id, game_id, platform.
--
-- NOTE(review): this is a regular LEFT JOIN (no FOR SYSTEM_TIME AS OF), so the
-- lookup.* options on DimTable presumably do not apply and the join emits an
-- updating stream, which is why the sink needs a primary key. Confirm whether
-- a lookup join was intended.
INSERT INTO sinktable
SELECT
    -- invalid uid values fall back to 0
    IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) AS uid,
    -- NULL when no DimTable row matches (LEFT JOIN)
    d.root_game_id AS root_game_id,
    IF((k.game_id IS NULL), 0 , k.game_id) AS game_id,
    -- NULL when no DimTable row matches (LEFT JOIN)
    d.platform AS platform,
    -- ... other columns omitted by the poster (hence the trailing comma) ...
FROM KafkaTable,
LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) AS k
LEFT JOIN DimTable AS d
    ON k.game_id = d.game_id AND k.platform = d.platform;
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL cannot update pk column UID to expr
Posted by WeiXubin <18...@163.com>.
我把表迁移到了普通 MySQL 后,可以正常运行。经过排查,应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(`uid`)
所导致(该语法用于处理数据倾斜问题)。看起来似乎是联表 join 时要求定义主键,而定义主键后会转换为 upsert 流,ADB 中定义的
DISTRIBUTE BY 与 upsert 冲突了。不知道这样理解是否正确?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL cannot update pk column UID to expr
Posted by WeiXubin <18...@163.com>.
详细的异常打印信息如下:
java.sql.BatchUpdateException: [30000,
2021060816420017201616500303151172306] cannot update pk column UID to expr
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
at com.mysql.cj.util.Util.getInstance(Util.java:167)
at com.mysql.cj.util.Util.getInstance(Util.java:174)
at
com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
at
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
at
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
at
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at
java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
--
Sent from: http://apache-flink.147419.n8.nabble.com/