You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 史 正超 <sh...@outlook.com> on 2020/10/16 03:45:42 UTC

回复: flink 自定义udf注册后不能使用

Hi, 从日志上看 是说 匹配不到 imei_encrypt的UDF,有可能是sql里传的字段和imei_encrypt的参数不匹配,
能看下你的具体代码和udf的声明吗
________________________________
发件人: 奔跑的小飞袁 <s_...@126.com>
发送时间: 2020年10月16日 3:30
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: flink 自定义udf注册后不能使用

hello
我在使用flinkSQL注册udf时,发生了以下错误,这是我定义有问题还是flink的bug
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 11, column 6 to line 11,
column 23: No match found for function signature imei_encrypt(<CHARACTER>)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 11, column 6 to line 11, column 23: No match found for
function signature imei_encrypt(<CHARACTER>)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
        at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 11,
column 6 to line 11, column 23: No match found for function signature
imei_encrypt(<CHARACTER>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
        at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
        at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
        at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
        ... 26 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
found for function signature imei_encrypt(<CHARACTER>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
        ... 54 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:回复: flink 自定义udf注册后不能使用

Posted by 奔跑的小飞袁 <s_...@126.com>.
是的,我这个函数只需要一个参数



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: flink 自定义udf注册后不能使用

Posted by Benchao Li <li...@apache.org>.
Hi,
当前可以理解Flink注册UDF有三种类型:
- TEMPORARY SYSTEM FUNCTION
- TEMPORARY CATALOG FUNCTION
- CATALOG FUNCTION

加上内置的SYSTEM FUNCTION
可以认为一共有四种,他们的解析顺序为:
1. TEMPORARY SYSTEM FUNCTION
2. SYSTEM FUNCTION
3. TEMPORARY CATALOG FUNCTION
4. CATALOG FUNCTION

所以你观察到TEMPORARY SYSTEM FUNCTION会覆盖内置函数,但是TEMPORARY CATALOG FUNCTION不会覆盖
这个现象是没有问题的。


amenhub@163.com <am...@163.com> 于2020年10月16日周五 下午3:46写道:

> 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug
>
> best,
> amenhub
>
> 发件人: 史 正超
> 发送时间: 2020-10-16 15:26
> 收件人: user-zh@flink.apache.org
> 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
> 你这样创建试一下,或者换个名字试试
>
> CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS
> 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
>
> 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY
> SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以
>
> ________________________________
> 发件人: 奔跑的小飞袁 <s_...@126.com>
> 发送时间: 2020年10月16日 6:47
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: Re: 回复:回复: flink 自定义udf注册后不能使用
>
> 是的,是我传参有问题
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li

回复:回复: 回复: flink 自定义udf注册后不能使用

Posted by 罗显宴 <15...@163.com>.
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码


| |
罗显宴
|
|
邮箱:15927482803@163.com
|
签名由网易邮箱大师定制
在2020年10月16日 15:45,amenhub@163.com<am...@163.com> 写道:
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub

发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以

________________________________
发件人: 奔跑的小飞袁 <s_...@126.com>
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:回复: 回复: flink 自定义udf注册后不能使用

Posted by 罗显宴 <15...@163.com>.
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码


| |
罗显宴
|
|
邮箱:15927482803@163.com
|
签名由网易邮箱大师定制
在2020年10月16日 15:45,amenhub@163.com<am...@163.com> 写道:
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub

发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以

________________________________
发件人: 奔跑的小飞袁 <s_...@126.com>
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: 回复: flink 自定义udf注册后不能使用

Posted by "amenhub@163.com" <am...@163.com>.
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug

best,
amenhub
 
发件人: 史 正超
发送时间: 2020-10-16 15:26
收件人: user-zh@flink.apache.org
主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试
 
CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
 
我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以
 
________________________________
发件人: 奔跑的小飞袁 <s_...@126.com>
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: 回复:回复: flink 自定义udf注册后不能使用
 
是的,是我传参有问题
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: 回复:回复: flink 自定义udf注册后不能使用

Posted by 史 正超 <sh...@outlook.com>.
你这样创建试一下,或者换个名字试试

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以

________________________________
发件人: 奔跑的小飞袁 <s_...@126.com>
发送时间: 2020年10月16日 6:47
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: 回复:回复: flink 自定义udf注册后不能使用

是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:回复: flink 自定义udf注册后不能使用

Posted by 奔跑的小飞袁 <s_...@126.com>.
是的,是我传参有问题



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:回复: flink 自定义udf注册后不能使用

Posted by Shuai Xia <jk...@dingtalk.com.INVALID>.
你好,没看错的话,只有一个参?


------------------------------------------------------------------
发件人:奔跑的小飞袁 <s_...@126.com>
发送时间:2020年10月16日(星期五) 14:18
收件人:user-zh <us...@flink.apache.org>
主 题:Re: 回复: flink 自定义udf注册后不能使用

完整的sql执行文件

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source 表
create TABLE cloud_behavior_source(
    operation VARCHAR,
    operation_channel VARCHAR,
    `time` VARCHAR,
    ip VARCHAR,
    lat VARCHAR,
    lng VARCHAR,
    user_id VARCHAR,
    device_id VARCHAR,
    imei VARCHAR,
    targets ARRAY<ROW&lt;`type` VARCHAR,`value` VARCHAR>>,
    product_name VARCHAR,
    product_version VARCHAR,
    product_vendor VARCHAR,
    platform VARCHAR,
    platform_version VARCHAR,
    `languaage` VARCHAR,
    locale VARCHAR,
    other_para MAP<VARCHAR,VARCHAR>
) with (
    'connector'='kafka',
    'topic'='cloud_behavior',
    'properties.bootstrap.servers'='',
    'properties.group.id'='testGroup',
    'format'='avro',
    'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv 统计 Sink 表
create TABLE cloud_behavior_sink(
    operation VARCHAR,
    operation_channel VARCHAR,
    `time` VARCHAR,
    ip VARCHAR,
    lat VARCHAR,
    lng VARCHAR,
    user_id VARCHAR,
    device_id VARCHAR,
    imei VARCHAR,
    product_name VARCHAR,
    product_version VARCHAR,
    product_vendor VARCHAR,
    platform VARCHAR,
    platform_version VARCHAR,
    `languaage` VARCHAR,
    locale VARCHAR
)with (
    'connector'='filesystem',
    'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
    'format'='parquet',
    'sink.rolling-policy.file-size'='128MB',
    'sink.rolling-policy.rollover-interval'='10min'
);

-- 业务过程
insert into cloud_behavior_sink
select
     operation,
     operation_channel,
     `time`,
     ip,
     lat,
     lng,
     user_id,
     device_id,
     imei_encrypt(imei) AS imei,
     product_name,
     product_version,
     product_vendor,
     platform,
     platform_version,
     `languaage`,
     locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: flink 自定义udf注册后不能使用

Posted by 奔跑的小飞袁 <s_...@126.com>.
完整的sql执行文件

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source 表
create TABLE cloud_behavior_source(
    operation VARCHAR,
    operation_channel VARCHAR,
    `time` VARCHAR,
    ip VARCHAR,
    lat VARCHAR,
    lng VARCHAR,
    user_id VARCHAR,
    device_id VARCHAR,
    imei VARCHAR,
    targets ARRAY<ROW&lt;`type` VARCHAR,`value` VARCHAR>>,
    product_name VARCHAR,
    product_version VARCHAR,
    product_vendor VARCHAR,
    platform VARCHAR,
    platform_version VARCHAR,
    `languaage` VARCHAR,
    locale VARCHAR,
    other_para MAP<VARCHAR,VARCHAR>
) with (
    'connector'='kafka',
    'topic'='cloud_behavior',
    'properties.bootstrap.servers'='',
    'properties.group.id'='testGroup',
    'format'='avro',
    'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv 统计 Sink 表
create TABLE cloud_behavior_sink(
    operation VARCHAR,
    operation_channel VARCHAR,
    `time` VARCHAR,
    ip VARCHAR,
    lat VARCHAR,
    lng VARCHAR,
    user_id VARCHAR,
    device_id VARCHAR,
    imei VARCHAR,
    product_name VARCHAR,
    product_version VARCHAR,
    product_vendor VARCHAR,
    platform VARCHAR,
    platform_version VARCHAR,
    `languaage` VARCHAR,
    locale VARCHAR
)with (
    'connector'='filesystem',
    'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
    'format'='parquet',
    'sink.rolling-policy.file-size'='128MB',
    'sink.rolling-policy.rollover-interval'='10min'
);

-- 业务过程
insert into cloud_behavior_sink
select
     operation,
     operation_channel,
     `time`,
     ip,
     lat,
     lng,
     user_id,
     device_id,
     imei_encrypt(imei) AS imei,
     product_name,
     product_version,
     product_vendor,
     platform,
     platform_version,
     `languaage`,
     locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: flink 自定义udf注册后不能使用

Posted by 奔跑的小飞袁 <s_...@126.com>.
这是我的udf声明
CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;
以下是udf实现
public class IMEIEncrypt extends ScalarFunction {

    public String eval(String column_type,String value) {
        EncryptUtils encryptUtils = new EncryptUtils();
        return encryptUtils.encrypt(column_type,value);
    }
}




--
Sent from: http://apache-flink.147419.n8.nabble.com/