Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/15 03:15:55 UTC

[GitHub] [incubator-doris] 19932537671 opened a new issue, #9573: [Feature] The binlog load problem of Doris binlog load or Flink CDC

19932537671 opened a new issue, #9573:
URL: https://github.com/apache/incubator-doris/issues/9573

   Description of the problem when loading data into Doris with Binlog Load, or with Flink CDC and Flink SQL:
   
   The date field of the MySQL table is a BIGINT or TIMESTAMP. Rows are inserted into Doris in real time, into a table that is dynamically partitioned by DAY.
   
   Problem description:
   
   1. The expression needed to convert a BIGINT or TIMESTAMP into a DATE or DATETIME is too long.
   
   2. Binlog Load does not support using a field derived from MySQL columns as the dynamic partition column when inserting into Doris.
   
   3. When Flink CDC reads from MySQL and the Flink SQL that writes to Doris adds a field in between, adding a DATE or DATETIME field is not supported either.
   
   
   
   Example:
   
   Doris target table:
   CREATE TABLE point.record
   (
   id bigint,
   user_id bigint COMMENT 'user ID',
   date date,
   device_id String COMMENT 'device ID',
   idfa String COMMENT 'IOS IDFA',
   os String COMMENT 'device OS',
   os_version String COMMENT 'OS version',
   version String COMMENT 'app version',
   system tinyint COMMENT '1 = iOS, 2 = Android',
   platform tinyint COMMENT '1 = app, 2 = mini program, 3 = H5',
   event_id bigint COMMENT 'event ID',
   log_id bigint COMMENT 'log auto-increment ID',
   base_uri String COMMENT 'current short path',
   event_data String COMMENT 'associated extra data',
   created_at bigint
   )
   UNIQUE KEY(id,user_id,date)
   PARTITION BY RANGE(date) ()
   DISTRIBUTED BY HASH(user_id)
   PROPERTIES
   (
   "replication_allocation" = "tag.location.default: 1",
   "dynamic_partition.enable" = "true",
   "dynamic_partition.time_unit" = "DAY",
   "dynamic_partition.end" = "3",
   "dynamic_partition.prefix" = "p",
   "dynamic_partition.buckets" = "32"
   );
   
   Flink MySQL mapping table:
   CREATE TABLE mysql_record (
   id bigint,
   user_id bigint COMMENT 'user ID',
   device_id STRING COMMENT 'device ID',
   idfa STRING COMMENT 'IOS IDFA',
   os STRING COMMENT 'device OS',
   os_version STRING COMMENT 'OS version',
   version STRING COMMENT 'app version',
   `system` tinyint COMMENT '1 = iOS, 2 = Android',
   platform tinyint COMMENT '1 = app, 2 = mini program, 3 = H5',
   event_id bigint COMMENT 'event ID',
   log_id bigint COMMENT 'log auto-increment ID',
   base_uri STRING COMMENT 'current short path',
   event_data STRING COMMENT 'associated extra data',
   created_at bigint ,
   primary key(id) NOT ENFORCED
   ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'linux008',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'point',
   'table-name' = 'record'
   );
   
   Flink Doris mapping table:
   CREATE TABLE doris_record (
   id bigint,
   user_id bigint COMMENT 'user ID',
   `date` String,
   device_id String COMMENT 'device ID',
   idfa String COMMENT 'IOS IDFA',
   os String COMMENT 'device OS',
   os_version String COMMENT 'OS version',
   version String COMMENT 'app version',
   `system` tinyint COMMENT '1 = iOS, 2 = Android',
   platform tinyint COMMENT '1 = app, 2 = mini program, 3 = H5',
   event_id bigint COMMENT 'event ID',
   log_id bigint COMMENT 'log auto-increment ID',
   base_uri String COMMENT 'current short path',
   event_data String COMMENT 'associated extra data',
   created_at bigint
   )
   WITH (
   'connector' = 'doris',
   'fenodes' = 'linux008:8030',
   'table.identifier' = 'point.record',
   'sink.batch.size' = '2',
   'sink.batch.interval'='1',
   'username' = 'root',
   'password' = '123456'
   );
   
   
   Insert command:
   insert into doris_record
   select
   `id`,
   `user_id`,
   CONCAT(CAST(YEAR(CAST(from_unixtime(CAST(substring( CAST(`created_at` as String) ,1,10) as BIGINT) , '%Y-%m-%d') as datetime)) AS STRING), '-', CAST(MONTH(CAST(from_unixtime(CAST(substring( CAST(`created_at` as String) ,1,10) as BIGINT) , '%Y-%m-%d') as datetime)) AS STRING), '-', CAST(DAYOFMONTH(CAST(from_unixtime(CAST(substring( CAST(`created_at` as String) ,1,10) as BIGINT) , '%Y-%m-%d') as datetime)) AS STRING)) as `date`,
   `device_id`,
   `idfa`,
   `os`,
   `os_version`,
   `version`,
   `system`,
   `platform`,
   `event_id`,
   `log_id`,
   `base_uri`,
   `event_data`,
   `created_at`
   from mysql_record;
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] JNSimba commented on issue #9573: [Feature] The binlog load problem of Doris binlog load or Flink CDC

JNSimba commented on issue #9573:
URL: https://github.com/apache/incubator-doris/issues/9573#issuecomment-1127141216

   > 3. When Flink CDC reads from MySQL and the Flink SQL that writes to Doris adds a field in between, adding a DATE or DATETIME field is not supported either.
   
   Your Flink SQL may be wrong. Something like this works:
   
   
   ```sql
   
   -- mysql table
   CREATE TABLE test3(id int primary key, created_at bigint);
   
   
   -- doris table
   CREATE TABLE `test_sink` (
     id int  ,
     dt date,
     created_at bigint 
   )
   unique key(id)
   distributed by hash(id) buckets 1
   properties(
   "replication_allocation" = "tag.location.default: 1"
   );
   
   -- flink mysql schema
   CREATE TABLE mysql_record (
   id bigint,
   created_at bigint ,
   primary key(id) NOT ENFORCED
   ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = '127.0.0.1',
   'port' = '3306',
   'username' = 'root',
   'password' = 'Root123@',
   'database-name' = 'test',
   'table-name' = 'test3'
   );
   
   
   -- flink doris schema
   CREATE TABLE doris_record (
   id bigint,
   dt String,
   created_at bigint
   )
   WITH (
   'connector' = 'doris',
   'fenodes' = '127.0.0.1:8030',
   'table.identifier' = 'test.test_sink',
   'sink.batch.size' = '1',
   'username' = 'root',
   'password' = ''
   );
   
   
   -- flink sql
   insert into doris_record
   select id, FROM_UNIXTIME(created_at,'yyyy-MM-dd') as dt,created_at from mysql_record;
   
   
   ```
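   
   The example above passes `created_at` straight to `FROM_UNIXTIME`. In Flink SQL, that function interprets its argument as seconds since the epoch and formats the result in the session time zone (`table.local-time-zone`). In the issue, the `substring(..., 1, 10)` step suggests that `created_at` holds 13-digit epoch milliseconds. Under that assumption, here is an untested sketch of a shorter replacement for the long `CONCAT(...)` expression. It uses the issue's `mysql_record` and `doris_record` tables and divides by 1000 first. Unlike the original `CONCAT`, the `'yyyy-MM-dd'` pattern zero-pads month and day (e.g. `2022-05-15` rather than `2022-5-15`).
   
   ```sql
   -- Assumption: created_at holds epoch milliseconds (13 digits),
   -- as the substring(..., 1, 10) in the issue implies.
   insert into doris_record
   select
   `id`,
   `user_id`,
   -- Dividing by 1000 and casting to BIGINT drops the millisecond part
   -- (the same as keeping the first 10 digits of a 13-digit value);
   -- FROM_UNIXTIME then formats those seconds as a day string
   -- in the session time zone.
   FROM_UNIXTIME(CAST(`created_at` / 1000 AS BIGINT), 'yyyy-MM-dd') as `date`,
   `device_id`,
   `idfa`,
   `os`,
   `os_version`,
   `version`,
   `system`,
   `platform`,
   `event_id`,
   `log_id`,
   `base_uri`,
   `event_data`,
   `created_at`
   from mysql_record;
   ```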
   