Posted to user-zh@flink.apache.org by WeiXubin <18...@163.com> on 2021/06/08 09:19:01 UTC

FlinkSQL cannot update pk column UID to expr

Basic setup: data comes in from a Kafka source and is written to sinktable, with a LEFT JOIN against the DimTable dimension table along the way.
Flink version: 1.12.2

Scenario 1: with sinktable set to 'connector' = 'print' and no primary key defined, the join runs and the output is correct (a sketch of that print sink is shown right after these scenarios).
Scenario 2: with sinktable switched to the MySQL JDBC connector, Flink requires a primary key to be declared on the sink.
Scenario 3: after adding PRIMARY KEY (uid, platform, root_game_id) NOT ENFORCED to sinktable,
the job fails. The key part of the error is:
java.sql.BatchUpdateException: [30000, 2021060816420117201616500303151172567] cannot update pk column UID to expr
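
For reference, the print sink from scenario 1 looked roughly like this (a sketch; the column list is assumed to mirror the MySQL sinktable further down, other columns omitted):

-- Scenario 1 sink: print connector, no primary key declared
CREATE TABLE sinktable (
    uid BIGINT,
    root_game_id BIGINT,
    game_id BIGINT,
    platform VARCHAR
) WITH (
    'connector' = 'print'
);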

Note: the "MySQL" used here is actually Alibaba Cloud AnalyticDB (ADB). The table DDL is as follows:
CREATE TABLE `v2_dwd_root_game_uid_reg_log` (
 `uid` bigint NOT NULL DEFAULT '0' COMMENT 'registration uid',
 `user_name` varchar NOT NULL DEFAULT '',
  -- other columns omitted
 primary key (`uid`,`platform`,`root_game_id`)
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT'
COMMENT='registration log by root game account';


The SQL statements for scenario 3 are below:
-- Kafka source
CREATE TABLE KafkaTable (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxxxxxxxxxx',
    'properties.bootstrap.servers' = 'xxxxxxxxxxx',
    'properties.group.id' = 'xxxxxxxxxxxxx',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);

-- Dimension table
CREATE TABLE DimTable (
    game_id BIGINT,
    root_game_id BIGINT,
    main_game_id BIGINT,
    platform VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxx',
    'table-name' = 'v2_dim_game_id',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxx',
    'password' = 'xxxxxxxx',
    'lookup.cache.max-rows'='5000',
    'lookup.cache.ttl' = '60s',
    'lookup.max-retries'='3'
);

-- MySQL sink
CREATE TABLE sinktable (
    uid BIGINT,
    root_game_id BIGINT,
    game_id BIGINT,
    platform VARCHAR,
    -- ... other columns omitted
    PRIMARY KEY (uid, platform, root_game_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxxxx',
    'table-name' = 'v2_dwd_root_game_uid_reg_log',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxxxx',
    'password' = 'xxxxxxxxxxxx',
    'sink.buffer-flush.interval'='5s',
    'sink.buffer-flush.max-rows' = '10'
);

-- Insert (joining the dimension table)
INSERT INTO sinktable
SELECT
    IF(IsInvalidValue(k.uid), 0, CAST(k.uid AS BIGINT)) AS uid,
    IF((k.game_id IS NULL), 0, k.game_id) AS game_id,
    d.platform AS platform,
    d.root_game_id AS root_game_id,
    -- other columns omitted
FROM KafkaTable,
    LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform')) AS k
    LEFT JOIN DimTable AS d
        ON k.game_id = d.game_id AND k.platform = d.platform;




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL cannot update pk column UID to expr

Posted by WeiXubin <18...@163.com>.
I moved the table to a plain MySQL instance and the job ran fine. After some digging, the cause appears to be the DISTRIBUTE BY HASH(`uid`) clause in the ADB table DDL, which is there to deal with data skew. It looks like the join into the sink requires a primary key to be defined, but once the primary key is defined the sink writes an upsert stream, and that upsert conflicts with the DISTRIBUTE BY defined in ADB. Not sure whether this understanding is correct.
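
If I read the 1.12 JDBC connector's MySQL dialect correctly, declaring a primary key on the sink makes it write with an ON DUPLICATE KEY UPDATE statement whose update clause lists every column, including the key columns. A rough sketch of what would be sent to the database (column list abbreviated, other columns omitted):

INSERT INTO v2_dwd_root_game_uid_reg_log (uid, root_game_id, game_id, platform)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
    uid = VALUES(uid),
    root_game_id = VALUES(root_game_id),
    game_id = VALUES(game_id),
    platform = VALUES(platform);

A plain MySQL server accepts this, but ADB apparently rejects any update expression on the hash-distribution / primary-key column `uid`, even a no-op one, which would match the "cannot update pk column UID to expr" message and explain why the job works after moving the table.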



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL cannot update pk column UID to expr

Posted by WeiXubin <18...@163.com>.
The full exception stack trace is as follows:

java.sql.BatchUpdateException: [30000, 2021060816420017201616500303151172306] cannot update pk column UID to expr
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
    at com.mysql.cj.util.Util.getInstance(Util.java:167)
    at com.mysql.cj.util.Util.getInstance(Util.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink.147419.n8.nabble.com/