You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/07/03 13:16:00 UTC

[jira] [Closed] (FLINK-18461) Changelog source can't be insert into upsert sink

     [ https://issues.apache.org/jira/browse/FLINK-18461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-18461.
---------------------------
    Resolution: Fixed

- master (1.12.0): e08994588709b30e5d59d3c8302893a53c8a6baf
- 1.11.1: 334f35cbd6da754d8b5b294032cd84c858b1f973

> Changelog source can't be insert into upsert sink
> -------------------------------------------------
>
>                 Key: FLINK-18461
>                 URL: https://issues.apache.org/jira/browse/FLINK-18461
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.1
>
>
> {code:sql}
> CREATE TABLE t_pick_order (
>       order_no VARCHAR,
>       status INT
> ) WITH (
>       'connector' = 'kafka',
>       'topic' = 'example',
>       'scan.startup.mode' = 'latest-offset',
>       'properties.bootstrap.servers' = '172.19.78.32:9092',
>       'format' = 'canal-json'
> );
> CREATE TABLE order_status (
>           order_no VARCHAR,
>           status INT,
> 		  PRIMARY KEY (order_no) NOT ENFORCED
> ) WITH (
>           'connector' = 'jdbc',
>           'url' = 'jdbc:mysql://xxx:3306/flink_test',
>           'table-name' = 'order_status',
>           'username' = 'dev',
>           'password' = 'xxxx'
> );
> INSERT INTO order_status SELECT order_no, status FROM t_pick_order ;
> {code}
> The above queries throw the following exception:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
> Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
> {code}
> It is a bug in planner that we didn't fallback to {{BEFORE_AND_AFTER}} trait when {{ONLY_UPDATE_AFTER}} can't be satisfied. This results in Changelog source can't be used to written into upsert sink. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)