You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "Chenzhiyuan(HR)" <zh...@huawei.com> on 2021/07/09 09:39:18 UTC
自定义函数参数不能正确获取参数
我定义了一个kafka来源的table,sql查询时调了自定义函数, 但是发现参数不能被正确传递给自定义函数eval.
我用的flink版本是1.10.0.
l json 的ddl如下:
private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n"
+ " data ARRAY<ROW<byteSize STRING,columnName STRING,rawData STRING,type STRING>>,\n"
+ " key STRING,\n"
+ " operation STRING\n"
+ ") with (\n"
+ "'connector.type' = 'kafka', \n"
+ "'connector.version' = 'universal',\n"
+ "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n"
+ "'connector.properties.zookeeper.connect' = 'xxx',\n"
+ "'connector.properties.bootstrap.servers' = 'xxx',\n"
+ " 'connector.properties.group.id' = 'salaryGroup',\n"
+ " 'format.type' = 'json'\n"
+ ")";
l sql查询中调用了自定义函数如下:
Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t")
.joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)")
.select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType");
l 调试时发现自定义函数 eval 传递过来的value参数有7条,但是每条数据的都是空。
自定义function函数如下:
public class ParserJsonPersonNormalFunc extends TableFunction<Row> {
private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class);
public void eval(Row[] value) {
try {
log.info("eval start");
collector.collect(Row.of(value));
} catch (Exception e) {
log.error("parser json failed :", e);
}
}
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
}
}
代码里注册了function:
tEnv.sqlUpdate(personKafkaTable);
tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc());
消息体格式如下:
{
"beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.000000",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.000000",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.000000000000000000000000000000",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.000000",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}