You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘亚坤 <ws...@163.com> on 2020/06/01 02:49:22 UTC

pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())

def drop_fields(message, *fields):
      import json
      message = json.loads(message)
      for field in fields:
            message.pop(field)
      return  json.dumps(message)




st_env \
      .form_path("source") \
      .select("drop_fields(message,'x')") \
      .insert_into("sink")


message 格式:
{“a”:"1","x","2"}


报错参数类型不匹配:
Actual:(java.lang.String, java.lang.String)
Expected:(org.apache.flink.table.dataformat.BinaryString)


新手入门,请多指教,感谢。

Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Posted by jack <ws...@163.com>.


您理解的是对的,我测试了下,好像pyflink的udf函数不太支持python的可变参数














在 2020-06-01 14:47:21,"Dian Fu" <di...@gmail.com> 写道:
>你传的第二个参数是string,这样试一下?
>select("drop_fields(message, array('x'))")
>
>不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception)
>
>> 在 2020年6月1日,下午1:59,jack <ws...@163.com> 写道:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 是的,对应参数没有填写正确,感谢;
>> 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-01 11:01:34,"Dian Fu" <di...@gmail.com> 写道:
>>> The input types should be as following:
>>> 
>>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>>> 
>>> Regards,
>>> Dian
>>> 
>>>> 在 2020年6月1日,上午10:49,刘亚坤 <ws...@163.com> 写道:
>>>> 
>>>> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
>>>> 
>>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>>> def drop_fields(message, *fields):
>>>>      import json
>>>>      message = json.loads(message)
>>>>      for field in fields:
>>>>            message.pop(field)
>>>>      return  json.dumps(message)
>>>> 
>>>> 
>>>> st_env \
>>>>      .form_path("source") \
>>>>      .select("drop_fields(message,'x')") \
>>>>      .insert_into("sink")
>>>> 
>>>> message 格式:
>>>> {“a”:"1","x","2"}
>>>> 
>>>> 报错参数类型不匹配:
>>>> Actual:(java.lang.String, java.lang.String)
>>>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>>> 
>>>> 新手入门,请多指教,感谢。
>>> 

Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Posted by Dian Fu <di...@gmail.com>.
你传的第二个参数是string,这样试一下?
select("drop_fields(message, array('x'))")

不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception)

> 在 2020年6月1日,下午1:59,jack <ws...@163.com> 写道:
> 
> 
> 
> 
> 
> 
> 
> 是的,对应参数没有填写正确,感谢;
> 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-01 11:01:34,"Dian Fu" <di...@gmail.com> 写道:
>> The input types should be as following:
>> 
>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>> 
>> Regards,
>> Dian
>> 
>>> 在 2020年6月1日,上午10:49,刘亚坤 <ws...@163.com> 写道:
>>> 
>>> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
>>> 
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def drop_fields(message, *fields):
>>>      import json
>>>      message = json.loads(message)
>>>      for field in fields:
>>>            message.pop(field)
>>>      return  json.dumps(message)
>>> 
>>> 
>>> st_env \
>>>      .form_path("source") \
>>>      .select("drop_fields(message,'x')") \
>>>      .insert_into("sink")
>>> 
>>> message 格式:
>>> {“a”:"1","x","2"}
>>> 
>>> 报错参数类型不匹配:
>>> Actual:(java.lang.String, java.lang.String)
>>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>> 
>>> 新手入门,请多指教,感谢。
>> 


Re:Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Posted by jack <ws...@163.com>.





是的,对应参数没有填写正确,感谢;
另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。











在 2020-06-01 11:01:34,"Dian Fu" <di...@gmail.com> 写道:
>The input types should be as following:
>
>input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>
>Regards,
>Dian
>
>> 在 2020年6月1日,上午10:49,刘亚坤 <ws...@163.com> 写道:
>> 
>> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
>> 
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def drop_fields(message, *fields):
>>       import json
>>       message = json.loads(message)
>>       for field in fields:
>>             message.pop(field)
>>       return  json.dumps(message)
>> 
>> 
>> st_env \
>>       .form_path("source") \
>>       .select("drop_fields(message,'x')") \
>>       .insert_into("sink")
>> 
>> message 格式:
>> {“a”:"1","x","2"}
>> 
>> 报错参数类型不匹配:
>> Actual:(java.lang.String, java.lang.String)
>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>> 
>> 新手入门,请多指教,感谢。
>

Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Posted by Dian Fu <di...@gmail.com>.
The input types should be as following:

input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]

Regards,
Dian

> 在 2020年6月1日,上午10:49,刘亚坤 <ws...@163.com> 写道:
> 
> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
> 
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def drop_fields(message, *fields):
>       import json
>       message = json.loads(message)
>       for field in fields:
>             message.pop(field)
>       return  json.dumps(message)
> 
> 
> st_env \
>       .form_path("source") \
>       .select("drop_fields(message,'x')") \
>       .insert_into("sink")
> 
> message 格式:
> {“a”:"1","x","2"}
> 
> 报错参数类型不匹配:
> Actual:(java.lang.String, java.lang.String)
> Expected:(org.apache.flink.table.dataformat.BinaryString)
> 
> 新手入门,请多指教,感谢。


Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Posted by Dian Fu <di...@gmail.com>.
The input types should be as following:

input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]

Regards,
Dian

> 在 2020年6月1日,上午10:49,刘亚坤 <ws...@163.com> 写道:
> 
> 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
> 
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def drop_fields(message, *fields):
>       import json
>       message = json.loads(message)
>       for field in fields:
>             message.pop(field)
>       return  json.dumps(message)
> 
> 
> st_env \
>       .form_path("source") \
>       .select("drop_fields(message,'x')") \
>       .insert_into("sink")
> 
> message 格式:
> {“a”:"1","x","2"}
> 
> 报错参数类型不匹配:
> Actual:(java.lang.String, java.lang.String)
> Expected:(org.apache.flink.table.dataformat.BinaryString)
> 
> 新手入门,请多指教,感谢。