You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Carl (Jira)" <ji...@apache.org> on 2021/04/28 03:36:00 UTC
[jira] [Created] (FLINK-22498) cast the primary key for source
table that has a decimal primary key as string, and then insert into a kudu
table that has a string primary key throw the exception :
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated
Carl created FLINK-22498:
----------------------------
Summary: cast the primary key for source table that has a decimal primary key as string, and then insert into a kudu table that has a string primary key throw the exception : UpsertStreamTableSink requires that Table has a full primary keys if it is updated
Key: FLINK-22498
URL: https://issues.apache.org/jira/browse/FLINK-22498
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.1
Environment: flink 1.12.1
jdk 1.8
hive 2.1.1
kudu 1.10.0
kafka 2.0.0
Reporter: Carl
Attachments: bug.rar
*1. source table:*
CREATE TABLE ddl_source (
appl_seq DECIMAL(16,2),
name STRING,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'ogg-json-03',
'properties.bootstrap.servers' = 'xxxx:9092',
'value.format' = 'canal-json'
)
*2. sink table:*create the table use impala
create table rt_dwd.test_bug(
pk string ,
name string ,
primary key (pk)
) partition by hash (pk) partitions 5 stored as kudu
TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');
*3. execute sql:*use blink planner
insert into kuducatalog.default_database.`rt_dwd.test_bug`
select CAST(appl_seq AS STRING), name from ddl_source
*throw an exception :*
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
*case A:if we use source table as follows, it will not throw the exception :*
CREATE TABLE ddl_source (
appl_seq STRING,
name STRING,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'ogg-json-03',
'properties.bootstrap.servers' = 'xxxx:9092',
'value.format' = 'canal-json'
)
*case B:or we ddl kudu table,and use sql as follows, it will not throw the exception :*
_DDL:_
create table rt_dwd.test_bug(
pk decimal(16,2),
name string ,
primary key (pk)
) partition by hash (pk) partitions 5 stored as kudu
TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');
_DML:_
insert into kuducatalog.default_database.`rt_dwd.test_bug`
select appl_seq, name from ddl_source
*When debugging the source code, it may be related to SQL parsing engine*
--
This message was sent by Atlassian Jira
(v8.3.4#803005)