Posted to user-zh@flink.apache.org by "Chenzhiyuan(HR)" <zh...@huawei.com> on 2021/07/09 09:39:18 UTC

User-defined function does not receive its parameters correctly

I defined a table sourced from Kafka and call a user-defined function in the SQL query, but the argument is not passed to the UDF's eval method correctly.
I am using Flink 1.10.0.


The DDL of the JSON table is as follows:
private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n"
    + " data ARRAY<ROW<byteSize STRING,columnName STRING,rawData STRING,type STRING>>,\n"
    + " key STRING,\n"
    + " operation STRING\n"
    + ") with (\n"
    + "'connector.type' = 'kafka', \n"
    + "'connector.version' = 'universal',\n"
    + "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n"
    + "'connector.properties.zookeeper.connect' = 'xxx',\n"
    + "'connector.properties.bootstrap.servers' = 'xxx',\n"
    + " 'connector.properties.group.id' = 'salaryGroup',\n"
    + " 'format.type' = 'json'\n"
    + ")";


The user-defined function is called from the SQL query as follows:

Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t")
    .joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)")
    .select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType");


While debugging I can see that the value argument passed to the UDF's eval method contains 7 entries, but every entry is empty.


The user-defined function is implemented as follows:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParserJsonPersonNormalFunc extends TableFunction<Row> {

    private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class);

    public void eval(Row[] value) {
        try {
            log.info("eval start");
            // For debugging, simply wrap the incoming array and emit it as one row.
            collector.collect(Row.of(value));
        } catch (Exception e) {
            log.error("parser json failed :", e);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
    }
}
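A minimal sketch of a variant that additionally declares the argument type explicitly, in case the legacy planner cannot infer the element type of the Row[] parameter. This assumes TableFunction in 1.10 still exposes getParameterTypes; the class name ParserJsonPersonNormalFunc2 and the logging are illustrative only, and it is not verified that this resolves the empty values:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParserJsonPersonNormalFunc2 extends TableFunction<Row> {

    private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc2.class);

    public void eval(Row[] value) {
        // Log each element to check whether its four fields actually arrive populated.
        for (Row element : value) {
            log.info("data element: {}", element);
        }
        // ... parse the elements here and emit one 5-field row, e.g.
        // collect(Row.of(personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType));
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        // The single argument is the "data" array: each element is
        // ROW<byteSize, columnName, rawData, type>, all declared STRING in the DDL.
        return new TypeInformation<?>[] {
            ObjectArrayTypeInfo.getInfoFor(
                Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING))
        };
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
    }
}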


The function is registered in the code:

tEnv.sqlUpdate(personKafkaTable);
tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc());
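One minimal way to consume the result table and run the job would be the following sketch; it assumes tEnv is a StreamTableEnvironment created from a StreamExecutionEnvironment named env, and the job name is arbitrary:

// Print the joined rows and start the job.
tEnv.toAppendStream(tempTable, Row.class).print();
env.execute("parser-json-udf-test");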


The Kafka message body looks like this:

{
  "beforeData": [],
  "byteSize": 272,
  "columnNumber": 32,
  "data": [{
    "byteSize": 8,
    "columnName": "APPLY_PERSON_ID",
    "rawData": 10017,
    "type": "LONG"
  }, {
    "byteSize": 12,
    "columnName": "UPDATE_SALARY",
    "rawData": "11000.000000",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "UP_AMOUNT",
    "rawData": "1000.000000",
    "type": "DOUBLE"
  }, {
    "byteSize": 3,
    "columnName": "CURRENCY",
    "rawData": "CNY",
    "type": "STRING"
  }, {
    "byteSize": 32,
    "columnName": "EXCHANGE_RATE",
    "rawData": "1.000000000000000000000000000000",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "DEDUCTED_ACCOUNT",
    "rawData": "1000.000000",
    "type": "DOUBLE"
  }, {
    "byteSize": 1,
    "columnName": "ENTER_AT_PROCESS",
    "rawData": "Y",
    "type": "STRING"
  }],
  "dataCount": 0,
  "dataMetaData": {
    "connector": "mysql",
    "pos": 1000368076,
    "row": 0,
    "ts_ms": 1625565737000,
    "snapshot": "false",
    "db": "testdb",
    "table": "flow_person_t"
  },
  "key": "APPLY_PERSON_ID",
  "memorySize": 1120,
  "operation": "insert",
  "rowIndex": -1,
  "timestamp": "1970-01-01 00:00:00"
}