You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by wdmcode <wd...@aliyun.com.INVALID> on 2020/09/09 08:56:44 UTC

回复: flink-cdc sink mysql 问题

Hi 杨帅统
MySQL-MySQL数据实时同步也可以使用阿里开源的otter
https://github.com/alibaba/otter/


发件人: 杨帅统
发送时间: Wednesday, September 9, 2020 4:49 PM
收件人: user-zh@flink.apache.org
主题: flink-cdc sink mysql 问题

公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。
在做测试的时候定义一张cdc源表和一张sink表
CREATE TABLE pvuv_test (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() --使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc', -- 连接器
  'hostname' = 'localhost',   --mysql地址
  'port' = '3306',  -- mysql端口
  'username' = 'root',  --mysql用户名
  'password' = 'rootzs',  -- mysql密码
  'database-name' = 'etc_demo', --  数据库名称
  'table-name' = 'puuv_test'
);
CREATE TABLE pvuv_test_back (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() --使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc', -- 连接器
  'hostname' = 'localhost',   --mysql地址
  'port' = '3306',  -- mysql端口
  'username' = 'root',  --mysql用户名
  'password' = 'rootzs',  -- mysql密码
  'database-name' = 'etc_demo', --  数据库名称
  'table-name' = 'puuv_test_back'
);
但是在通过SQL Client执行下面语句的时候,报错
INSERT INTO pvuv_test_back
SELECT * FROM pvuv_test;
---------------------------------
报错信息如下
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
elasticsearch-6
kafka
print


Flink/lib 目录下已经有mysql-cdc的jar包 不知道问题出现在哪里


最后对MySQL-MySQL数据实时同步的需求 不知道大家还有什么其他的方案或者想法。感谢