You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jimmy Zhang <zh...@163.com> on 2021/07/22 16:40:51 UTC

Flink kafka自定义metrics在influxdb上解析失败

大家好,Flink版本1.13.1。
我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
但如果加上UDF,比如
insert into a select CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

回复:Flink kafka自定义metrics在influxdb上解析失败

Posted by Jimmy Zhang <13...@163.com>.
你好,我没有自己开发连接器,我用的是kafka connector,influxdb只是作为一个metrics信息存储端,你是需要一个influxdb的连接器?我只是用到了metrics统计体系而已,只是自定义了Counter对象,和连接器没有什么关系



发自 网易邮箱大师




---- 回复的原邮件 ----
| 发件人 | 信华哺<la...@163.com> |
| 日期 | 2021年11月26日 17:22 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 抄送至 | |
| 主题 | 回复:Flink kafka自定义metrics在influxdb上解析失败 |
你好:
   我想问一下,你用的flink sql连接器是自己开发的么? 我在网上只能找到一个datastream的influxdb连接器




在2021年7月23日 10:11,Jimmy Zhang<zh...@163.com> 写道:
大家好,Flink版本1.13.1。
我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
但如果加上UDF,比如
insert into a select CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

回复:Flink kafka自定义metrics在influxdb上解析失败

Posted by 信华哺 <la...@163.com>.
你好:
    我想问一下,你用的flink sql连接器是自己开发的么? 我在网上只能找到一个datastream的influxdb连接器




在2021年7月23日 10:11,Jimmy Zhang<zh...@163.com> 写道:
大家好,Flink版本1.13.1。
我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。

但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
但如果加上UDF,比如
insert into a select CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME, 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

回复:Flink kafka自定义metrics在influxdb上解析失败

Posted by Jimmy Zhang <zh...@163.com>.
Hi,caizhi,非常感谢你的回复!
在KafkaDynamicTableFactory.java的createDynamicTableSink(Context context)方法开始,我通过context.getObjectIdentifier().getObjectName()获取到sinkTableName。因为ObjectIdentifier类就唯一标识了一个表,它包括catalogName、databaseName和objectName,分别代表catalog名、数据库名和表名。之后,我通过构造传入到了FlinkKafkaProducer,然后就可以使用了。
我已经解决了该问题,根本原因是influxDB解析sql失败!原因是,我书写的flink sql语句 insert into 或者insert overwrite 中单引号带有换行符,所以写入influxdb会报错。另外,创建表的with参数也要维持版本统一!




|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

在2021年07月23日 10:32,Caizhi Weng 写道:
Hi!

是通过什么方式拿到 sink table name 呢?从报错信息来看,拿到的可能并不是“a” 这样的 table name,而是一个 insert
语句的 digest,因此 metric 表名会变得比较复杂,导致解析错误。

当然也可以选择对 metric 表名进行转义。

Jimmy Zhang <zh...@163.com> 于2021年7月23日周五 上午10:11写道:

> 大家好,Flink版本1.13.1。
> 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric
> 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。
>
> 但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where
> b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
> 但如果加上UDF,比如
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master

Re: Flink kafka自定义metrics在influxdb上解析失败

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

是通过什么方式拿到 sink table name 呢?从报错信息来看,拿到的可能并不是“a” 这样的 table name,而是一个 insert
语句的 digest,因此 metric 表名会变得比较复杂,导致解析错误。

当然也可以选择对 metric 表名进行转义。

Jimmy Zhang <zh...@163.com> 于2021年7月23日周五 上午10:11写道:

> 大家好,Flink版本1.13.1。
> 我在FlinkKafkaProducer的invork方法中,创建了一个counter,它采用sinkTableName+_receive_count作为最终的metric
> 表名,并且采用influxDB作为上报终端,其中,sinkTableName是从catalog信息里拿到的,我从构造中一层层传递进去。
>
> 但是发现一个奇怪的问题:当我采用简单的形如insert into a select from b where
> b.c='d'的SQL语句时,influxDB中的表可以成功被建出来;
> 但如果加上UDF,比如
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx'时,influxDB就会报这样的解析错误,不知道大佬们有没有遇见过,困扰一天了!任何建议都不胜感激!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master