You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by mack143 <ma...@163.com> on 2022/03/01 00:45:31 UTC

Re:答复: 回复:如何从复杂的kafka消息体定义 table

退订
在 2021-07-09 10:06:19,"Chenzhiyuan(HR)" <zh...@huawei.com> 写道:
>消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下:
>
>CREATE TABLE MyUserTable(
>APPLY_PERSON_ID VARCHAR,
>UPDATE_SALARY DECIMAL,
>UP_AMOUNT DECIMAL,
>CURRENCY VARCHAR,
>EXCHANGE_RATE DECIMAL
>) with (
>'connector.type' = 'kafka',
>'connector.version' = '0.11',
>'connector.topic' = 'topic_name',
>'connector.properties.zookeeper.connect' = 'localhost:2181',
>'connector.properties.bootstrap.servers' = 'localhost:9092',
>'connector.properties.group.id' = 'testGroup',
>'format.type' = '?'
>
>接下来直接查询每个字段的值:
>Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, CURRENCY, EXCHANGE_RATE from MyUserTable ");
>
>请教下这个该如何定义DDL.
>
>
>
>发件人: 17610775726 [mailto:17610775726@163.com]
>发送时间: 2021年7月9日 9:26
>收件人: Chenzhiyuan(HR) <zh...@huawei.com>
>主题: 回复:如何从复杂的kafka消息体定义 table
>
>hi
>
>用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
>Best
>JasonLee
>---- 回复的原邮件 ----
>发件人
>
>Chenzhiyuan(HR)<zh...@huawei.com>
>
>发送日期
>
>2021年07月09日 08:59
>
>收件人
>
>user-zh@flink.apache.org<us...@flink.apache.org>
>
>主题
>
>如何从复杂的kafka消息体定义 table
>
>大家好:
>我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
>如果json, avro不能满足的话,是不是得自己自定义一个。
>自定义的话不知道如何写,请各位帮忙指教下。
>
>  定义的表如下:
>  CREATE TABLE MyUserTable(
>uuid VARCHAR,
>orgId VARCHAR
>) with (
>'connector.type' = 'kafka',
>'connector.version' = '0.11',
>'connector.topic' = 'topic_name',
>'connector.properties.zookeeper.connect' = 'localhost:2181',
>'connector.properties.bootstrap.servers' = 'localhost:9092',
>'connector.properties.group.id' = 'testGroup',
>'format.type' = '?'
>)
>
>
>Kafka的消息体如下, 好像不符合avro之类的标准格式:
>
>{
>  "beforeData": [],
>   "byteSize": 272,
>   "columnNumber": 32,
>   "data": [{
>       "byteSize": 8,
>       "columnName": "APPLY_PERSON_ID",
>       "rawData": 10017,
>       "type": "LONG"
>   }, {
>       "byteSize": 12,
>       "columnName": "UPDATE_SALARY",
>       "rawData": "11000.000000",
>       "type": "DOUBLE"
>   }, {
>       "byteSize": 11,
>       "columnName": "UP_AMOUNT",
>       "rawData": "1000.000000",
>       "type": "DOUBLE"
>   }, {
>       "byteSize": 3,
>       "columnName": "CURRENCY",
>       "rawData": "CNY",
>       "type": "STRING"
>   }, {
>       "byteSize": 32,
>       "columnName": "EXCHANGE_RATE",
>       "rawData": "1.000000000000000000000000000000",
>       "type": "DOUBLE"
>   },  {
>       "byteSize": 11,
>       "columnName": "DEDUCTED_ACCOUNT",
>       "rawData": "1000.000000",
>       "type": "DOUBLE"
>   }, {
>       "byteSize": 1,
>       "columnName": "ENTER_AT_PROCESS",
>       "rawData": "Y",
>       "type": "STRING"
>   }],
>   "dataCount": 0,
>   "dataMetaData": {
>       "connector": "mysql",
>       "pos": 1000368076,
>       "row": 0,
>       "ts_ms": 1625565737000,
>       "snapshot": "false",
>       "db": "testdb",
>       "table": "flow_person_t"
>   },
>   "key": "APPLY_PERSON_ID",
>   "memorySize": 1120,
>   "operation": "insert",
>   "rowIndex": -1,
>   "timestamp": "1970-01-01 00:00:00"
>}