You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/07/02 03:37:00 UTC

[jira] [Created] (FLINK-18461) Changelog source can't be insert into upsert sink

Jark Wu created FLINK-18461:
-------------------------------

             Summary: Changelog source can't be insert into upsert sink
                 Key: FLINK-18461
                 URL: https://issues.apache.org/jira/browse/FLINK-18461
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Jark Wu
            Assignee: Jark Wu
             Fix For: 1.11.0



{code:sql}
CREATE TABLE t_pick_order (
      order_no VARCHAR,
      status INT
) WITH (
      'connector' = 'kafka',
      'topic' = 'example',
      'scan.startup.mode' = 'latest-offset',
      'properties.bootstrap.servers' = '172.19.78.32:9092',
      'format' = 'canal-json'
);

CREATE TABLE order_status (
          order_no VARCHAR,
          status INT,
		  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:mysql://xxx:3306/flink_test',
          'table-name' = 'order_status',
          'username' = 'dev',
          'password' = 'xxxx'
);

INSERT INTO order_status SELECT order_no, status FROM t_pick_order ;
{code}

The above queries throw the following exception:

{code
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
{code}

It is a bug in planner that we didn't fallback to {{BEFORE_AND_AFTER}} trait when {{ONLY_UPDATE_AFTER}} can't be satisfied. This results in Changelog source can't be used to written into upsert sink. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)