You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "maker_dong@foxmail.com" <ma...@foxmail.com> on 2021/06/17 13:29:12 UTC

flinksql消费kafka写入doris中文乱码

我使用flinksql消费kafka并将数据写入doris,但出现中文乱码。

SQL如下:

CREATE TABLE `datacollect_business_kafka` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
    ...
     CHARACTER SET `UTF-8`
) WITH (
  'connector' = 'kafka',
  'topic' = 'datacollect_business_stage',
  'properties.bootstrap.servers' = 'XXX',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
  ...
   CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


在网上查找信息,flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”

flink版本:1.12.4 
部署模式:on yarn

希望各位大佬帮助。
谢谢! 


maker_dong@foxmail.com

回复:flinksql消费kafka写入doris中文乱码

Posted by JasonLee <17...@163.com>.
hi


你可以先用 print 的 connector 把消费到的数据打印一下看是否乱码? 还是写入到 doris 后出现的乱码?


Best
JasonLee
在2021年6月17日 21:31,maker_dong@foxmail.com<ma...@foxmail.com> 写道:
我使用flinksql消费kafka并将数据写入doris,但出现中文乱码。

SQL如下:

CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'kafka',
'topic' = 'datacollect_business_stage',
'properties.bootstrap.servers' = 'XXX',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


在网上查找信息,flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”

flink版本:1.12.4
部署模式:on yarn

希望各位大佬帮助。
谢谢!


maker_dong@foxmail.com